Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:51

0001 /* @(#)root/multiproc:$Id$ */
0002 // Author: Enrico Guiraud July 2015
0003 // Modified: G Ganis Jan 2017
0004 
0005 /*************************************************************************
0006  * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
0007  * All rights reserved.                                                  *
0008  *                                                                       *
0009  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0010  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0011  *************************************************************************/
0012 
0013 #ifndef ROOT_TProcessExecutor
0014 #define ROOT_TProcessExecutor
0015 
0016 #include "MPCode.h"
0017 #include "MPSendRecv.h"
0018 #include "PoolUtils.h"
0019 #include "ROOT/TExecutorCRTP.hxx"
0020 #include "ROOT/TSeq.hxx"
0021 #include "ROOT/TypeTraits.hxx"
0022 #include "TError.h"
0023 #include "TFileCollection.h"
0024 #include "TFileInfo.h"
0025 #include "THashList.h"
0026 #include "TMPClient.h"
0027 #include "TMPWorkerExecutor.h"
0028 
0029 #include <algorithm> //std::generate
0030 #include <numeric> //std::iota
0031 #include <string>
0032 #include <functional> //std::reference_wrapper
0033 #include <vector>
0034 
0035 namespace ROOT {
0036 
0037 class TProcessExecutor : public TExecutorCRTP<TProcessExecutor>, private TMPClient {
0038    friend TExecutorCRTP;
0039 
0040 public:
0041    explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
0042    ~TProcessExecutor() = default;
0043    //it doesn't make sense for a TProcessExecutor to be copied
0044    TProcessExecutor(const TProcessExecutor &) = delete;
0045    TProcessExecutor &operator=(const TProcessExecutor &) = delete;
0046 
0047    // Map
0048    //
0049    using TExecutorCRTP<TProcessExecutor>::Map;
0050 
0051    // MapReduce
0052    // Redefinition of the MapReduce classes of the base class, to adapt them to
0053    // TProcessExecutor's logic
0054    using TExecutorCRTP<TProcessExecutor>::MapReduce;
0055    template<class F, class R, class Cond = validMapReturnCond<F>>
0056    auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>;
0057    template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0058    auto MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0059    template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0060    auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0061 
0062    // Reduce
0063    //
0064    using TExecutorCRTP<TProcessExecutor>::Reduce;
0065 
0066    void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
0067 
0068    //////////////////////////////////////////////////////////////////////////
0069    /// \brief Return the number of pooled parallel workers.
0070    ///
0071    /// \return The number of workers in the pool.
0072    unsigned GetPoolSize() const { return TMPClient::GetNWorkers(); }
0073 
0074 private:
0075    // Implementation of the Map functions declared in the parent class (TExecutorCRTP)
0076    //
0077    template<class F, class Cond = validMapReturnCond<F>>
0078    auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
0079    template<class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
0080    auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
0081    template<class F, class T, class Cond = validMapReturnCond<F, T>>
0082    auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0083    template<class F, class T, class Cond = validMapReturnCond<F, T>>
0084    auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0085 
0086    template<class T> void Collect(std::vector<T> &reslist);
0087    template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
0088 
0089    void Reset();
0090    void ReplyToFuncResult(TSocket *s);
0091    void ReplyToIdle(TSocket *s);
0092 
0093    unsigned fNProcessed; ///< number of arguments already passed to the workers
0094    unsigned fNToProcess; ///< total number of arguments to pass to the workers
0095 
0096    /// A collection of the types of tasks that TProcessExecutor can execute.
0097    /// It is used to interpret in the right way and properly reply to the
0098    /// messages received (see, for example, TProcessExecutor::HandleInput)
0099    enum class ETask : unsigned char {
0100       kNoTask,       ///< no task is being executed
0101       kMap,          ///< a Map method with no arguments is being executed
0102       kMapWithArg,   ///< a Map method with arguments is being executed
0103       kMapRed,       ///< a MapReduce method with no arguments is being executed
0104       kMapRedWithArg ///< a MapReduce method with arguments is being executed
0105    };
0106 
0107    ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
0108 };
0109 
0110 
0111 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
0112 
0113 //////////////////////////////////////////////////////////////////////////
0114 /// \brief Execute a function without arguments several times in parallel.
0115 /// Implementation of the Map method.
0116 ///
0117 /// \copydetails TExecutorCRTP::Map(F func,unsigned nTimes)
0118 template<class F, class Cond>
0119 auto TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
0120 {
0121    using retType = decltype(func());
0122    //prepare environment
0123    Reset();
0124    fTaskType = ETask::kMap;
0125 
0126    //fork max(nTimes, fNWorkers) times
0127    unsigned oldNWorkers = GetPoolSize();
0128    if (nTimes < oldNWorkers)
0129       SetNWorkers(nTimes);
0130    TMPWorkerExecutor<F> worker(func);
0131    bool ok = Fork(worker);
0132    SetNWorkers(oldNWorkers);
0133    if (!ok)
0134    {
0135       Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0136       return std::vector<retType>();
0137    }
0138 
0139    //give out tasks
0140    fNToProcess = nTimes;
0141    std::vector<retType> reslist;
0142    reslist.reserve(fNToProcess);
0143    fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
0144 
0145    //collect results, give out other tasks if needed
0146    Collect(reslist);
0147 
0148    //clean-up and return
0149    ReapWorkers();
0150    fTaskType = ETask::kNoTask;
0151    return reslist;
0152 }
0153 
0154 //////////////////////////////////////////////////////////////////////////
0155 /// \brief Execute a function over the elements of a vector in parallel
0156 /// Implementation of the Map method.
0157 ///
0158 /// \copydetails TExecutorCRTP::Map(F func,std::vector<T> &args)
0159 template<class F, class T, class Cond>
0160 auto TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0161 {
0162    //check whether func is callable
0163    using retType = decltype(func(args.front()));
0164    //prepare environment
0165    Reset();
0166    fTaskType = ETask::kMapWithArg;
0167 
0168    //fork max(args.size(), fNWorkers) times
0169    //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
0170    unsigned oldNWorkers = GetPoolSize();
0171    if (args.size() < oldNWorkers)
0172       SetNWorkers(args.size());
0173    TMPWorkerExecutor<F, T> worker(func, args);
0174    bool ok = Fork(worker);
0175    SetNWorkers(oldNWorkers);
0176    if (!ok)
0177    {
0178       Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0179       return std::vector<retType>();
0180    }
0181 
0182    //give out tasks
0183    fNToProcess = args.size();
0184    std::vector<retType> reslist;
0185    reslist.reserve(fNToProcess);
0186    std::vector<unsigned> range(fNToProcess);
0187    std::iota(range.begin(), range.end(), 0);
0188    fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0189 
0190    //collect results, give out other tasks if needed
0191    Collect(reslist);
0192 
0193    //clean-up and return
0194    ReapWorkers();
0195    fTaskType = ETask::kNoTask;
0196    return reslist;
0197 }
0198 
0199 //////////////////////////////////////////////////////////////////////////
0200 /// \brief Execute a function over the elements of an immutable vector in parallel
0201 /// Implementation of the Map method.
0202 ///
0203 /// \copydetails TExecutorCRTP::Map(F func,const std::vector<T> &args)
0204 template<class F, class T, class Cond>
0205 auto TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0206 {
0207    //check whether func is callable
0208    using retType = decltype(func(args.front()));
0209    //prepare environment
0210    Reset();
0211    fTaskType = ETask::kMapWithArg;
0212 
0213    //fork max(args.size(), fNWorkers) times
0214    //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
0215    unsigned oldNWorkers = GetPoolSize();
0216    if (args.size() < oldNWorkers)
0217       SetNWorkers(args.size());
0218    TMPWorkerExecutor<F, T> worker(func, args);
0219    bool ok = Fork(worker);
0220    SetNWorkers(oldNWorkers);
0221    if (!ok)
0222    {
0223       Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0224       return std::vector<retType>();
0225    }
0226 
0227    //give out tasks
0228    fNToProcess = args.size();
0229    std::vector<retType> reslist;
0230    reslist.reserve(fNToProcess);
0231    std::vector<unsigned> range(fNToProcess);
0232    std::iota(range.begin(), range.end(), 0);
0233    fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0234 
0235    //collect results, give out other tasks if needed
0236    Collect(reslist);
0237 
0238    //clean-up and return
0239    ReapWorkers();
0240    fTaskType = ETask::kNoTask;
0241    return reslist;
0242 }
0243 
0244 //////////////////////////////////////////////////////////////////////////
0245 /// \brief Execute a function over a sequence of indexes in parallel.
0246 /// Implementation of the Map method.
0247 ///
0248 /// \copydetails TExecutorCRTP::Map(F func,ROOT::TSeq<INTEGER> args)
0249 template<class F, class INTEGER, class Cond>
0250 auto TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
0251 {
0252    std::vector<INTEGER> vargs(args.size());
0253    std::copy(args.begin(), args.end(), vargs.begin());
0254    const auto &reslist = Map(func, vargs);
0255    return reslist;
0256 }
0257 
0258 //////////////////////////////////////////////////////////////////////////
0259 /// \brief Execute a function `nTimes` in parallel (Map) and accumulate the results into a single value (Reduce).
0260 /// \copydetails  ROOT::Internal::TExecutor::MapReduce(F func,unsigned nTimes,R redfunc)
0261 template<class F, class R, class Cond>
0262 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>
0263 {
0264    using retType = decltype(func());
0265    //prepare environment
0266    Reset();
0267    fTaskType= ETask::kMapRed;
0268 
0269    //fork max(nTimes, fNWorkers) times
0270    unsigned oldNWorkers = GetPoolSize();
0271    if (nTimes < oldNWorkers)
0272       SetNWorkers(nTimes);
0273    TMPWorkerExecutor<F, void, R> worker(func, redfunc);
0274    bool ok = Fork(worker);
0275    SetNWorkers(oldNWorkers);
0276    if (!ok) {
0277       std::cerr << "[E][C] Could not fork. Aborting operation\n";
0278       return retType();
0279    }
0280 
0281    //give workers their first task
0282    fNToProcess = nTimes;
0283    std::vector<retType> reslist;
0284    reslist.reserve(fNToProcess);
0285    fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
0286 
0287    //collect results/give workers their next task
0288    Collect(reslist);
0289 
0290    //clean-up and return
0291    ReapWorkers();
0292    fTaskType= ETask::kNoTask;
0293    return redfunc(reslist);
0294 }
0295 
0296 //////////////////////////////////////////////////////////////////////////
0297 /// \brief Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce).
0298 /// Benefits from partial reduction into `nChunks` intermediate results.
0299 ///
0300 /// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,std::vector<T> &args,R redfunc,unsigned nChunks).
0301 template<class F, class T, class R, class Cond>
0302 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0303 {
0304 
0305    using retType = decltype(func(args.front()));
0306    //prepare environment
0307    Reset();
0308    fTaskType= ETask::kMapRedWithArg;
0309 
0310    //fork max(args.size(), fNWorkers) times
0311    unsigned oldNWorkers = GetPoolSize();
0312    if (args.size() < oldNWorkers)
0313       SetNWorkers(args.size());
0314    TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
0315    bool ok = Fork(worker);
0316    SetNWorkers(oldNWorkers);
0317    if (!ok) {
0318       std::cerr << "[E][C] Could not fork. Aborting operation\n";
0319       return decltype(func(args.front()))();
0320    }
0321 
0322    //give workers their first task
0323    fNToProcess = args.size();
0324    std::vector<retType> reslist;
0325    reslist.reserve(fNToProcess);
0326    std::vector<unsigned> range(fNToProcess);
0327    std::iota(range.begin(), range.end(), 0);
0328    fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0329 
0330    //collect results/give workers their next task
0331    Collect(reslist);
0332 
0333    ReapWorkers();
0334    fTaskType= ETask::kNoTask;
0335    return Reduce(reslist, redfunc);
0336 }
0337 
0338 //////////////////////////////////////////////////////////////////////////
0339 /// \brief Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce).
0340 /// Benefits from partial reduction into `nChunks` intermediate results.
0341 ///
0342 /// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,const std::vector<T> &args,R redfunc,unsigned nChunks).
0343 template<class F, class T, class R, class Cond>
0344 auto TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0345 {
0346 
0347    using retType = decltype(func(args.front()));
0348    //prepare environment
0349    Reset();
0350    fTaskType= ETask::kMapRedWithArg;
0351 
0352    //fork max(args.size(), fNWorkers) times
0353    unsigned oldNWorkers = GetPoolSize();
0354    if (args.size() < oldNWorkers)
0355       SetNWorkers(args.size());
0356    TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
0357    bool ok = Fork(worker);
0358    SetNWorkers(oldNWorkers);
0359    if (!ok) {
0360       std::cerr << "[E][C] Could not fork. Aborting operation\n";
0361       return decltype(func(args.front()))();
0362    }
0363 
0364    //give workers their first task
0365    fNToProcess = args.size();
0366    std::vector<retType> reslist;
0367    reslist.reserve(fNToProcess);
0368    std::vector<unsigned> range(fNToProcess);
0369    std::iota(range.begin(), range.end(), 0);
0370    fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0371 
0372    //collect results/give workers their next task
0373    Collect(reslist);
0374 
0375    ReapWorkers();
0376    fTaskType= ETask::kNoTask;
0377    return Reduce(reslist, redfunc);
0378 }
0379 
0380 //////////////////////////////////////////////////////////////////////////
0381 /// Handle message and reply to the worker
0382 template<class T>
0383 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
0384 {
0385    unsigned code = msg.first;
0386    if (code == MPCode::kFuncResult) {
0387       reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0388       ReplyToFuncResult(s);
0389    } else if (code == MPCode::kIdling) {
0390       ReplyToIdle(s);
0391    } else if(code == MPCode::kProcResult) {
0392       if(msg.second != nullptr)
0393          reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0394       MPSend(s, MPCode::kShutdownOrder);
0395    } else if(code == MPCode::kProcError) {
0396       const char *str = ReadBuffer<const char*>(msg.second.get());
0397       Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
0398                                          "Continuing execution ignoring these entries.", str);
0399       ReplyToIdle(s);
0400       delete [] str;
0401    } else {
0402       // UNKNOWN CODE
0403       Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
0404    }
0405 }
0406 
0407 //////////////////////////////////////////////////////////////////////////
0408 /// Listen for messages sent by the workers and call the appropriate handler function.
0409 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
0410 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
0411 template<class T>
0412 void TProcessExecutor::Collect(std::vector<T> &reslist)
0413 {
0414    TMonitor &mon = GetMonitor();
0415    mon.ActivateAll();
0416    while (mon.GetActive() > 0) {
0417       TSocket *s = mon.Select();
0418       MPCodeBufPair msg = MPRecv(s);
0419       if (msg.first == MPCode::kRecvError) {
0420          Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
0421          Remove(s);
0422       } else if (msg.first < 1000)
0423          HandlePoolCode(msg, s, reslist);
0424       else
0425          HandleMPCode(msg, s);
0426    }
0427 }
0428 
0429 } // ROOT namespace
0430 
0431 #endif