|
||||
Warning, file /include/root/TMPWorkerExecutor.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 /* @(#)root/multiproc:$Id$ */ 0002 // Author: Enrico Guiraud July 2015 0003 0004 /************************************************************************* 0005 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. * 0006 * All rights reserved. * 0007 * * 0008 * For the licensing terms see $ROOTSYS/LICENSE. * 0009 * For the list of contributors see $ROOTSYS/README/CREDITS. * 0010 *************************************************************************/ 0011 0012 #ifndef ROOT_TMPWorkerExecutor 0013 #define ROOT_TMPWorkerExecutor 0014 0015 #include "MPCode.h" 0016 #include "MPSendRecv.h" 0017 #include "PoolUtils.h" 0018 #include "TMPWorker.h" 0019 0020 #include <string> 0021 #include <vector> 0022 0023 ////////////////////////////////////////////////////////////////////////// 0024 /// 0025 /// \class TMPWorkerExecutor 0026 /// 0027 /// This class works together with TProcessExecutor to allow the execution of 0028 /// functions in server processes. Depending on the exact task that the 0029 /// worker is required to execute, a different version of the class 0030 /// can be called. 0031 /// 0032 /// ### TMPWorkerExecutor<F, T, R> 0033 /// The most general case, used by 0034 /// TProcessExecutor::MapReduce(F func, T& args, R redfunc). 0035 /// This worker is built with: 0036 /// * a function of signature F (the one to be executed) 0037 /// * a collection of arguments of type T on which to apply the function 0038 /// * a reduce function with signature R to be used to squash many 0039 /// returned values together. 0040 /// 0041 /// ### Partial specializations 0042 /// A few partial specializations are provided for less general cases: 0043 /// * TMPWorkerExecutor<F, T, void> handles the case of a function that takes 0044 /// one argument and does not perform reduce operations 0045 /// (TProcessExecutor::Map(F func, T& args)). 
0046 /// * TMPWorkerExecutor<F, void, R> handles the case of a function that takes 0047 /// no arguments, to be executed a specified number of times, whose 0048 /// returned values are squashed together (reduced) 0049 /// (TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc)) 0050 /// * TMPWorkerExecutor<F, void, void> handles the case of a function that takes 0051 /// no arguments and whose returned values are not "reduced" 0052 /// (TProcessExecutor::Map(F func, unsigned nTimes)) 0053 /// 0054 /// Since all the important data are passed to TMPWorkerExecutor at construction 0055 /// time, the messages that client and workers have to exchange 0056 /// are usually very simple. 0057 /// 0058 ////////////////////////////////////////////////////////////////////////// 0059 0060 // Quick guide to TMPWorkerExecutor: 0061 // For each TProcessExecutor::Map and TProcessExecutor::MapReduce signature 0062 // there's a corresponding 0063 // specialization of TMPWorkerExecutor: 0064 // * Map(func, nTimes) --> TMPWorkerExecutor<F, void, void> 0065 // * Map(func, args) --> TMPWorkerExecutor<F, T, void> 0066 // * MapReduce(func, nTimes, redfunc) --> TMPWorkerExecutor<F, void, R> 0067 // * MapReduce(func, args, redfunc) --> TMPWorkerExecutor<F, T, R> 0068 // I thought about having four different classes (with different names) 0069 // instead of four specializations of the same class template, but it really 0070 // makes no difference in the end since the different classes would be class 0071 // templates anyway, and I would have to find a meaningful name for each one. 0072 // About code replication: looking carefully, it can be noticed that there's 0073 // very little code replication since the different versions of TMPWorkerExecutor 0074 // all behave slightly differently, in incompatible ways (e.g. they all need 0075 // different data members, different signatures for the ctors, and so on). 
0076 0077 template<class F, class T = void, class R = void> 0078 class TMPWorkerExecutor : public TMPWorker { 0079 public: 0080 // TProcessExecutor is in charge of checking the signatures for incompatibilities: 0081 // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0]) 0082 // TODO document somewhere that fReducedResult must have a default ctor 0083 TMPWorkerExecutor(F func, const std::vector<T> &args, R redfunc) : 0084 TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc), 0085 fReducedResult(), fCanReduce(false) 0086 {} 0087 ~TMPWorkerExecutor() {} 0088 0089 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client 0090 { 0091 unsigned code = msg.first; 0092 TSocket *s = GetSocket(); 0093 std::string reply = "S" + std::to_string(GetNWorker()); 0094 if (code == MPCode::kExecFuncWithArg) { 0095 unsigned n; 0096 msg.second->ReadUInt(n); 0097 // execute function on argument n 0098 const auto &res = fFunc(fArgs[n]); 0099 // tell client we're done 0100 MPSend(s, MPCode::kIdling); 0101 // reduce arguments if possible 0102 if (fCanReduce) { 0103 using FINAL = decltype(fReducedResult); 0104 using ORIGINAL = decltype(fRedFunc({res, fReducedResult})); 0105 fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>? 
0106 } else { 0107 fCanReduce = true; 0108 fReducedResult = res; 0109 } 0110 } else if (code == MPCode::kSendResult) { 0111 MPSend(s, MPCode::kFuncResult, fReducedResult); 0112 } else { 0113 reply += ": unknown code received: " + std::to_string(code); 0114 MPSend(s, MPCode::kError, reply.c_str()); 0115 } 0116 } 0117 0118 private: 0119 F fFunc; ///< the function to be executed 0120 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc 0121 R fRedFunc; ///< the reduce function 0122 decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution 0123 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result 0124 }; 0125 0126 0127 template<class F, class R> 0128 class TMPWorkerExecutor<F, void, R> : public TMPWorker { 0129 public: 0130 TMPWorkerExecutor(F func, R redfunc) : 0131 TMPWorker(), fFunc(func), fRedFunc(redfunc), 0132 fReducedResult(), fCanReduce(false) 0133 {} 0134 ~TMPWorkerExecutor() {} 0135 0136 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client 0137 { 0138 unsigned code = msg.first; 0139 TSocket *s = GetSocket(); 0140 std::string reply = "S" + std::to_string(GetNWorker()); 0141 if (code == MPCode::kExecFunc) { 0142 // execute function 0143 const auto &res = fFunc(); 0144 // tell client we're done 0145 MPSend(s, MPCode::kIdling); 0146 // reduce arguments if possible 0147 if (fCanReduce) { 0148 fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>? 
0149 } else { 0150 fCanReduce = true; 0151 fReducedResult = res; 0152 } 0153 } else if (code == MPCode::kSendResult) { 0154 MPSend(s, MPCode::kFuncResult, fReducedResult); 0155 } else { 0156 reply += ": unknown code received: " + std::to_string(code); 0157 MPSend(s, MPCode::kError, reply.c_str()); 0158 } 0159 } 0160 0161 private: 0162 F fFunc; ///< the function to be executed 0163 R fRedFunc; ///< the reduce function 0164 decltype(fFunc()) fReducedResult; ///< the result of the execution 0165 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result 0166 }; 0167 0168 template<class F, class T> 0169 class TMPWorkerExecutor<F, T, void> : public TMPWorker { 0170 public: 0171 TMPWorkerExecutor(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {} 0172 ~TMPWorkerExecutor() {} 0173 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client 0174 { 0175 unsigned code = msg.first; 0176 TSocket *s = GetSocket(); 0177 if (code == MPCode::kExecFuncWithArg) { 0178 unsigned n; 0179 msg.second->ReadUInt(n); 0180 MPSend(s, MPCode::kFuncResult, fFunc(fArgs[n])); 0181 } else { 0182 std::string reply = "S" + std::to_string(GetNWorker()) + ": unknown code received: " + std::to_string(code); 0183 MPSend(s, MPCode::kError, reply.c_str()); 0184 } 0185 } 0186 0187 private: 0188 F fFunc; ///< the function to be executed 0189 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc 0190 }; 0191 0192 0193 // doxygen should ignore this specialization 0194 /// \cond 0195 // The most generic class template is meant to handle functions that 0196 // must be executed by passing one argument to them. 0197 // This partial specialization is used to handle the case 0198 // of functions which must be executed without passing any argument. 
0199 template<class F> 0200 class TMPWorkerExecutor<F, void, void> : public TMPWorker { 0201 public: 0202 explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {} 0203 ~TMPWorkerExecutor() {} 0204 void HandleInput(MPCodeBufPair &msg) 0205 { 0206 unsigned code = msg.first; 0207 TSocket *s = GetSocket(); 0208 std::string myId = "S" + std::to_string(GetPid()); 0209 if (code == MPCode::kExecFunc) { 0210 MPSend(s, MPCode::kFuncResult, fFunc()); 0211 } else { 0212 MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str()); 0213 } 0214 } 0215 0216 private: 0217 F fFunc; 0218 }; 0219 /// \endcond 0220 0221 #endif
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |