File indexing completed on 2025-01-18 10:10:51
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #ifndef ROOT_TThreadedObject
0013 #define ROOT_TThreadedObject
0014
#include "ROOT/TSpinMutex.hxx"
#include "TDirectory.h"
#include "TError.h"
#include "TList.h"
#include "TROOT.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <deque>
#include <exception>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
0033
0034 class TH1;
0035
0036 namespace ROOT {
0037
0038
0039
0040
0041
/// Strongly-typed wrapper around a number of slots.
///
/// Wrapping the count in its own type lets constructors that take a slot count be
/// told apart from constructors that forward arbitrary (possibly integral)
/// arguments. The type stays an aggregate, so `TNumSlots{8}` keeps working.
struct TNumSlots {
   unsigned int fVal; ///< The wrapped number of slots.

   /// Two slot counts are equal when their wrapped values are equal.
   /// `constexpr`/`noexcept` so the comparison is usable in constant expressions.
   friend constexpr bool operator==(TNumSlots lhs, TNumSlots rhs) noexcept { return lhs.fVal == rhs.fVal; }

   /// Negation of operator==.
   friend constexpr bool operator!=(TNumSlots lhs, TNumSlots rhs) noexcept { return !(lhs == rhs); }
};
0047
0048 namespace Internal {
0049
0050 namespace TThreadedObjectUtils {
0051
0052
0053 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
0054 struct Detacher{
0055 static T* Detach(T* obj) {
0056 return obj;
0057 }
0058 };
0059
0060 template<typename T>
0061 struct Detacher<T, true>{
0062 static T* Detach(T* obj) {
0063 obj->SetDirectory(nullptr);
0064 obj->ResetBit(kMustCleanup);
0065 return obj;
0066 }
0067 };
0068
0069
0070 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
0071 struct Cloner {
0072 static T *Clone(const T *obj, TDirectory* d = nullptr) {
0073 T* clone;
0074 if (d){
0075 TDirectory::TContext ctxt(d);
0076 clone = new T(*obj);
0077 } else {
0078 clone = new T(*obj);
0079 }
0080 return Detacher<T>::Detach(clone);
0081 }
0082 };
0083
0084 template<class T>
0085 struct Cloner<T, false> {
0086 static T *Clone(const T *obj, TDirectory* d = nullptr) {
0087 T* clone;
0088 if (d){
0089 TDirectory::TContext ctxt(d);
0090 clone = (T*)obj->Clone();
0091 } else {
0092 clone = (T*)obj->Clone();
0093 }
0094 return clone;
0095 }
0096 };
0097
0098 template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
0099 struct DirCreator {
0100 static TDirectory *Create()
0101 {
0102 static unsigned dirCounter = 0;
0103 const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
0104 return gROOT->mkdir(dirName.c_str());
0105 }
0106 };
0107
0108 template <class T>
0109 struct DirCreator<T, true> {
0110 static TDirectory *Create() { return nullptr; }
0111 };
0112
0113 }
0114 }
0115
0116 namespace TThreadedObjectUtils {
0117
0118 template<class T>
0119 using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
0120
0121
0122 template<class T>
0123 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
0124 {
0125 if (!target) return;
0126 TList objTList;
0127
0128 for (auto obj : objs) {
0129 if (obj && obj != target) objTList.Add(obj.get());
0130 }
0131 target->Merge(&objTList);
0132 }
0133 }
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150 template<class T>
0151 class TThreadedObject {
0152 public:
0153
0154
0155 static constexpr const TNumSlots fgMaxSlots{64};
0156
0157 TThreadedObject(const TThreadedObject&) = delete;
0158
0159
0160
0161
0162
0163
0164
0165
0166 template <class... ARGS>
0167 TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
0168 {
0169 const auto nSlots = initSlots.fVal;
0170 fObjPointers.resize(nSlots);
0171
0172
0173 fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0174 for (auto i = 1u; i < nSlots; ++i)
0175 fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0176
0177 TDirectory::TContext ctxt(fDirectories[0]);
0178 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
0179 }
0180
0181
0182
0183 template<class ...ARGS>
0184 TThreadedObject(ARGS&&... args) : TThreadedObject(fgMaxSlots, args...) { }
0185
0186
0187
0188
0189
0190
0191 unsigned GetNSlots() const
0192 {
0193 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0194 return fObjPointers.size();
0195 }
0196
0197
0198
0199
0200
0201
0202 std::shared_ptr<T> GetAtSlot(unsigned i)
0203 {
0204 std::size_t nAvailableSlots;
0205 {
0206
0207 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0208 nAvailableSlots = fObjPointers.size();
0209 }
0210
0211 if (i >= nAvailableSlots) {
0212 Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
0213 return nullptr;
0214 }
0215
0216 auto &objPointer = fObjPointers[i];
0217 if (!objPointer)
0218 objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
0219 return objPointer;
0220 }
0221
0222
0223
0224
0225
0226
0227 void SetAtSlot(unsigned i, std::shared_ptr<T> v)
0228 {
0229 std::size_t nAvailableSlots;
0230 {
0231
0232 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0233 nAvailableSlots = fObjPointers.size();
0234 }
0235
0236 if (i >= nAvailableSlots) {
0237 Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
0238 return;
0239 }
0240
0241 fObjPointers[i] = v;
0242 }
0243
0244
0245
0246
0247
0248
0249 std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
0250 {
0251 return fObjPointers[i];
0252 }
0253
0254
0255
0256
0257
0258
0259
0260 T* GetAtSlotRaw(unsigned i) const
0261 {
0262 return fObjPointers[i].get();
0263 }
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274
0275
0276
0277
0278
0279
0280 std::shared_ptr<T> Get()
0281 {
0282 return GetAtSlot(GetThisSlotNumber());
0283 }
0284
0285
0286 T *operator->()
0287 {
0288 return Get().get();
0289 }
0290
0291
0292
0293
0294 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
0295 {
0296
0297 if (fIsMerged) {
0298 Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
0299 return fObjPointers[0];
0300 }
0301
0302 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
0303 mergeFunction(fObjPointers[0], vecOfObjPtrs);
0304 fIsMerged = true;
0305 return fObjPointers[0];
0306 }
0307
0308
0309
0310
0311
0312 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
0313 {
0314 if (fIsMerged) {
0315 Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
0316 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
0317 }
0318 auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
0319 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
0320
0321 auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
0322 mergeFunction(targetPtrShared, vecOfObjPtrs);
0323 return std::unique_ptr<T>(targetPtr);
0324 }
0325
0326 private:
0327 std::unique_ptr<T> fModel;
0328
0329 std::deque<std::shared_ptr<T>> fObjPointers;
0330
0331
0332 std::deque<TDirectory*> fDirectories;
0333 std::map<std::thread::id, unsigned> fThrIDSlotMap;
0334 mutable ROOT::TSpinMutex fSpinMutex;
0335 bool fIsMerged : 1;
0336
0337
0338 unsigned GetThisSlotNumber()
0339 {
0340 const auto thisThreadID = std::this_thread::get_id();
0341 std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0342 const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
0343 if (thisSlotNumIt != fThrIDSlotMap.end())
0344 return thisSlotNumIt->second;
0345 const auto newIndex = fThrIDSlotMap.size();
0346 fThrIDSlotMap[thisThreadID] = newIndex;
0347 R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
0348 if (newIndex == fObjPointers.size()) {
0349 fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0350 fObjPointers.emplace_back(nullptr);
0351 }
0352 return newIndex;
0353 }
0354 };
0355
0356 template<class T>
0357 constexpr const TNumSlots TThreadedObject<T>::fgMaxSlots;
0358
0359 }
0360
0361
0362
0363
0364 namespace cling {
0365 template<class T>
0366 std::string printValue(ROOT::TThreadedObject<T> *val)
0367 {
0368 auto model = ((std::unique_ptr<T>*)(val))->get();
0369 std::ostringstream ret;
0370 ret << "A wrapper to make object instances thread private, lazily. "
0371 << "The model which is replicated is " << printValue(model);
0372 return ret.str();
0373 }
0374 }
0375
0376
0377 #endif