Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:50

0001 // @(#)root/thread:$Id$
0002 // Author: Xavier Valls September 2020
0003 
0004 /*************************************************************************
0005  * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
0006  * All rights reserved.                                                  *
0007  *                                                                       *
0008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0010  *************************************************************************/
0011 #ifndef ROOT_TExecutor
0012 #define ROOT_TExecutor
0013 
0014 #include "ROOT/RConfig.hxx"
0015 #include "ROOT/TExecutorCRTP.hxx"
0016 #include "ROOT/TSeq.hxx"
0017 #include "ROOT/TSequentialExecutor.hxx"
0018 #ifdef R__USE_IMT
0019 #include "ROOT/TThreadExecutor.hxx"
0020 #endif
0021 #ifndef R__WIN32
0022 #include "ROOT/TProcessExecutor.hxx"
0023 #endif
0024 #include "TROOT.h"
0025 #include "ROOT/EExecutionPolicy.hxx"
0026 
0027 #include <initializer_list>
0028 #include <memory>
0029 #include <thread>
0030 #include <type_traits> //std::enable_if
0031 #include <stdexcept> //std::invalid_argument
0032 #include <utility> //std::move
0033 
0034 namespace ROOT{
0035 
0036 namespace Internal{
/// \brief Executor front end that forwards Map, MapReduce and GetPoolSize calls to one of three
/// backend executors (sequential, multi-thread, multi-process) according to the execution policy
/// stored in fExecPolicy.
///
/// One std::unique_ptr data member exists per backend. The constructor taking an execution policy is
/// only declared here, and the dispatch code dereferences the pointer matching fExecPolicy without a
/// null check.
/// NOTE(review): this presumably relies on the out-of-line constructor creating the backend that
/// corresponds to the requested policy -- confirm against the implementation file.
0037 class TExecutor: public TExecutorCRTP<TExecutor> {
   // NOTE(review): presumably lets the CRTP base reach the private MapImpl overloads below -- confirm.
0038    friend TExecutorCRTP;
0039 
0040 public:
0041 
0042    /// \brief Class constructor. Sets the default execution policy and initializes the corresponding executor.
0043    /// Delegates to the policy-taking constructor with kMultiThread when ROOT::IsImplicitMTEnabled() returns true, and with kSequential otherwise.
0044    /// \param nWorkers [optional] Number of parallel workers, only taken into account if the execution policy is kMultiThread
0045    explicit TExecutor(unsigned nWorkers = 0) :
0046       TExecutor(ROOT::IsImplicitMTEnabled() ? ROOT::EExecutionPolicy::kMultiThread : ROOT::EExecutionPolicy::kSequential, nWorkers) {}
0047 
0048    /// \brief Class constructor. Sets the execution policy and initializes the corresponding executor.
0049    /// \param execPolicy Execution policy (kMultiThread, kMultiProcess, kSequential) to process the data
0050    /// \param nWorkers [optional] Number of parallel workers, only taken into account if the execution policy is kMultiThread
0051    explicit TExecutor(ROOT::EExecutionPolicy execPolicy, unsigned nWorkers = 0);
0052 
   // Copying is disabled: copy construction and copy assignment are both deleted.
0053    TExecutor(const TExecutor &) = delete;
0054    TExecutor &operator=(const TExecutor &) = delete;
0055 
0056    /// Return the execution policy the executor is set to
0057    ROOT::EExecutionPolicy Policy() const { return fExecPolicy; }
0058 
0059    // Map
0060    // Re-exports the Map overloads of the CRTP base; the private MapImpl functions below implement them.
0061    using TExecutorCRTP<TExecutor>::Map;
0062 
0063    // MapReduce
0064    // the late return types also check at compile-time whether redfunc is compatible with func,
0065    // other than checking that func is compatible with the type of arguments.
0066    // a static_assert check in each TExecutor::MapReduce definition verifies that redfunc is invocable with a std::vector of the type returned by func
0067    using TExecutorCRTP<TExecutor>::MapReduce;
0068    template <class F, class R, class Cond = validMapReturnCond<F>>
0069    auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>;
0070    template <class F, class INTEGER, class R, class Cond = validMapReturnCond<F, INTEGER>>
0071    auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, INTEGER>;
0072    template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0073    auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0074    template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0075    auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0076    template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0077    auto MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0078 
0079    // Reduce
0080    // Re-exports the Reduce overloads of the CRTP base unchanged.
0081    using TExecutorCRTP<TExecutor>::Reduce;
0082 
   /// Return the pool size reported by the backend executor that matches the current execution policy.
0083    unsigned GetPoolSize() const;
0084 
0085 private:
0086    // Implementation of the Map functions declared in the parent class (TExecutorCRTP)
0087    // Each overload forwards to ResolveExecutorAndMap.
0088    template <class F, class Cond = validMapReturnCond<F>>
0089    auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
0090    template <class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
0091    auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
0092    template <class F, class T, class Cond = validMapReturnCond<F, T>>
0093    auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0094    template <class F, class T, class Cond = validMapReturnCond<F, T>>
0095    auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0096 
   // Execution policy that every dispatching member function switches on.
0097    ROOT::EExecutionPolicy fExecPolicy;
0098 
0099    // When they are not available, we use a placeholder type instead of TThreadExecutor or TProcessExecutor.
0100    // The corresponding data members will not be used.
0101    using Unused_t = ROOT::TSequentialExecutor;
0102 #ifdef R__USE_IMT
0103 # define R__EXECUTOR_THREAD ROOT::TThreadExecutor
0104 #else
0105 # define R__EXECUTOR_THREAD Unused_t
0106 #endif
0107 #ifndef R__WIN32
0108 # define R__EXECUTOR_PROCESS ROOT::TProcessExecutor
0109 #else
0110 # define R__EXECUTOR_PROCESS Unused_t
0111 #endif
0112 
   // One owning pointer per backend; the helper macros above only select the pointee types.
0113    std::unique_ptr<R__EXECUTOR_THREAD> fThreadExecutor;
0114    std::unique_ptr<R__EXECUTOR_PROCESS> fProcessExecutor;
0115    std::unique_ptr<ROOT::TSequentialExecutor> fSequentialExecutor;
0116 
   // The helper macros are local to this class definition and are removed again here.
0117 #undef R__EXECUTOR_THREAD
0118 #undef R__EXECUTOR_PROCESS
0119 
0120    /// \brief Helper class to get the correct return type from the Map function,
0121    /// necessary to infer the ResolveExecutorAndMap function type
0122    template<class F, class CONTAINER>
0123    struct MapRetType {
0124       using type = InvokeResult_t<F, typename CONTAINER::value_type>;
0125    };
0126 
   /// \brief Specialization for the `unsigned nTimes` case, where the result type is that of
   /// invoking F without arguments.
0127    template<class F>
0128    struct MapRetType<F, unsigned> {
0129       using type = InvokeResult_t<F>;
0130    };
0131 
0132 
0133    /// \brief Function called from Map to select and execute the correct Executor
0134    /// according to the set Execution Policy.
   /// \param func Function handed unchanged to the selected backend's Map.
   /// \param args Forwarding reference: either the repetition count or the collection of arguments.
   /// \return The vector returned by the backend's Map, or an empty vector when fExecPolicy matches
   /// none of the three handled policies.
0135    template<class F, class T>
0136    auto ResolveExecutorAndMap(F func, T&& args) -> std::vector<typename MapRetType<F, typename std::decay<T>::type>::type> {
0137       std::vector<typename MapRetType<F, typename std::decay<T>::type>::type> res;
0138       switch(fExecPolicy) {
0139          case ROOT::EExecutionPolicy::kSequential:
0140             res = fSequentialExecutor->Map(func, std::forward<T>(args));
0141             break;
0142          case ROOT::EExecutionPolicy::kMultiThread:
0143             res = fThreadExecutor->Map(func, std::forward<T>(args));
0144             break;
0145          case ROOT::EExecutionPolicy::kMultiProcess:
0146             res = fProcessExecutor->Map(func, std::forward<T>(args));
0147             break;
         // Any other policy value leaves `res` default-constructed, i.e. empty.
0148          default:
0149             break;
0150       }
0151       return res;
0152    }
0153 };
0154 
0155 
0156 //////////////////////////////////////////////////////////////////////////
0157 /// \brief Execute a function without arguments several times.
0158 /// Implementation of the Map method.
0159 ///
0160 /// \copydetails TExecutorCRTP::Map(F func,unsigned nTimes)
0161 template <class F, class Cond>
0162 auto TExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
0163 {
0164    return ResolveExecutorAndMap(func, nTimes);
0165 }
0166 
0167 //////////////////////////////////////////////////////////////////////////
0168 /// \brief Execute a function over a sequence of indexes.
0169 /// Implementation of the Map method.
0170 ///
0171 /// \copydetails TExecutorCRTP::Map(F func,ROOT::TSeq<INTEGER> args)
0172 template <class F, class INTEGER, class Cond>
0173 auto TExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
0174 {
0175    return ResolveExecutorAndMap(func, args);
0176 }
0177 
0178 //////////////////////////////////////////////////////////////////////////
0179 /// \brief Execute a function over the elements of a vector.
0180 /// Implementation of the Map method.
0181 ///
0182 /// \copydetails TExecutorCRTP::Map(F func,std::vector<T> &args)
0183 template <class F, class T, class Cond>
0184 auto TExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0185 {
0186    return ResolveExecutorAndMap(func, args);
0187 }
0188 
0189 //////////////////////////////////////////////////////////////////////////
0190 /// \brief Execute a function over the elements of an immutable vector.
0191 /// Implementation of the Map method.
0192 ///
0193 /// \copydetails TExecutorCRTP::Map(F func,const std::vector<T> &args)
0194 template <class F, class T, class Cond>
0195 auto TExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0196 {
0197    return ResolveExecutorAndMap(func, args);
0198 }
0199 
0200 //////////////////////////////////////////////////////////////////////////
0201 /// \brief Execute a function `nTimes` (Map) and accumulate the results into a single value (Reduce).
0202 /// Benefits from partial reduction into `nChunks` intermediate results if the execution policy is multithreaded.
0203 /// Otherwise, <b>it ignores the nChunks argument</b> and performs a normal MapReduce operation.
0204 ///
0205 /// \param func Function to be executed. Must take an element of the sequence passed as second argument as a parameter.
0206 /// \param nTimes Number of times function should be called.
0207 /// \param redfunc Reduction function to combine the results of the calls to `func` into partial results, and these
0208 /// into a final result. Must return the same type as `func` and should be callable with `const std::vector<T>` where T
0209 /// is the output of `func`.
0210 /// \param nChunks Number of chunks to split the input data for processing.
0211 /// \return A value result of "reducing" the vector returned by the Map operation into a single object.
0212 template <class F, class R, class Cond>
0213 auto TExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>
0214 {
0215    // check we can apply reduce to objs
0216    static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F>>>, "redfunc does not have the correct signature");
0217    if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0218       return fThreadExecutor->MapReduce(func, nTimes, redfunc, nChunks);
0219    }
0220    return Reduce(Map(func, nTimes), redfunc);
0221 }
0222 
0223 //////////////////////////////////////////////////////////////////////////
0224 /// \brief Execute a function over a sequence of indexes (Map) and accumulate the results into a single value (Reduce).
0225 /// Benefits from partial reduction into `nChunks` intermediate results if the execution policy is multithreaded.
0226 /// Otherwise, <b>it ignores the nChunks argument</b> and performs a normal MapReduce operation.
0227 ///
0228 /// \param func Function to be executed. Must take an element of the sequence passed assecond argument as a parameter.
0229 /// \param args Sequence of indexes to execute `func` on.
0230 /// \param redfunc Reduction function to combine the results of the calls to `func` into partial results, and these
0231 /// into a final result. Must return the same type as `func` and should be callable with `std::vector<T>` where T is the
0232 /// output of `func`.
0233 /// \param nChunks Number of chunks to split the input data for processing.
0234 /// \return A value result of  "reducing" the vector returned by the Map operation into a single object.
0235 template <class F, class INTEGER, class R, class Cond>
0236 auto TExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, INTEGER>
0237 {
0238    static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, INTEGER>>>,
0239                  "redfunc does not have the correct signature");
0240    if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0241       return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0242    }
0243    return Reduce(Map(func, args), redfunc);
0244 }
0245 
0246 //////////////////////////////////////////////////////////////////////////
0247 /// \brief Execute a function over the elements of an initializer_list (Map) and accumulate the results into a single
0248 /// value (Reduce). Benefits from partial reduction into `nChunks` intermediate results if the execution policy is
0249 /// multithreaded. Otherwise, <b>it ignores the nChunks argument</b> and performs a normal MapReduce operation.
0250 ///
0251 /// \param func Function to be executed. Must take an element of the sequence passed as second argument as a parameter.
0252 /// \param args initializer_list for a vector to apply `func` on.
0253 /// \param redfunc Reduction function to combine the results of the calls to `func` into partial results, and these
0254 /// into a final result. Must return the same type as `func` and should be callable with `const std::vector<T>` where T
0255 /// is the output of `func`.
0256 /// \param nChunks Number of chunks to split the input data for processing.
0257 /// \return A value result of "reducing" the vector returned by the Map operation into a single object.
0258 template <class F, class T, class R, class Cond>
0259 auto TExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0260 {
0261    static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0262                  "redfunc does not have the correct signature");
0263    if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0264       return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0265    }
0266    return Reduce(Map(func, args), redfunc);
0267 }
0268 
0269 //////////////////////////////////////////////////////////////////////////
0270 /// \brief Execute a function over the elements of a vector (Map) and accumulate the results into a single value
0271 /// (Reduce). Benefits from partial reduction into `nChunks` intermediate results if the execution policy is
0272 /// multithreaded. Otherwise, <b>it ignores the nChunks argument</b> and performs a normal MapReduce operation.
0273 ///
0274 /// \param func Function to be executed. Must take an element of the sequence passed assecond argument as a parameter.
0275 /// \param args Vector of elements passed as an argument to `func`.
0276 /// \param redfunc Reduction function to combine the results of the calls to `func` into partial results, and these
0277 /// into a final result. Must return the same type as `func` and should be callable with `const std::vector<T>` where T
0278 /// is the output of `func`.
0279 /// \param nChunks Number of chunks to split the input data for processing.
0280 /// \return A value result of "reducing" the vector returned by the Map operation into a single object.
0281 template <class F, class T, class R, class Cond>
0282 auto TExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0283 {
0284    static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0285                  "redfunc does not have the correct signature");
0286    if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0287       return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0288    }
0289    return Reduce(Map(func, args), redfunc);
0290 }
0291 
0292 //////////////////////////////////////////////////////////////////////////
0293 /// \brief Execute a function over the elements of an immutable vector (Map) and accumulate the results into a single
0294 /// value (Reduce). Benefits from partial reduction into `nChunks` intermediate results if the execution policy is
0295 /// multithreaded. Otherwise, <b>it ignores the nChunks argument</b> and performs a normal MapReduce operation.
0296 ///
0297 /// \param func Function to be executed. Must take an element of the sequence passed assecond argument as a parameter.
0298 /// \param args Immutable vector, whose elements are passed as an argument to `func`.
0299 /// \param redfunc Reduction function to combine the results of the calls to `func` into partial results, and these
0300 /// into a final result. Must return the same type as `func` and should be callable with `const std::vector<T>` where T
0301 /// is the output of `func`.
0302 /// \param nChunks Number of chunks to split the input data for processing.
0303 /// \return A value result of "reducing" the vector returned by the Map operation into a single object.
0304 template <class F, class T, class R, class Cond>
0305 auto TExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0306 {
0307    static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0308                  "redfunc does not have the correct signature");
0309    if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0310       return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0311    }
0312    return Reduce(Map(func, args), redfunc);
0313 }
0314 
0315 //////////////////////////////////////////////////////////////////////////
0316 /// \brief Return the number of pooled workers.
0317 ///
0318 /// \return The number of workers in the pool in the executor used as a backend.
0319 
0320 inline unsigned TExecutor::GetPoolSize() const
0321 {
0322    unsigned poolSize{0u};
0323    switch(fExecPolicy){
0324       case ROOT::EExecutionPolicy::kSequential:
0325          poolSize = fSequentialExecutor->GetPoolSize();
0326          break;
0327       case ROOT::EExecutionPolicy::kMultiThread:
0328          poolSize = fThreadExecutor->GetPoolSize();
0329          break;
0330       case ROOT::EExecutionPolicy::kMultiProcess:
0331          poolSize = fProcessExecutor->GetPoolSize();
0332          break;
0333       default:
0334          break;
0335    }
0336    return poolSize;
0337 }
0338 
0339 } // namespace Internal
0340 } // namespace ROOT
0341 
0342 #endif