|
||||
File indexing completed on 2025-01-19 10:11:00
//------------------------------------------------------------------------------
// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
//         Michal Simon <michal.simon@cern.ch>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
0024 //------------------------------------------------------------------------------ 0025 0026 #ifndef __XRD_CL_PARALLELOPERATION_HH__ 0027 #define __XRD_CL_PARALLELOPERATION_HH__ 0028 0029 #include "XrdCl/XrdClOperations.hh" 0030 #include "XrdCl/XrdClOperationHandlers.hh" 0031 #include "XrdCl/XrdClDefaultEnv.hh" 0032 #include "XrdCl/XrdClPostMaster.hh" 0033 #include "XrdCl/XrdClJobManager.hh" 0034 0035 #include <atomic> 0036 #include <condition_variable> 0037 #include <mutex> 0038 0039 namespace XrdCl 0040 { 0041 0042 //---------------------------------------------------------------------------- 0043 // Interface for different execution policies: 0044 // - all : all operations need to succeed in order for the parallel 0045 // operation to be successful 0046 // - any : just one of the operations needs to succeed in order for 0047 // the parallel operation to be successful 0048 // - some : n (user defined) operations need to succeed in order for 0049 // the parallel operation to be successful 0050 // - at least : at least n (user defined) operations need to succeed in 0051 // order for the parallel operation to be successful (the 0052 // user handler will be called only when all operations are 0053 // resolved) 0054 // 0055 // @param status : status returned by one of the aggregated operations 0056 // 0057 // @return : true if the status should be passed to the user handler, 0058 // false otherwise. 0059 //---------------------------------------------------------------------------- 0060 struct PolicyExecutor 0061 { 0062 virtual ~PolicyExecutor() 0063 { 0064 } 0065 0066 virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0; 0067 0068 virtual XRootDStatus Result() = 0; 0069 }; 0070 0071 //---------------------------------------------------------------------------- 0072 //! Parallel operations, allows to execute two or more pipelines in 0073 //! parallel. 0074 //! 0075 //! @arg state : describes current operation configuration state 0076 //! 
  //! (@see Operation)
  //----------------------------------------------------------------------------
  template<bool HasHndl>
  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
  {
      template<bool> friend class ParallelOperation;

    public:

      //------------------------------------------------------------------------
      //! Constructor: copy-move a ParallelOperation in different state
      //!
      //! @param obj : the parallel operation to be moved-from (its pipelines
      //!              and policy are taken over)
      //------------------------------------------------------------------------
      template<bool from>
      ParallelOperation( ParallelOperation<from> &&obj ) :
          ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
          pipelines( std::move( obj.pipelines ) ),
          policy( std::move( obj.policy ) )
      {
      }

      //------------------------------------------------------------------------
      //! Constructor
      //!
      //! @arg Container   : iterable container type
      //!
      //! @param container : iterable container with pipelines; the pipelines
      //!                    are moved out of the container
      //------------------------------------------------------------------------
      template<class Container>
      ParallelOperation( Container &&container )
      {
        static_assert( !HasHndl, "Constructor is available only operation without handler");

        pipelines.reserve( container.size() );
        // move (do not copy) every pipeline out of the user's container
        auto begin = std::make_move_iterator( container.begin() );
        auto end   = std::make_move_iterator( container.end() );
        std::copy( begin, end, std::back_inserter( pipelines ) );
        container.clear(); // there's junk inside so we clear it
      }

      ~ParallelOperation()
      {
      }

      //------------------------------------------------------------------------
      //! @return : operation name, e.g. "Parallel(a && b)"
      //------------------------------------------------------------------------
      std::string ToString()
      {
        std::ostringstream oss;
        oss << "Parallel(";
        for( size_t i = 0; i < pipelines.size(); i++ )
        {
          oss << pipelines[i]->ToString();
          if( i + 1 != pipelines.size() )
          {
            oss << " && ";
          }
        }
        oss << ")";
        return oss.str();
      }

      //------------------------------------------------------------------------
      //! Set policy to `All` (default)
      //!
      //! All operations need to succeed in order for the parallel operation to
      //! be successful.
      //!
      //! @return : the operation itself (moved), to allow call chaining
      //------------------------------------------------------------------------
      ParallelOperation<HasHndl> All()
      {
        policy.reset( new AllPolicy() );
        return std::move( *this );
      }

      //------------------------------------------------------------------------
      //! Set policy to `Any`
      //!
      //! Just one of the operations needs to succeed in order for the parallel
      //! operation to be successful.
      //!
      //! @return : the operation itself (moved), to allow call chaining
      //------------------------------------------------------------------------
      ParallelOperation<HasHndl> Any()
      {
        policy.reset( new AnyPolicy( pipelines.size() ) );
        return std::move( *this );
      }

      //------------------------------------------------------------------------
      //! Set policy to `Some`
      //!
      //! n (user defined) operations need to succeed in order for the parallel
      //! operation to be successful.
      //!
      //! @param threshold : number of operations that need to succeed
      //!
      //! @return : the operation itself (moved), to allow call chaining
      //------------------------------------------------------------------------
      ParallelOperation<HasHndl> Some( size_t threshold )
      {
        policy.reset( new SomePolicy( pipelines.size(), threshold ) );
        return std::move( *this );
      }

      //------------------------------------------------------------------------
      //! Set policy to `At Least`.
      //!
      //! At least n (user defined) operations need to succeed in order for the
      //! parallel operation to be successful (the user handler will be called
      //! only when all operations are resolved).
      //!
      //! @param threshold : minimum number of operations that need to succeed
      //!
      //! @return : the operation itself (moved), to allow call chaining
      //------------------------------------------------------------------------
      ParallelOperation<HasHndl> AtLeast( size_t threshold )
      {
        policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
        return std::move( *this );
      }

    private:

      //------------------------------------------------------------------------
      //! `All` policy implementation
      //!
      //! All operations need to succeed in order for the parallel operation to
      //! be successful, so the first failure makes the final result available.
      //------------------------------------------------------------------------
      struct AllPolicy : public PolicyExecutor
      {
        bool Examine( const XrdCl::XRootDStatus &status )
        {
          // keep the status in case this is the final result
          // NOTE(review): `res` is written by every Examine call without
          // synchronization; confirm Examine is never invoked concurrently
          // for the same parallel operation (e.g. serialized by the job
          // manager), otherwise this is a data race.
          res = status;
          if( status.IsOK() ) return false;
          // we require all request to succeed, so any failure is final
          return true;
        }

        XRootDStatus Result()
        {
          return res;
        }

        XRootDStatus res; // status of the most recently examined operation
      };

      //------------------------------------------------------------------------
      //! `Any` policy implementation
      //!
      //! Just one of the operations needs to succeed in order for the parallel
      //! operation to be successful; the result is final either on the first
      //! success or once every operation has been resolved.
      //------------------------------------------------------------------------
      struct AnyPolicy : public PolicyExecutor
      {
        AnyPolicy( size_t size) : cnt( size )
        {
        }

        bool Examine( const XrdCl::XRootDStatus &status )
        {
          // keep the status in case this is the final result
          // NOTE(review): unsynchronized write, same caveat as in AllPolicy
          res = status;
          // decrement the counter of not-yet-resolved operations
          size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
          // we require just one operation to be successful
          if( status.IsOK() ) return true;
          // lets see if this is the last one?
          if( nb == 1 ) return true;
          // we still have a chance there will be one that is successful
          return false;
        }

        XRootDStatus Result()
        {
          return res;
        }

        private:
          std::atomic<size_t> cnt; // number of pending (unresolved) operations
          XRootDStatus        res; // status of the most recently examined operation
      };

      //------------------------------------------------------------------------
      //! `Some` policy implementation
      //!
      //! n (user defined) operations need to succeed in order for the parallel
      //! operation to be successful; the result is final as soon as either the
      //! success threshold is reached or too many operations have failed.
      //------------------------------------------------------------------------
      struct SomePolicy : PolicyExecutor
      {
        //! @param size      : total number of aggregated operations
        //! @param threshold : number of successes required
        //! NOTE(review): assumes threshold <= size, otherwise `size - threshold`
        //! below wraps around (size_t) — confirm callers validate the threshold.
        SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
                                                      threshold( threshold ), size( size )
        {
        }

        bool Examine( const XrdCl::XRootDStatus &status )
        {
          // keep the status in case this is the final result
          // NOTE(review): unsynchronized write, same caveat as in AllPolicy
          res = status;
          if( status.IsOK() )
          {
            size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
            if( s + 1 == threshold ) return true; // we reached the threshold
            // we are not yet there
            return false;
          }
          size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
          // did we drop below the threshold (more than size - threshold failures)
          if( f == size - threshold ) return true;
          // we still have a chance there will be enough of successful operations
          return false;
        }

        XRootDStatus Result()
        {
          return res;
        }

        private:
          std::atomic<size_t> failed;    // number of failed operations so far
          std::atomic<size_t> succeeded; // number of successful operations so far
          const size_t threshold;        // required number of successes
          const size_t size;             // total number of operations
          XRootDStatus res;              // status of the most recently examined operation
      };

      //------------------------------------------------------------------------
      //! `At Least` policy implementation
      //!
      //! At least n (user defined) operations need to succeed in order for the
      //! parallel operation to be successful (the user handler will be called
      //! only when all operations are resolved).
      //------------------------------------------------------------------------
      struct AtLeastPolicy : PolicyExecutor
      {
        //! @param size      : total number of aggregated operations
        //! @param threshold : minimum number of successes required
        //! NOTE(review): assumes threshold <= size, otherwise the
        //! `size - threshold` failure budget wraps around (size_t).
        AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
                                                         failed_cnt( 0 ),
                                                         failed_threshold( size - threshold )
        {
        }

        bool Examine( const XrdCl::XRootDStatus &status )
        {
          // update number of pending operations
          size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
          // although we might have the minimum to succeed we wait for the rest
          if( status.IsOK() ) return ( pending == 0 );
          size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
          // `res` is default-constructed to success; it is only overwritten by
          // the status of the failure that exceeded the allowed-failure budget
          if( nb == failed_threshold ) res = status; // we dropped below the threshold
          // if we still have to wait for pending operations return false,
          // otherwise all is done, return true
          return ( pending == 0 );
        }

        XRootDStatus Result()
        {
          return res;
        }

        private:
          std::atomic<size_t> pending_cnt;  // number of unresolved operations
          std::atomic<size_t> failed_cnt;   // number of failures so far
          const size_t failed_threshold;    // max failures still compatible with success
          XRootDStatus res;                 // success unless the failure budget was exceeded
      };

      //------------------------------------------------------------------------
      //! A wait barrier helper class
      //!
      //! Blocks waiters until lift() has been called (used so the user handler
      //! cannot run before RunImpl has started all pipelines).
      //------------------------------------------------------------------------
      struct barrier_t
      {
        barrier_t() : on( true ) { }

        //! Block until the barrier has been lifted (no-op afterwards).
        void wait()
        {
          std::unique_lock<std::mutex> lck( mtx );
          // NOTE(review): `cv.wait` is not re-checked in a loop, so a spurious
          // wake-up could release the waiter before lift() — consider the
          // predicate overload cv.wait( lck, [this]{ return !on; } ).
          if( on ) cv.wait( lck );
        }

        //! Lift the barrier and release all current/future waiters.
        void lift()
        {
          std::unique_lock<std::mutex> lck( mtx );
          on = false;
          cv.notify_all();
        }

        private:
          std::condition_variable cv;
          std::mutex              mtx;
          bool                    on; // true while the barrier is still closed
      };

      //------------------------------------------------------------------------
      //! Helper class for handling the PipelineHandler of the
      //! ParallelOperation (RAII).
      //!
      //! Guarantees that the handler will be executed exactly once.
      //------------------------------------------------------------------------
      struct Ctx
      {
        //----------------------------------------------------------------------
        //! Constructor.
        //!
        //! @param handler : the PipelineHandler of the Parallel operation
        //! @param policy  : the policy executor (ownership is taken over)
        //----------------------------------------------------------------------
        Ctx( PipelineHandler *handler, PolicyExecutor *policy ): handler( handler ),
                                                                 policy( policy )
        {
        }

        //----------------------------------------------------------------------
        //! Destructor.
        //!
        //! Invokes the handler with a default (success) status if no
        //! sub-operation triggered it — this is what guarantees the
        //! exactly-once execution of the user handler.
        //----------------------------------------------------------------------
        ~Ctx()
        {
          Handle( XRootDStatus() );
        }

        //----------------------------------------------------------------------
        //! Let the policy examine the status of one resolved sub-operation;
        //! forward the final result to the PipelineHandler if the policy says
        //! the outcome is decided.
        //!
        //! @param st : status
        //----------------------------------------------------------------------
        inline void Examine( const XRootDStatus &st )
        {
          if( policy->Examine( st ) )
            Handle( policy->Result() );
        }

        //----------------------------------------------------------------------
        //! Forwards the status to the PipelineHandler if the handler haven't
        //! been called yet.
        //!
        //! @param st : status
        //---------------------------------------------------------------------
        inline void Handle( const XRootDStatus &st )
        {
          // atomically claim the handler: only the first caller gets a
          // non-null pointer, all subsequent calls are no-ops
          PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
          if( hdlr )
          {
            // wait until RunImpl has started all pipelines (the handler may
            // destroy the parallel operation)
            barrier.wait();
            hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
          }
        }

        //----------------------------------------------------------------------
        //! PipelineHandler of the ParallelOperation
        //----------------------------------------------------------------------
        std::atomic<PipelineHandler*> handler;

        //----------------------------------------------------------------------
        //! Policy defining when the user handler should be called
        //----------------------------------------------------------------------
        std::unique_ptr<PolicyExecutor> policy;

        //----------------------------------------------------------------------
        //! wait barrier that assures handler is called only after RunImpl
        //! started all pipelines
        //----------------------------------------------------------------------
        barrier_t barrier;
      };

      //------------------------------------------------------------------------
      //! The thread-pool job for schedule Ctx::Examine
      //------------------------------------------------------------------------
      struct PipelineEnd : public Job
      {
        //----------------------------------------------------------------------
        // Constructor
        //----------------------------------------------------------------------
        PipelineEnd( std::shared_ptr<Ctx>      &ctx,
                     const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
        {
        }

        //----------------------------------------------------------------------
        // Run Ctx::Examine in the thread-pool (self-deleting job)
        //----------------------------------------------------------------------
        void Run( void* )
        {
          ctx->Examine( st );
          delete this;
        }

        private:
          std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
          XrdCl::XRootDStatus  st;  //< final status of the ParallelOperation
      };

      //------------------------------------------------------------------------
      //! Schedule Ctx::Examine to be executed in the client thread-pool
      //------------------------------------------------------------------------
      inline static
      void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
      {
        XrdCl::JobManager *mgr = XrdCl::DefaultEnv::GetPostMaster()->GetJobManager();
        PipelineEnd *end = new PipelineEnd( ctx, st );
        mgr->QueueJob( end, nullptr );
      }

      //------------------------------------------------------------------------
      //! Run operation
      //!
      //! @param handler         : the PipelineHandler to be notified with the
      //!                          final result
      //! @param pipelineTimeout : timeout inherited from the enclosing pipeline
      //! @return                : status of the operation
      //------------------------------------------------------------------------
      XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
      {
        // make sure we have a valid policy for the parallel operation
        // (`All` is the default)
        if( !policy ) policy.reset( new AllPolicy() );

        // the Ctx is shared by all sub-pipeline callbacks; the last one to
        // release it triggers the handler (see ~Ctx)
        std::shared_ptr<Ctx> ctx =
            std::make_shared<Ctx>( handler, policy.release() );

        // use the smaller of the two timeout values
        uint16_t timeout = pipelineTimeout < this->timeout ?
                           pipelineTimeout : this->timeout;

        for( size_t i = 0; i < pipelines.size(); ++i )
        {
          if( !pipelines[i] ) continue; // skip empty slots
          pipelines[i].Run( timeout,
              [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
        }

        // only now may the user handler fire (see Ctx::Handle)
        ctx->barrier.lift();
        return XRootDStatus();
      }

      std::vector<Pipeline>           pipelines; // the aggregated pipelines
      std::unique_ptr<PolicyExecutor> policy;    // completion policy (null until set / defaulted)
  };
Factory function for creating parallel operation from a vector 0504 //---------------------------------------------------------------------------- 0505 template<class Container> 0506 inline ParallelOperation<false> Parallel( Container &&container ) 0507 { 0508 return ParallelOperation<false>( container ); 0509 } 0510 0511 //---------------------------------------------------------------------------- 0512 //! Helper function for converting parameter pack into a vector 0513 //---------------------------------------------------------------------------- 0514 inline void PipesToVec( std::vector<Pipeline>& ) 0515 { 0516 // base case 0517 } 0518 0519 //---------------------------------------------------------------------------- 0520 // Declare PipesToVec (we need to do declare those functions ahead of 0521 // definitions, as they may call each other. 0522 //---------------------------------------------------------------------------- 0523 template<typename ... Others> 0524 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation, 0525 Others&... others ); 0526 0527 template<typename ... Others> 0528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation, 0529 Others&... others ); 0530 0531 template<typename ... Others> 0532 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline, 0533 Others&... others ); 0534 0535 //---------------------------------------------------------------------------- 0536 // Define PipesToVec 0537 //---------------------------------------------------------------------------- 0538 template<typename ... Others> 0539 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation, 0540 Others&... others ) 0541 { 0542 v.emplace_back( operation ); 0543 PipesToVec( v, others... ); 0544 } 0545 0546 template<typename ... Others> 0547 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation, 0548 Others&... 
others ) 0549 { 0550 v.emplace_back( operation ); 0551 PipesToVec( v, others... ); 0552 } 0553 0554 template<typename ... Others> 0555 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline, 0556 Others&... others ) 0557 { 0558 v.emplace_back( std::move( pipeline ) ); 0559 PipesToVec( v, others... ); 0560 } 0561 0562 //---------------------------------------------------------------------------- 0563 //! Factory function for creating parallel operation from 0564 //! a given number of operations 0565 //! (we use && reference since due to reference collapsing this will fit 0566 //! both r- and l-value references) 0567 //---------------------------------------------------------------------------- 0568 template<typename ... Operations> 0569 inline ParallelOperation<false> Parallel( Operations&& ... operations ) 0570 { 0571 constexpr size_t size = sizeof...( operations ); 0572 std::vector<Pipeline> v; 0573 v.reserve( size ); 0574 PipesToVec( v, operations... ); 0575 return Parallel( v ); 0576 } 0577 } 0578 0579 #endif // __XRD_CL_OPERATIONS_HH__
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |