Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:15:36

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
0003 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
0004 //         Michal Simon <michal.simon@cern.ch>
0005 //------------------------------------------------------------------------------
0006 // This file is part of the XRootD software suite.
0007 //
0008 // XRootD is free software: you can redistribute it and/or modify
0009 // it under the terms of the GNU Lesser General Public License as published by
0010 // the Free Software Foundation, either version 3 of the License, or
0011 // (at your option) any later version.
0012 //
0013 // XRootD is distributed in the hope that it will be useful,
0014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0016 // GNU General Public License for more details.
0017 //
0018 // You should have received a copy of the GNU Lesser General Public License
0019 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0020 //
0021 // In applying this licence, CERN does not waive the privileges and immunities
0022 // granted to it by virtue of its status as an Intergovernmental Organization
0023 // or submit itself to any jurisdiction.
0024 //------------------------------------------------------------------------------
0025 
0026 #ifndef __XRD_CL_OPERATIONS_HH__
0027 #define __XRD_CL_OPERATIONS_HH__
0028 
0029 #include <memory>
0030 #include <stdexcept>
0031 #include <sstream>
0032 #include <tuple>
0033 #include <future>
0034 #include "XrdCl/XrdClXRootDResponses.hh"
0035 #include "XrdCl/XrdClOperationHandlers.hh"
0036 #include "XrdCl/XrdClArg.hh"
0037 #include "XrdCl/XrdClOperationTimeout.hh"
0038 #include "XrdCl/XrdClFinalOperation.hh"
0039 #include "XrdSys/XrdSysPthread.hh"
0040 
0041 #include "XrdCl/XrdClResponseJob.hh"
0042 #include "XrdCl/XrdClJobManager.hh"
0043 #include "XrdCl/XrdClPostMaster.hh"
0044 #include "XrdCl/XrdClDefaultEnv.hh"
0045 
0046 namespace XrdCl
0047 {
0048 
0049   template<bool HasHndl> class Operation;
0050 
0051   class Pipeline;
0052 
0053 
0054   //----------------------------------------------------------------------------
0055   //! Type of the recovery function to be provided by the user
0056   //----------------------------------------------------------------------------
0057   typedef std::function<Operation<true>*(const XRootDStatus&)>  rcvry_func;
0058 
0059   //----------------------------------------------------------------------------
0060   //! Wrapper for ResponseHandler, used only internally to run next operation
0061   //! after previous one is finished
0062   //----------------------------------------------------------------------------
0063   class PipelineHandler: public ResponseHandler
0064   {
0065       template<bool> friend class Operation;
0066 
0067     public:
0068 
0069       //------------------------------------------------------------------------
0070       //! Constructor.
0071       //!
0072       //! @param handler  : the handler of our operation
0073       //------------------------------------------------------------------------
0074       PipelineHandler( ResponseHandler   *handler );
0075 
0076       //------------------------------------------------------------------------
0077       //! Default Constructor.
0078       //------------------------------------------------------------------------
0079       PipelineHandler()
0080       {
0081       }
0082 
0083       //------------------------------------------------------------------------
0084       //! Callback function.
0085       //------------------------------------------------------------------------
0086       void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
0087           HostList *hostList );
0088 
0089       //------------------------------------------------------------------------
0090       //! Callback function.
0091       //------------------------------------------------------------------------
0092       void HandleResponse( XRootDStatus *status, AnyObject *response );
0093 
0094       //------------------------------------------------------------------------
0095       //! Destructor.
0096       //------------------------------------------------------------------------
0097       ~PipelineHandler()
0098       {
0099       }
0100 
0101       //------------------------------------------------------------------------
0102       //! Add new operation to the pipeline
0103       //!
0104       //! @param operation  :  operation to add
0105       //------------------------------------------------------------------------
0106       void AddOperation( Operation<true> *operation );
0107 
0108       //------------------------------------------------------------------------
0109       //! Set workflow to this and all next handlers. In the last handler
0110       //! it is used to finish workflow execution
0111       //!
0112       //! @param  prms         :  a promis that the pipeline will have a result
0113       //! @param  final        :  a callable that should be called at the end of
0114       //!                         pipeline
0115       //------------------------------------------------------------------------
0116       void Assign( const Timeout                            &timeout,
0117                    std::promise<XRootDStatus>                prms,
0118                    std::function<void(const XRootDStatus&)>  final,
0119                    Operation<true>                          *opr );
0120 
0121       //------------------------------------------------------------------------
0122       //! Assign the finalization routine
0123       //------------------------------------------------------------------------
0124       void Assign( std::function<void(const XRootDStatus&)>  final );
0125 
0126       //------------------------------------------------------------------------
0127       //! Called by a pipeline on the handler of its first operation before Run
0128       //------------------------------------------------------------------------
0129       void PreparePipelineStart();
0130 
0131     private:
0132 
0133       //------------------------------------------------------------------------
0134       //! Callback function implementation;
0135       //------------------------------------------------------------------------
0136       void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
0137           HostList *hostList = nullptr );
0138 
0139       inline void dealloc( XRootDStatus *status, AnyObject *response,
0140           HostList *hostList )
0141       {
0142         delete status;
0143         delete response;
0144         delete hostList;
0145       }
0146 
0147       //------------------------------------------------------------------------
0148       //! The handler of our operation
0149       //------------------------------------------------------------------------
0150       std::unique_ptr<ResponseHandler> responseHandler;
0151 
0152       //------------------------------------------------------------------------
0153       //! The operation the handler is assigned to
0154       //------------------------------------------------------------------------
0155       std::unique_ptr<Operation<true>> currentOperation;
0156 
0157       //------------------------------------------------------------------------
0158       //! Next operation in the pipeline
0159       //------------------------------------------------------------------------
0160       std::unique_ptr<Operation<true>> nextOperation;
0161 
0162       //------------------------------------------------------------------------
0163       //! Pipeline timeout
0164       //------------------------------------------------------------------------
0165       Timeout timeout;
0166 
0167       //------------------------------------------------------------------------
0168       //! The promise that there will be a result (traveling along the pipeline)
0169       //------------------------------------------------------------------------
0170       std::promise<XRootDStatus> prms;
0171 
0172       //------------------------------------------------------------------------
0173       //! The lambda/function/functor that should be called at the end of the
0174       //! pipeline (traveling along the pipeline)
0175       //------------------------------------------------------------------------
0176       std::function<void(const XRootDStatus&)> final;
0177   };
0178 
0179   //----------------------------------------------------------------------------
0180   //! Operation template. An Operation is a once-use-only object - once executed
0181   //! by a Workflow engine it is invalidated. Also if used as an argument for
0182   //! >> or | the original object gets invalidated.
0183   //!
0184   //! @arg HasHndl : true if operation has a handler, false otherwise
0185   //----------------------------------------------------------------------------
0186   template<bool HasHndl>
0187   class Operation
0188   {
0189       // Declare friendship between templates
0190       template<bool>
0191       friend class Operation;
0192 
0193       friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
0194 
0195       friend class Pipeline;
0196       friend class PipelineHandler;
0197 
0198     public:
0199 
0200       //------------------------------------------------------------------------
0201       //! Constructor
0202       //------------------------------------------------------------------------
0203       Operation() : valid( true )
0204       {
0205       }
0206 
0207       //------------------------------------------------------------------------
0208       //! Move constructor between template instances.
0209       //------------------------------------------------------------------------
0210       template<bool from>
0211       Operation( Operation<from> && op ) :
0212           handler( std::move( op.handler ) ), valid( true )
0213       {
0214         if( !op.valid ) throw std::invalid_argument( "Cannot construct "
0215             "Operation from an invalid Operation!" );
0216         op.valid = false;
0217       }
0218 
0219       //------------------------------------------------------------------------
0220       //! Destructor
0221       //------------------------------------------------------------------------
0222       virtual ~Operation()
0223       {
0224       }
0225 
0226       //------------------------------------------------------------------------
0227       //! Name of the operation.
0228       //------------------------------------------------------------------------
0229       virtual std::string ToString() = 0;
0230 
0231       //------------------------------------------------------------------------
0232       //! Move current object into newly allocated instance
0233       //!
0234       //! @return : the new instance
0235       //------------------------------------------------------------------------
0236       virtual Operation<HasHndl>* Move() = 0;
0237 
0238       //------------------------------------------------------------------------
0239       //! Move current object into newly allocated instance, and convert
0240       //! it into 'handled' operation.
0241       //!
0242       //! @return : the new instance
0243       //------------------------------------------------------------------------
0244       virtual Operation<true>* ToHandled() = 0;
0245 
0246     protected:
0247 
0248       //------------------------------------------------------------------------
0249       //! Run operation
0250       //!
0251       //! @param prms   : the promise that we will have a result
0252       //! @param final  : the object to call at the end of pipeline
0253       //------------------------------------------------------------------------
0254       void Run( Timeout                                   timeout,
0255                 std::promise<XRootDStatus>                prms,
0256                 std::function<void(const XRootDStatus&)>  final )
0257       {
0258         static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
0259         handler->Assign( timeout, std::move( prms ), std::move( final ), this );
0260 
0261         PipelineHandler *h = handler.release();
0262         XRootDStatus st;
0263         try
0264         {
0265           st = RunImpl( h, timeout );
0266         }
0267         catch( const operation_expired& ex )
0268         {
0269           st = XRootDStatus( stError, errOperationExpired );
0270         }
0271         catch( const PipelineException& ex ) // probably not needed
0272         {
0273           st = ex.GetError();
0274         }
0275         catch( const std::exception& ex )
0276         {
0277           st = XRootDStatus( stError, errInternal, 0, ex.what() );
0278         }
0279 
0280         if( !st.IsOK() ){
0281           ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
0282           DefaultEnv::GetPostMaster()->GetJobManager()->QueueJob(job);
0283         }
0284       }
0285 
0286       //------------------------------------------------------------------------
0287       //! Run the actual operation
0288       //!
0289       //! @param params :  container with parameters forwarded from
0290       //!                  previous operation
0291       //! @return       :  status of the operation
0292       //------------------------------------------------------------------------
0293       virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
0294 
0295       //------------------------------------------------------------------------
0296       //! Add next operation in the pipeline
0297       //!
0298       //! @param op : operation to add
0299       //------------------------------------------------------------------------
0300       void AddOperation( Operation<true> *op )
0301       {
0302         if( handler )
0303           handler->AddOperation( op );
0304       }
0305 
0306       //------------------------------------------------------------------------
0307       //! Operation handler
0308       //------------------------------------------------------------------------
0309       std::unique_ptr<PipelineHandler> handler;
0310 
0311       //------------------------------------------------------------------------
0312       //! Flag indicating if it is a valid object
0313       //------------------------------------------------------------------------
0314       bool valid;
0315   };
0316 
0317   //----------------------------------------------------------------------------
0318   //! A wrapper around operation pipeline. A Pipeline is a once-use-only
0319   //! object - once executed by a Workflow engine it is invalidated.
0320   //!
0321   //! Takes ownership of given operation pipeline (which is in most would
0322   //! be a temporary object)
0323   //----------------------------------------------------------------------------
0324   class Pipeline
0325   {
0326       template<bool> friend class ParallelOperation;
0327       friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
0328       friend class PipelineHandler;
0329 
0330     public:
0331 
0332       //------------------------------------------------------------------------
0333       //! Default constructor
0334       //------------------------------------------------------------------------
0335       Pipeline()
0336       {
0337       }
0338 
0339       //------------------------------------------------------------------------
0340       //! Constructor
0341       //------------------------------------------------------------------------
0342       Pipeline( Operation<true> *op ) :
0343           operation( op->Move() )
0344       {
0345       }
0346 
0347       //------------------------------------------------------------------------
0348       //! Constructor
0349       //------------------------------------------------------------------------
0350       Pipeline( Operation<true> &op ) :
0351           operation( op.Move() )
0352       {
0353       }
0354 
0355       //------------------------------------------------------------------------
0356       //! Constructor
0357       //------------------------------------------------------------------------
0358       Pipeline( Operation<true> &&op ) :
0359           operation( op.Move() )
0360       {
0361       }
0362 
0363       Pipeline( Operation<false> *op ) :
0364           operation( op->ToHandled() )
0365       {
0366       }
0367 
0368       //------------------------------------------------------------------------
0369       //! Constructor
0370       //------------------------------------------------------------------------
0371       Pipeline( Operation<false> &op ) :
0372           operation( op.ToHandled() )
0373       {
0374       }
0375 
0376       //------------------------------------------------------------------------
0377       //! Constructor
0378       //------------------------------------------------------------------------
0379       Pipeline( Operation<false> &&op ) :
0380           operation( op.ToHandled() )
0381       {
0382       }
0383 
0384       Pipeline( Pipeline &&pipe ) :
0385           operation( std::move( pipe.operation ) )
0386       {
0387       }
0388 
0389       //------------------------------------------------------------------------
0390       //! Constructor
0391       //------------------------------------------------------------------------
0392       Pipeline& operator=( Pipeline &&pipe )
0393       {
0394         operation = std::move( pipe.operation );
0395         return *this;
0396       }
0397 
0398       //------------------------------------------------------------------------
0399       //! Extend pipeline
0400       //------------------------------------------------------------------------
0401       Pipeline& operator|=( Operation<true>&& op )
0402       {
0403         operation->AddOperation( op.Move() );
0404         return *this;
0405       }
0406 
0407       //------------------------------------------------------------------------
0408       //! Extend pipeline
0409       //------------------------------------------------------------------------
0410       Pipeline& operator|=( Operation<false>&& op )
0411       {
0412         operation->AddOperation( op.ToHandled() );
0413         return *this;
0414       }
0415 
0416       //------------------------------------------------------------------------
0417       //! Conversion to Operation<true>
0418       //!
0419       //! @throws : std::logic_error if pipeline is invalid
0420       //------------------------------------------------------------------------
0421       operator Operation<true>&()
0422       {
0423         if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
0424         return *operation.get();
0425       }
0426 
0427       //------------------------------------------------------------------------
0428       //! Conversion to boolean
0429       //!
0430       //! @return : true if it's a valid pipeline, false otherwise
0431       //------------------------------------------------------------------------
0432       operator bool()
0433       {
0434         return bool( operation );
0435       }
0436 
0437       //------------------------------------------------------------------------
0438       //! Stop the current pipeline
0439       //!
0440       //! @param status : the final status for the pipeline
0441       //------------------------------------------------------------------------
0442       static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
0443 
0444       //------------------------------------------------------------------------
0445       //! Repeat current operation
0446       //------------------------------------------------------------------------
0447       static void Repeat();
0448 
0449       //------------------------------------------------------------------------
0450       //! Replace current operation
0451       //------------------------------------------------------------------------
0452       static void Replace( Operation<false> &&opr );
0453 
0454       //------------------------------------------------------------------------
0455       //! Replace with pipeline
0456       //------------------------------------------------------------------------
0457       static void Replace( Pipeline p );
0458 
0459       //------------------------------------------------------------------------
0460       //! Ignore error and proceed with the pipeline
0461       //------------------------------------------------------------------------
0462       static void Ignore();
0463 
0464     private:
0465 
0466       //------------------------------------------------------------------------
0467       //! Member access declaration, provides access to the underlying
0468       //! operation.
0469       //!
0470       //! @return : pointer to the underlying
0471       //------------------------------------------------------------------------
0472       Operation<true>* operator->()
0473       {
0474         return operation.get();
0475       }
0476 
0477       //------------------------------------------------------------------------
0478       //! Schedules the underlying pipeline for execution.
0479       //!
0480       //! @param timeout : pipeline timeout value
0481       //! @param final   : to be called at the end of the pipeline
0482       //------------------------------------------------------------------------
0483       void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
0484       {
0485         if( ftr.valid() )
0486           throw std::logic_error( "Pipeline is already running!" );
0487 
0488         // a promise that the pipe will have a result
0489         std::promise<XRootDStatus> prms;
0490         ftr = prms.get_future();
0491 
0492         if( !operation ) std::logic_error( "Empty pipeline!" );
0493 
0494         Operation<true> *opr = operation.release();
0495         PipelineHandler *h = opr->handler.get();
0496         if( h )
0497           h->PreparePipelineStart();
0498 
0499         opr->Run( timeout, std::move( prms ), std::move( final ) );
0500       }
0501 
0502       //------------------------------------------------------------------------
0503       //! First operation in the pipeline
0504       //------------------------------------------------------------------------
0505       std::unique_ptr<Operation<true>> operation;
0506 
0507       //------------------------------------------------------------------------
0508       //! The future result of the pipeline
0509       //------------------------------------------------------------------------
0510       std::future<XRootDStatus> ftr;
0511 
0512   };
0513 
0514   //----------------------------------------------------------------------------
0515   //! Helper function, schedules execution of given pipeline
0516   //!
0517   //! @param pipeline : the pipeline to be executed
0518   //! @param timeout  : the pipeline timeout
0519   //!
0520   //! @return         : future status of the operation
0521   //----------------------------------------------------------------------------
0522   inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
0523   {
0524     pipeline.Run( timeout );
0525     return std::move( pipeline.ftr );
0526   }
0527 
0528   //----------------------------------------------------------------------------
0529   //! Helper function, schedules execution of given pipeline and waits for
0530   //! the status
0531   //!
0532   //! @param pipeline : the pipeline to be executed
0533   //! @param timeout  : the pipeline timeout
0534   //!
0535   //! @return         : status of the operation
0536   //----------------------------------------------------------------------------
0537   inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
0538   {
0539     return Async( std::move( pipeline ), timeout ).get();
0540   }
0541 
0542   //----------------------------------------------------------------------------
0543   //! Concrete Operation template
0544   //! Defines | and >> operator as well as operation arguments.
0545   //!
0546   //! @arg Derived : the class that derives from this template (CRTP)
0547   //! @arg HasHndl : true if operation has a handler, false otherwise
0548   //! @arg Args    : operation arguments
0549   //----------------------------------------------------------------------------
0550   template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
0551   class ConcreteOperation: public Operation<HasHndl>
0552   {
0553       template<template<bool> class, bool, typename, typename ...>
0554       friend class ConcreteOperation;
0555 
0556     public:
0557 
0558       //------------------------------------------------------------------------
0559       //! Constructor
0560       //!
0561       //! @param args : operation arguments
0562       //------------------------------------------------------------------------
0563       ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
0564                                             timeout( 0 )
0565       {
0566         static_assert( !HasHndl, "It is only possible to construct operation without handler" );
0567       }
0568 
0569       //------------------------------------------------------------------------
0570       //! Move constructor from other states
0571       //!
0572       //! @arg from : state from which the object is being converted
0573       //!
0574       //! @param op : the object that is being converted
0575       //------------------------------------------------------------------------
0576       template<bool from>
0577       ConcreteOperation( ConcreteOperation<Derived, from, HdlrFactory, Args...> && op ) :
0578         Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
0579       {
0580       }
0581 
0582       //------------------------------------------------------------------------
0583       //! Adds ResponseHandler/function/functor/lambda/future handler for
0584       //! the operation.
0585       //!
0586       //! Note: due to reference collapsing this covers both l-value and
0587       //!       r-value references.
0588       //!
0589       //! @param hdlr : function/functor/lambda
0590       //------------------------------------------------------------------------
0591       template<typename Hdlr>
0592       Derived<true> operator>>( Hdlr &&hdlr )
0593       {
0594         return this->StreamImpl( HdlrFactory::Create( hdlr ) );
0595       }
0596 
0597       //------------------------------------------------------------------------
0598       //! Creates a pipeline of 2 or more operations
0599       //!
0600       //! @param op  : operation to add
0601       //!
0602       //! @return    : handled operation
0603       //------------------------------------------------------------------------
0604       Derived<true> operator|( Operation<true> &op )
0605       {
0606         return PipeImpl( *this, op );
0607       }
0608 
0609       //------------------------------------------------------------------------
0610       //! Creates a pipeline of 2 or more operations
0611       //!
0612       //! @param op :  operation to add
0613       //!
0614       //! @return   :  handled operation
0615       //------------------------------------------------------------------------
0616       Derived<true> operator|( Operation<true> &&op )
0617       {
0618         return PipeImpl( *this, op );
0619       }
0620 
0621       //------------------------------------------------------------------------
0622       //! Creates a pipeline of 2 or more operations
0623       //!
0624       //! @param op   operation to add
0625       //!
0626       //! @return     handled operation
0627       //------------------------------------------------------------------------
0628       Derived<true> operator|( Operation<false> &op )
0629       {
0630         return PipeImpl( *this, op );
0631       }
0632 
0633       //------------------------------------------------------------------------
0634       //! Creates a pipeline of 2 or more operations
0635       //!
0636       //! @param op  : operation to add
0637       //!
0638       //! @return    : handled operation
0639       //------------------------------------------------------------------------
0640       Derived<true> operator|( Operation<false> &&op )
0641       {
0642         return PipeImpl( *this, op );
0643       }
0644 
0645       //------------------------------------------------------------------------
0646       //! Adds a final operation to the pipeline
0647       //------------------------------------------------------------------------
0648       Derived<true> operator|( FinalOperation &&fo )
0649       {
0650         AllocHandler( *this );
0651         this->handler->Assign( fo.final );
0652         return this->template Transform<true>();
0653       }
0654 
0655       //------------------------------------------------------------------------
0656       //! Move current object into newly allocated instance
0657       //!
0658       //! @return : the new instance
0659       //------------------------------------------------------------------------
0660       inline Operation<HasHndl>* Move()
0661       {
0662         Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
0663         return new Derived<HasHndl>( std::move( *me ) );
0664       }
0665 
0666       //------------------------------------------------------------------------
0667       //! Transform operation to handled
0668       //!
0669       //! @return Operation<true>&
0670       //------------------------------------------------------------------------
0671       inline Operation<true>* ToHandled()
0672       {
0673         this->handler.reset( new PipelineHandler() );
0674         Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
0675         return new Derived<true>( std::move( *me ) );
0676       }
0677 
0678       //------------------------------------------------------------------------
0679       //! Set operation timeout
0680       //------------------------------------------------------------------------
0681       Derived<HasHndl> Timeout( uint16_t timeout )
0682       {
0683         this->timeout = timeout;
0684         Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
0685         return std::move( *me );
0686       }
0687 
0688     protected:
0689 
0690       //------------------------------------------------------------------------
0691       //! Transform into a new instance with desired state
0692       //!
0693       //! @return : new instance in the desired state
0694       //------------------------------------------------------------------------
0695       template<bool to>
0696       inline Derived<to> Transform()
0697       {
0698         Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
0699         return Derived<to>( std::move( *me ) );
0700       }
0701 
0702       //------------------------------------------------------------------------
0703       //! Implements operator>> functionality
0704       //!
0705       //! @param handler :  handler to be added
0706       //!
0707       //! @return   :  return an instance of Derived<true>
0708       //------------------------------------------------------------------------
0709       inline Derived<true> StreamImpl( ResponseHandler *handler )
0710       {
0711         static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
0712         this->handler.reset( new PipelineHandler( handler ) );
0713         return Transform<true>();
0714       }
0715 
0716       //------------------------------------------------------------------------
0717       // Allocate handler if necessary
0718       //------------------------------------------------------------------------
0719       inline static
0720       void AllocHandler( ConcreteOperation<Derived, true, HdlrFactory, Args...> &me )
0721       {
0722         // nothing to do
0723       }
0724 
0725       //------------------------------------------------------------------------
0726       // Allocate handler if necessary
0727       //------------------------------------------------------------------------
0728       inline static
0729       void AllocHandler( ConcreteOperation<Derived, false, HdlrFactory, Args...> &me )
0730       {
0731         me.handler.reset( new PipelineHandler() );
0732       }
0733 
0734       //------------------------------------------------------------------------
0735       //! Implements operator| functionality
0736       //!
0737       //! @param me  :  reference to myself (*this)
0738       //! @param op  :  reference to the other operation
0739       //!
0740       //! @return    :  move-copy of myself
0741       //------------------------------------------------------------------------
0742       inline static
0743       Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
0744           Args...> &me, Operation<true> &op )
0745       {
0746         AllocHandler( me ); // if HasHndl is false allocate handler
0747         me.AddOperation( op.Move() );
0748         return me.template Transform<true>();
0749       }
0750 
0751       //------------------------------------------------------------------------
0752       //! Implements operator| functionality
0753       //!
0754       //! @param me  :  reference to myself (*this)
0755       //! @param op  :  reference to the other operation
0756       //!
0757       //! @return    :  move-copy of myself
0758       //------------------------------------------------------------------------
0759       inline static
0760       Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
0761           Args...> &me, Operation<false> &op )
0762       {
0763         AllocHandler( me ); // if HasHndl is false allocate handler
0764         me.AddOperation( op.ToHandled() );
0765         return me.template Transform<true>();
0766       }
0767 
0768       //------------------------------------------------------------------------
0769       //! Operation arguments
0770       //------------------------------------------------------------------------
0771       std::tuple<Args...> args;
0772 
0773       //------------------------------------------------------------------------
0774       //! Operation timeout
0775       //------------------------------------------------------------------------
0776       uint16_t timeout;
0777     };
0778 }
0779 
0780 #endif // __XRD_CL_OPERATIONS_HH__