Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-19 10:11:00

0001 //------------------------------------------------------------------------------
0002 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
0003 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
0004 //         Michal Simon <michal.simon@cern.ch>
0005 //------------------------------------------------------------------------------
0006 // This file is part of the XRootD software suite.
0007 //
0008 // XRootD is free software: you can redistribute it and/or modify
0009 // it under the terms of the GNU Lesser General Public License as published by
0010 // the Free Software Foundation, either version 3 of the License, or
0011 // (at your option) any later version.
0012 //
0013 // XRootD is distributed in the hope that it will be useful,
0014 // but WITHOUT ANY WARRANTY; without even the implied warranty of
0015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0016 // GNU General Public License for more details.
0017 //
0018 // You should have received a copy of the GNU Lesser General Public License
0019 // along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
0020 //
0021 // In applying this licence, CERN does not waive the privileges and immunities
0022 // granted to it by virtue of its status as an Intergovernmental Organization
0023 // or submit itself to any jurisdiction.
0024 //------------------------------------------------------------------------------
0025 
0026 #ifndef __XRD_CL_PARALLELOPERATION_HH__
0027 #define __XRD_CL_PARALLELOPERATION_HH__
0028 
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClOperationHandlers.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClPostMaster.hh"
#include "XrdCl/XrdClJobManager.hh"

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <utility>
0038 
0039 namespace XrdCl
0040 {
0041   
0042   //----------------------------------------------------------------------------
0043   // Interface for different execution policies:
0044   // - all      : all operations need to succeed in order for the parallel
0045   //              operation to be successful
0046   // - any      : just one of the operations needs to succeed in order for
0047   //              the parallel operation to be successful
0048   // - some     : n (user defined) operations need to succeed in order for
0049   //              the parallel operation to be successful
0050   // - at least : at least n (user defined) operations need to succeed in
0051   //              order for the parallel operation to be successful (the
0052   //              user handler will be called only when all operations are
0053   //              resolved)
0054   //
0055   // @param status : status returned by one of the aggregated operations
0056   //
0057   // @return       : true if the status should be passed to the user handler,
0058   //                 false otherwise.
0059   //----------------------------------------------------------------------------
0060   struct PolicyExecutor
0061   {
0062     virtual ~PolicyExecutor()
0063     {
0064     }
0065 
0066     virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
0067 
0068     virtual XRootDStatus Result() = 0;
0069   };
0070 
0071   //----------------------------------------------------------------------------
0072   //! Parallel operations, allows to execute two or more pipelines in
0073   //! parallel.
0074   //!
  //! @arg HasHndl : describes current operation configuration state, i.e.
  //!                whether the operation already has a handler
  //!                (@see Operation)
0077   //----------------------------------------------------------------------------
0078   template<bool HasHndl>
0079   class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
0080   {
0081       template<bool> friend class ParallelOperation;
0082 
0083     public:
0084 
0085       //------------------------------------------------------------------------
0086       //! Constructor: copy-move a ParallelOperation in different state
0087       //------------------------------------------------------------------------
0088       template<bool from>
0089       ParallelOperation( ParallelOperation<from> &&obj ) :
0090           ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
0091           pipelines( std::move( obj.pipelines ) ),
0092           policy( std::move( obj.policy ) )
0093       {
0094       }
0095 
0096       //------------------------------------------------------------------------
0097       //! Constructor
0098       //!
0099       //! @arg   Container : iterable container type
0100       //!
0101       //! @param container : iterable container with pipelines
0102       //------------------------------------------------------------------------
0103       template<class Container>
0104       ParallelOperation( Container &&container )
0105       {
0106         static_assert( !HasHndl, "Constructor is available only operation without handler");
0107 
0108         pipelines.reserve( container.size() );
0109         auto begin = std::make_move_iterator( container.begin() );
0110         auto end   = std::make_move_iterator( container.end() );
0111         std::copy( begin, end, std::back_inserter( pipelines ) );
0112         container.clear(); // there's junk inside so we clear it
0113       }
0114 
0115       ~ParallelOperation()
0116       {
0117       }
0118 
0119       //------------------------------------------------------------------------
0120       //! @return : operation name
0121       //------------------------------------------------------------------------
0122       std::string ToString()
0123       {
0124         std::ostringstream oss;
0125         oss << "Parallel(";
0126         for( size_t i = 0; i < pipelines.size(); i++ )
0127         {
0128           oss << pipelines[i]->ToString();
0129           if( i + 1  != pipelines.size() )
0130           {
0131             oss << " && ";
0132           }
0133         }
0134         oss << ")";
0135         return oss.str();
0136       }
0137 
0138       //------------------------------------------------------------------------
0139       //! Set policy to `All` (default)
0140       //!
0141       //! All operations need to succeed in order for the parallel operation to
0142       //! be successful.
0143       //------------------------------------------------------------------------
0144       ParallelOperation<HasHndl> All()
0145       {
0146         policy.reset( new AllPolicy() );
0147         return std::move( *this );
0148       }
0149 
0150       //------------------------------------------------------------------------
0151       //! Set policy to `Any`
0152       //!
0153       //! Just one of the operations needs to succeed in order for the parallel
0154       //! operation to be successful.
0155       //------------------------------------------------------------------------
0156       ParallelOperation<HasHndl> Any()
0157       {
0158         policy.reset( new AnyPolicy( pipelines.size() ) );
0159         return std::move( *this );
0160       }
0161 
0162       //------------------------------------------------------------------------
0163       // Set policy to `Some`
0164       //!
0165       //! n (user defined) operations need to succeed in order for the parallel
0166       //! operation to be successful.
0167       //------------------------------------------------------------------------
0168       ParallelOperation<HasHndl> Some( size_t threshold )
0169       {
0170         policy.reset( new SomePolicy( pipelines.size(), threshold ) );
0171         return std::move( *this );
0172       }
0173 
0174       //------------------------------------------------------------------------
0175       //! Set policy to `At Least`.
0176       //!
0177       //! At least n (user defined) operations need to succeed in order for the
0178       //! parallel operation to be successful (the user handler will be called
0179       //! only when all operations are resolved).
0180       //------------------------------------------------------------------------
0181       ParallelOperation<HasHndl> AtLeast( size_t threshold )
0182       {
0183         policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
0184         return std::move( *this );
0185       }
0186 
0187     private:
0188 
0189       //------------------------------------------------------------------------
0190       //! `All` policy implementation
0191       //!
0192       //! All operations need to succeed in order for the parallel operation to
0193       //! be successful.
0194       //------------------------------------------------------------------------
0195       struct AllPolicy : public PolicyExecutor
0196       {
0197         bool Examine( const XrdCl::XRootDStatus &status )
0198         {
0199           // keep the status in case this is the final result
0200           res = status;
0201           if( status.IsOK() ) return false;
0202           // we require all request to succeed
0203           return true;
0204         }
0205 
0206         XRootDStatus Result()
0207         {
0208           return res;
0209         }
0210 
0211         XRootDStatus res;
0212       };
0213 
0214       //------------------------------------------------------------------------
0215       //! `Any` policy implementation
0216       //!
0217       //! Just one of the operations needs to succeed in order for the parallel
0218       //! operation to be successful.
0219       //------------------------------------------------------------------------
0220       struct AnyPolicy : public PolicyExecutor
0221       {
0222         AnyPolicy( size_t size) : cnt( size )
0223         {
0224         }
0225 
0226         bool Examine( const XrdCl::XRootDStatus &status )
0227         {
0228           // keep the status in case this is the final result
0229           res = status;
0230           // decrement the counter
0231           size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
0232           // we require just one operation to be successful
0233           if( status.IsOK() ) return true;
0234           // lets see if this is the last one?
0235           if( nb == 1 ) return true;
0236           // we still have a chance there will be one that is successful
0237           return false;
0238         }
0239 
0240         XRootDStatus Result()
0241         {
0242           return res;
0243         }
0244 
0245         private:
0246           std::atomic<size_t> cnt;
0247           XRootDStatus        res;
0248       };
0249 
0250       //------------------------------------------------------------------------
0251       //! `Some` policy implementation
0252       //!
0253       //! n (user defined) operations need to succeed in order for the parallel
0254       //! operation to be successful.
0255       //------------------------------------------------------------------------
0256       struct SomePolicy : PolicyExecutor
0257       {
0258         SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
0259                                                       threshold( threshold ), size( size )
0260         {
0261         }
0262 
0263         bool Examine( const XrdCl::XRootDStatus &status )
0264         {
0265           // keep the status in case this is the final result
0266           res = status;
0267           if( status.IsOK() )
0268           {
0269             size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
0270             if( s + 1 == threshold ) return true; // we reached the threshold
0271             // we are not yet there
0272             return false;
0273           }
0274           size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
0275           // did we drop below the threshold
0276           if( f == size - threshold ) return true;
0277           // we still have a chance there will be enough of successful operations
0278           return false;
0279         }
0280 
0281         XRootDStatus Result()
0282         {
0283           return res;
0284         }
0285 
0286         private:
0287           std::atomic<size_t> failed;
0288           std::atomic<size_t> succeeded;
0289           const size_t        threshold;
0290           const size_t        size;
0291           XRootDStatus        res;
0292       };
0293 
0294       //------------------------------------------------------------------------
0295       //! `At Least` policy implementation
0296       //!
0297       //! At least n (user defined) operations need to succeed in order for the
0298       //! parallel operation to be successful (the user handler will be called
0299       //! only when all operations are resolved).
0300       //------------------------------------------------------------------------
0301       struct AtLeastPolicy : PolicyExecutor
0302       {
0303         AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
0304                                                          failed_cnt( 0 ),
0305                                                          failed_threshold( size - threshold )
0306         {
0307         }
0308 
0309         bool Examine( const XrdCl::XRootDStatus &status )
0310         {
0311           // update number of pending operations
0312           size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
0313           // although we might have the minimum to succeed we wait for the rest
0314           if( status.IsOK() ) return ( pending == 0 );
0315           size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
0316           if( nb == failed_threshold ) res = status; // we dropped below the threshold
0317           // if we still have to wait for pending operations return false,
0318           // otherwise all is done, return true
0319           return ( pending == 0 );
0320         }
0321 
0322         XRootDStatus Result()
0323         {
0324           return res;
0325         }
0326 
0327         private:
0328           std::atomic<size_t> pending_cnt;
0329           std::atomic<size_t> failed_cnt;
0330           const size_t        failed_threshold;
0331           XRootDStatus        res;
0332       };
0333 
0334       //------------------------------------------------------------------------
0335       //! A wait barrier helper class
0336       //------------------------------------------------------------------------
0337       struct barrier_t
0338       {
0339         barrier_t() : on( true ) { }
0340 
0341         void wait()
0342         {
0343           std::unique_lock<std::mutex> lck( mtx );
0344           if( on ) cv.wait( lck );
0345         }
0346 
0347         void lift()
0348         {
0349           std::unique_lock<std::mutex> lck( mtx );
0350           on = false;
0351           cv.notify_all();
0352         }
0353 
0354         private:
0355           std::condition_variable cv;
0356           std::mutex              mtx;
0357           bool                    on;
0358       };
0359 
0360       //------------------------------------------------------------------------
0361       //! Helper class for handling the PipelineHandler of the
0362       //! ParallelOperation (RAII).
0363       //!
0364       //! Guarantees that the handler will be executed exactly once.
0365       //------------------------------------------------------------------------
0366       struct Ctx
0367       {
0368         //----------------------------------------------------------------------
0369         //! Constructor.
0370         //!
0371         //! @param handler : the PipelineHandler of the Parallel operation
0372         //----------------------------------------------------------------------
0373         Ctx( PipelineHandler *handler, PolicyExecutor  *policy  ): handler( handler ),
0374                                                                    policy( policy )
0375         {
0376         }
0377 
0378         //----------------------------------------------------------------------
0379         //! Destructor.
0380         //----------------------------------------------------------------------
0381         ~Ctx()
0382         {
0383           Handle( XRootDStatus() );
0384         }
0385 
0386         //----------------------------------------------------------------------
0387         //! Forwards the status to the PipelineHandler if the handler haven't
0388         //! been called yet.
0389         //!
0390         //! @param st : status
0391         //----------------------------------------------------------------------
0392         inline void Examine( const XRootDStatus &st )
0393         {
0394           if( policy->Examine( st ) )
0395             Handle( policy->Result() );
0396         }
0397 
0398         //----------------------------------------------------------------------
0399         //! Forwards the status to the PipelineHandler if the handler haven't
0400         //! been called yet.
0401         //!
0402         //! @param st : status
0403         //---------------------------------------------------------------------
0404         inline void Handle( const XRootDStatus &st )
0405         {
0406             PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
0407             if( hdlr )
0408             {
0409               barrier.wait();
0410               hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
0411             }
0412         }
0413 
0414         //----------------------------------------------------------------------
0415         //! PipelineHandler of the ParallelOperation
0416         //----------------------------------------------------------------------
0417         std::atomic<PipelineHandler*> handler;
0418 
0419         //----------------------------------------------------------------------
0420         //! Policy defining when the user handler should be called
0421         //----------------------------------------------------------------------
0422         std::unique_ptr<PolicyExecutor> policy;
0423 
0424         //----------------------------------------------------------------------
0425         //! wait barrier that assures handler is called only after RunImpl
0426         //! started all pipelines
0427         //----------------------------------------------------------------------
0428         barrier_t barrier;
0429       };
0430 
0431       //------------------------------------------------------------------------
0432       //! The thread-pool job for schedule Ctx::Examine
0433       //------------------------------------------------------------------------
0434       struct PipelineEnd : public Job
0435       {
0436         //----------------------------------------------------------------------
0437         // Constructor
0438         //----------------------------------------------------------------------
0439         PipelineEnd( std::shared_ptr<Ctx>      &ctx,
0440                      const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
0441         {
0442         }
0443 
0444         //----------------------------------------------------------------------
0445         // Run Ctx::Examine in the thread-pool
0446         //----------------------------------------------------------------------
0447         void Run( void* )
0448         {
0449           ctx->Examine( st );
0450           delete this;
0451         }
0452 
0453         private:
0454           std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
0455           XrdCl::XRootDStatus  st;  //< final status of the ParallelOperation
0456       };
0457 
0458       //------------------------------------------------------------------------
0459       //! Schedule Ctx::Examine to be executed in the client thread-pool
0460       //------------------------------------------------------------------------
0461       inline static
0462       void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
0463       {
0464         XrdCl::JobManager *mgr = XrdCl::DefaultEnv::GetPostMaster()->GetJobManager();
0465         PipelineEnd *end = new PipelineEnd( ctx, st );
0466         mgr->QueueJob( end, nullptr );
0467       }
0468 
0469       //------------------------------------------------------------------------
0470       //! Run operation
0471       //!
0472       //! @param params :  container with parameters forwarded from
0473       //!                  previous operation
0474       //! @return       :  status of the operation
0475       //------------------------------------------------------------------------
0476       XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
0477       {
0478         // make sure we have a valid policy for the parallel operation
0479         if( !policy ) policy.reset( new AllPolicy() );
0480 
0481         std::shared_ptr<Ctx> ctx =
0482             std::make_shared<Ctx>( handler, policy.release() );
0483 
0484         uint16_t timeout = pipelineTimeout < this->timeout ?
0485                            pipelineTimeout : this->timeout;
0486 
0487         for( size_t i = 0; i < pipelines.size(); ++i )
0488         {
0489           if( !pipelines[i] ) continue;
0490           pipelines[i].Run( timeout,
0491               [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
0492         }
0493 
0494         ctx->barrier.lift();
0495         return XRootDStatus();
0496       }
0497 
0498       std::vector<Pipeline>           pipelines;
0499       std::unique_ptr<PolicyExecutor> policy;
0500   };
0501 
0502   //----------------------------------------------------------------------------
0503   //! Factory function for creating parallel operation from a vector
0504   //----------------------------------------------------------------------------
0505   template<class Container>
0506   inline ParallelOperation<false> Parallel( Container &&container )
0507   {
0508     return ParallelOperation<false>( container );
0509   }
0510 
0511   //----------------------------------------------------------------------------
0512   //! Helper function for converting parameter pack into a vector
0513   //----------------------------------------------------------------------------
0514   inline void PipesToVec( std::vector<Pipeline>& )
0515   {
0516     // base case
0517   }
0518 
0519   //----------------------------------------------------------------------------
  // Declare PipesToVec (we need to declare those functions ahead of their
  // definitions, as they may call each other).
0522   //----------------------------------------------------------------------------
0523   template<typename ... Others>
0524   inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
0525       Others&... others );
0526 
0527   template<typename ... Others>
0528   inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
0529       Others&... others );
0530 
0531   template<typename ... Others>
0532   inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
0533       Others&... others );
0534 
0535   //----------------------------------------------------------------------------
0536   // Define PipesToVec
0537   //----------------------------------------------------------------------------
0538   template<typename ... Others>
0539   void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
0540       Others&... others )
0541   {
0542     v.emplace_back( operation );
0543     PipesToVec( v, others... );
0544   }
0545 
0546   template<typename ... Others>
0547   void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
0548       Others&... others )
0549   {
0550     v.emplace_back( operation );
0551     PipesToVec( v, others... );
0552   }
0553 
0554   template<typename ... Others>
0555   void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
0556       Others&... others )
0557   {
0558     v.emplace_back( std::move( pipeline ) );
0559     PipesToVec( v, others... );
0560   }
0561 
0562   //----------------------------------------------------------------------------
0563   //! Factory function for creating parallel operation from
0564   //! a given number of operations
0565   //! (we use && reference since due to reference collapsing this will fit
0566   //! both r- and l-value references)
0567   //----------------------------------------------------------------------------
0568   template<typename ... Operations>
0569   inline ParallelOperation<false> Parallel( Operations&& ... operations )
0570   {
0571     constexpr size_t size = sizeof...( operations );
0572     std::vector<Pipeline> v;
0573     v.reserve( size );
0574     PipesToVec( v, operations... );
0575     return Parallel( v );
0576   }
0577 }
0578 
#endif // __XRD_CL_PARALLELOPERATION_HH__