Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/root/TMPWorkerExecutor.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 /* @(#)root/multiproc:$Id$ */
0002 // Author: Enrico Guiraud July 2015
0003 
0004 /*************************************************************************
0005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
0006  * All rights reserved.                                                  *
0007  *                                                                       *
0008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0010  *************************************************************************/
0011 
0012 #ifndef ROOT_TMPWorkerExecutor
0013 #define ROOT_TMPWorkerExecutor
0014 
0015 #include "MPCode.h"
0016 #include "MPSendRecv.h"
0017 #include "PoolUtils.h"
0018 #include "TMPWorker.h"
0019 
0020 #include <string>
0021 #include <vector>
0022 
0023 //////////////////////////////////////////////////////////////////////////
0024 ///
0025 /// \class TMPWorkerExecutor
0026 ///
0027 /// This class works together with TProcessExecutor to allow the execution of
0028 /// functions in server processes. Depending on the exact task that the
0029 /// worker is required to execute, a different version of the class
0030 /// can be called.
0031 ///
0032 /// ### TMPWorkerExecutor<F, T, R>
0033 /// The most general case, used by
0034 /// TProcessExecutor::MapReduce(F func, T& args, R redfunc).
0035 /// This worker is built with:
0036 /// * a function of signature F (the one to be executed)
0037 /// * a collection of arguments of type T on which to apply the function
0038 /// * a reduce function with signature R to be used to squash many
0039 /// returned values together.
0040 ///
0041 /// ### Partial specializations
0042 /// A few partial specializations are provided for less general cases:
0043 /// * TMPWorkerExecutor<F, T, void> handles the case of a function that takes
0044 /// one argument and does not perform reduce operations
0045 /// (TProcessExecutor::Map(F func, T& args)).
0046 /// * TMPWorkerExecutor<F, void, R> handles the case of a function that takes
0047 /// no arguments, to be executed a specified number of times, whose
0048 /// returned values are squashed together (reduced)
0049 /// (TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc))
0050 /// * TMPWorkerExecutor<F, void, void> handles the case of a function that takes
0051 /// no arguments and whose return values are not "reduced"
0052 /// (TProcessExecutor::Map(F func, unsigned nTimes))
0053 ///
0054 /// Since all the important data are passed to TMPWorkerExecutor at construction
0055 /// time, the kind of messages that client and workers have to exchange
0056 /// are usually very simple.
0057 ///
0058 //////////////////////////////////////////////////////////////////////////
0059 
0060 // Quick guide to TMPWorkerExecutor:
0061 // For each TProcessExecutor::Map and TProcessExecutor::MapReduce signature
0062 // there's a corresponding
0063 // specialization of TMPWorkerExecutor:
0064 // * Map(func, nTimes) --> TMPWorkerExecutor<F, void, void>
0065 // * Map(func, args)   --> TMPWorkerExecutor<F, T, void>
0066 // * MapReduce(func, nTimes, redfunc) --> TMPWorkerExecutor<F, void, R>
0067 // * MapReduce(func, args, redfunc)   --> TMPWorkerExecutor<F, T, R>
0068 // I thought about having four different classes (with different names)
0069 // instead of four specializations of the same class template, but it really
0070 // makes no difference in the end since the different classes would be class
0071 // templates anyway, and I would have to find a meaningful name for each one.
0072 // About code replication: looking carefully, it can be noticed that there's
0073 // very little code replication since the different versions of TMPWorkerExecutor
0074 // all behave slightly differently, in incompatible ways (e.g. they all need
0075 // different data members, different signatures for the ctors, and so on).
0076 
0077 template<class F, class T = void, class R = void>
0078 class TMPWorkerExecutor : public TMPWorker {
0079 public:
0080    // TProcessExecutor is in charge of checking the signatures for incompatibilities:
0081    // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
0082    // TODO document somewhere that fReducedResult must have a default ctor
0083    TMPWorkerExecutor(F func, const std::vector<T> &args, R redfunc) :
0084       TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
0085       fReducedResult(), fCanReduce(false)
0086    {}
0087    ~TMPWorkerExecutor() {}
0088 
0089    void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
0090    {
0091       unsigned code = msg.first;
0092       TSocket *s = GetSocket();
0093       std::string reply = "S" + std::to_string(GetNWorker());
0094       if (code == MPCode::kExecFuncWithArg) {
0095          unsigned n;
0096          msg.second->ReadUInt(n);
0097          // execute function on argument n
0098          const auto &res = fFunc(fArgs[n]);
0099          // tell client we're done
0100          MPSend(s, MPCode::kIdling);
0101          // reduce arguments if possible
0102          if (fCanReduce) {
0103             using FINAL = decltype(fReducedResult);
0104             using ORIGINAL = decltype(fRedFunc({res, fReducedResult}));
0105             fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
0106          } else {
0107             fCanReduce = true;
0108             fReducedResult = res;
0109          }
0110       } else if (code == MPCode::kSendResult) {
0111          MPSend(s, MPCode::kFuncResult, fReducedResult);
0112       } else {
0113          reply += ": unknown code received: " + std::to_string(code);
0114          MPSend(s, MPCode::kError, reply.c_str());
0115       }
0116    }
0117 
0118 private:
0119    F fFunc; ///< the function to be executed
0120    std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
0121    R fRedFunc; ///< the reduce function
0122    decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution
0123    bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
0124 };
0125 
0126 
0127 template<class F, class R>
0128 class TMPWorkerExecutor<F, void, R> : public TMPWorker {
0129 public:
0130    TMPWorkerExecutor(F func, R redfunc) :
0131       TMPWorker(), fFunc(func), fRedFunc(redfunc),
0132       fReducedResult(), fCanReduce(false)
0133    {}
0134    ~TMPWorkerExecutor() {}
0135 
0136    void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
0137    {
0138       unsigned code = msg.first;
0139       TSocket *s = GetSocket();
0140       std::string reply = "S" + std::to_string(GetNWorker());
0141       if (code == MPCode::kExecFunc) {
0142          // execute function
0143          const auto &res = fFunc();
0144          // tell client we're done
0145          MPSend(s, MPCode::kIdling);
0146          // reduce arguments if possible
0147          if (fCanReduce) {
0148             fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
0149          } else {
0150             fCanReduce = true;
0151             fReducedResult = res;
0152          }
0153       } else if (code == MPCode::kSendResult) {
0154          MPSend(s, MPCode::kFuncResult, fReducedResult);
0155       } else {
0156          reply += ": unknown code received: " + std::to_string(code);
0157          MPSend(s, MPCode::kError, reply.c_str());
0158       }
0159    }
0160 
0161 private:
0162    F fFunc; ///< the function to be executed
0163    R fRedFunc; ///< the reduce function
0164    decltype(fFunc()) fReducedResult; ///< the result of the execution
0165    bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
0166 };
0167 
0168 template<class F, class T>
0169 class TMPWorkerExecutor<F, T, void> : public TMPWorker {
0170 public:
0171    TMPWorkerExecutor(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {}
0172    ~TMPWorkerExecutor() {}
0173    void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
0174    {
0175       unsigned code = msg.first;
0176       TSocket *s = GetSocket();
0177       if (code == MPCode::kExecFuncWithArg) {
0178          unsigned n;
0179          msg.second->ReadUInt(n);
0180          MPSend(s, MPCode::kFuncResult, fFunc(fArgs[n]));
0181       } else {
0182          std::string reply = "S" + std::to_string(GetNWorker()) + ": unknown code received: " + std::to_string(code);
0183          MPSend(s, MPCode::kError, reply.c_str());
0184       }
0185    }
0186 
0187 private:
0188    F fFunc; ///< the function to be executed
0189    std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
0190 };
0191 
0192 
0193 // doxygen should ignore this specialization
0194 /// \cond
0195 // The most generic class template is meant to handle functions that
0196 // must be executed by passing one argument to them.
0197 // This partial specialization is used to handle the case
0198 // of functions which must be executed without passing any argument.
0199 template<class F>
0200 class TMPWorkerExecutor<F, void, void> : public TMPWorker {
0201 public:
0202    explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {}
0203    ~TMPWorkerExecutor() {}
0204    void HandleInput(MPCodeBufPair &msg)
0205    {
0206       unsigned code = msg.first;
0207       TSocket *s = GetSocket();
0208       std::string myId = "S" + std::to_string(GetPid());
0209       if (code == MPCode::kExecFunc) {
0210          MPSend(s, MPCode::kFuncResult, fFunc());
0211       } else {
0212          MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str());
0213       }
0214    }
0215 
0216 private:
0217    F fFunc;
0218 };
0219 /// \endcond
0220 
0221 #endif