Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:51

0001 // @(#)root/thread:$Id$
0002 // Author: Danilo Piparo, CERN  11/2/2016
0003 
0004 /*************************************************************************
0005  * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers.               *
0006  * All rights reserved.                                                  *
0007  *                                                                       *
0008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0010  *************************************************************************/
0011 
0012 #ifndef ROOT_TThreadedObject
0013 #define ROOT_TThreadedObject
0014 
0015 #include "ROOT/TSpinMutex.hxx"
0016 #include "TDirectory.h"
0017 #include "TError.h"
0018 #include "TList.h"
0019 #include "TROOT.h"
0020 
0021 
0022 #include <algorithm>
0023 #include <exception>
0024 #include <deque>
0025 #include <functional>
0026 #include <map>
0027 #include <memory>
0028 #include <mutex>
0029 #include <sstream>
0030 #include <string>
0031 #include <thread>
0032 #include <vector>
0033 
0034 class TH1;
0035 
0036 namespace ROOT {
0037 
0038    /**
0039     * \class ROOT::TNumSlots
0040     * \brief Defines the number of threads in some of ROOT's interfaces.
0041     */
0042    struct TNumSlots {
0043       unsigned int fVal; // number of slots
0044       friend bool operator==(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal == rhs.fVal; }
0045       friend bool operator!=(TNumSlots lhs, TNumSlots rhs) { return lhs.fVal != rhs.fVal; }
0046    };
0047 
0048    namespace Internal {
0049 
0050       namespace TThreadedObjectUtils {
0051 
0052 
0053          template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
0054          struct Detacher{
0055             static T* Detach(T* obj) {
0056                return obj;
0057             }
0058          };
0059 
0060          template<typename T>
0061          struct Detacher<T, true>{
0062             static T* Detach(T* obj) {
0063                obj->SetDirectory(nullptr);
0064                obj->ResetBit(kMustCleanup);
0065                return obj;
0066             }
0067          };
0068 
0069          /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
0070          template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
0071          struct Cloner {
0072             static T *Clone(const T *obj, TDirectory* d = nullptr) {
0073                T* clone;
0074                if (d){
0075                   TDirectory::TContext ctxt(d);
0076                   clone = new T(*obj);
0077                } else {
0078                   clone = new T(*obj);
0079                }
0080                return Detacher<T>::Detach(clone);
0081             }
0082          };
0083 
0084          template<class T>
0085          struct Cloner<T, false> {
0086             static T *Clone(const T *obj, TDirectory* d = nullptr) {
0087                T* clone;
0088                if (d){
0089                   TDirectory::TContext ctxt(d);
0090                   clone = (T*)obj->Clone();
0091                } else {
0092                   clone = (T*)obj->Clone();
0093                }
0094                return clone;
0095             }
0096          };
0097 
0098          template <class T, bool ISHISTO = std::is_base_of<TH1, T>::value>
0099          struct DirCreator {
0100             static TDirectory *Create()
0101             {
0102                static unsigned dirCounter = 0;
0103                const std::string dirName = "__TThreaded_dir_" + std::to_string(dirCounter++) + "_";
0104                return gROOT->mkdir(dirName.c_str());
0105             }
0106          };
0107 
0108          template <class T>
0109          struct DirCreator<T, true> {
0110             static TDirectory *Create() { return nullptr; }
0111          };
0112 
0113       } // End of namespace TThreadedObjectUtils
0114    } // End of namespace Internal
0115 
0116    namespace TThreadedObjectUtils {
0117 
0118       template<class T>
0119       using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
0120 
0121       /// Merge TObjects
0122       template<class T>
0123       void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
0124       {
0125          if (!target) return;
0126          TList objTList;
0127          // Cannot do better than this
0128          for (auto obj : objs) {
0129             if (obj && obj != target) objTList.Add(obj.get());
0130          }
0131          target->Merge(&objTList);
0132       }
0133    } // end of namespace TThreadedObjectUtils
0134 
0135    /**
0136     * \class ROOT::TThreadedObject
0137     * \brief A wrapper to make object instances thread private, lazily.
0138     * \tparam T Class of the object to be made thread private (e.g. TH1F)
0139     * \ingroup Parallelism
0140     *
0141     * A wrapper which makes objects thread private. The methods of the underlying
0142     * object can be invoked via the arrow operator. The object is created in
0143     * a specific thread lazily, i.e. upon invocation of one of its methods.
0144     * The correct object pointer from within a particular thread can be accessed
0145     * with the overloaded arrow operator or with the Get method.
0146     * In case an elaborate thread management is in place, e.g. in presence of
0147     * stream of operations or "processing slots", it is also possible to
0148     * manually select the correct object pointer explicitly.
0149     */
0150    template<class T>
0151    class TThreadedObject {
0152    public:
0153       /// The initial number of empty processing slots that a TThreadedObject is constructed with by default.
0154       /// Deprecated: TThreadedObject grows as more slots are required.
0155       static constexpr const TNumSlots fgMaxSlots{64};
0156 
0157       TThreadedObject(const TThreadedObject&) = delete;
0158 
0159       /// Construct the TThreadedObject with initSlots empty slots and the "model" of the thread private objects.
0160       /// \param initSlots Set the initial number of slots of the TThreadedObject.
0161       /// \tparam ARGS Arguments' class type of the constructor of T
0162       /// \param args variadic arguments
0163       ///
0164       /// This form of the constructor is useful to manually pre-set the content of a given number of slots
0165       /// when used in combination with TThreadedObject::SetAtSlot().
0166       template <class... ARGS>
0167       TThreadedObject(TNumSlots initSlots, ARGS &&... args) : fIsMerged(false)
0168       {
0169          const auto nSlots = initSlots.fVal;
0170          fObjPointers.resize(nSlots);
0171 
0172          // create at least one directory (we need it for fModel), plus others as needed by the size of fObjPointers
0173          fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0174          for (auto i = 1u; i < nSlots; ++i)
0175             fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0176 
0177          TDirectory::TContext ctxt(fDirectories[0]);
0178          fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
0179       }
0180 
0181       /// Construct the TThreadedObject and the "model" of the thread private objects.
0182       /// \tparam ARGS Arguments of the constructor of T
0183       template<class ...ARGS>
0184       TThreadedObject(ARGS&&... args) : TThreadedObject(fgMaxSlots, args...) { }
0185 
0186       /// Return the number of currently available slot.
0187       ///
0188       /// The method is safe to call concurrently to other TThreadedObject methods.
0189       /// Note that slots could be available but contain no data (i.e. a nullptr) if
0190       /// they have not been used yet.
0191       unsigned GetNSlots() const
0192       {
0193          std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0194          return fObjPointers.size();
0195       }
0196 
0197       /// Access a particular processing slot.
0198       ///
0199       /// This method is thread-safe as long as concurrent calls request different slots (i.e. pass a different
0200       /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of GetAtSlot
0201       /// with usage of the arrow operator can be dangerous.
0202       std::shared_ptr<T> GetAtSlot(unsigned i)
0203       {
0204          std::size_t nAvailableSlots;
0205          {
0206             // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
0207             std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0208             nAvailableSlots = fObjPointers.size();
0209          }
0210 
0211          if (i >= nAvailableSlots) {
0212             Warning("TThreadedObject::GetAtSlot", "This slot does not exist.");
0213             return nullptr;
0214          }
0215 
0216          auto &objPointer = fObjPointers[i];
0217          if (!objPointer)
0218             objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
0219          return objPointer;
0220       }
0221 
0222       /// Set the value of a particular slot.
0223       ///
0224       /// This method is thread-safe as long as concurrent calls access different slots (i.e. pass a different
0225       /// argument) and no thread accesses slot `i` via the arrow operator, so mixing usage of SetAtSlot
0226       /// with usage of the arrow operator can be dangerous.
0227       void SetAtSlot(unsigned i, std::shared_ptr<T> v)
0228       {
0229          std::size_t nAvailableSlots;
0230          {
0231             // fObjPointers can grow due to a concurrent operation on this TThreadedObject, need to lock
0232             std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0233             nAvailableSlots = fObjPointers.size();
0234          }
0235 
0236          if (i >= nAvailableSlots) {
0237             Warning("TThreadedObject::SetAtSlot", "This slot does not exist, doing nothing.");
0238             return;
0239          }
0240 
0241          fObjPointers[i] = v;
0242       }
0243 
0244       /// Access a particular slot which corresponds to a single thread.
0245       /// This is in general faster than the GetAtSlot method but it is
0246       /// responsibility of the caller to make sure that the slot exists
0247       /// and to check that the contained object is initialized (and not
0248       /// a nullptr).
0249       std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
0250       {
0251          return fObjPointers[i];
0252       }
0253 
0254       /// Access a particular slot which corresponds to a single thread.
0255       /// This overload is faster than the GetAtSlotUnchecked method but
0256       /// the caller is responsible to make sure that the slot exists, to
0257       /// check that the contained object is initialized and that the returned
0258       /// pointer will not outlive the TThreadedObject that returned it, which
0259       /// maintains ownership of the actual object.
0260       T* GetAtSlotRaw(unsigned i) const
0261       {
0262          return fObjPointers[i].get();
0263       }
0264 
0265       /// Access the pointer corresponding to the current slot. This method is
0266       /// not adequate for being called inside tight loops as it implies a
0267       /// lookup in a mapping between the threadIDs and the slot indices.
0268       /// A good practice consists in copying the pointer onto the stack and
0269       /// proceed with the loop as shown in this work item (psudo-code) which
0270       /// will be sent to different threads:
0271       /// ~~~{.cpp}
0272       /// auto workItem = [](){
0273       ///    auto objPtr = tthreadedObject.Get();
0274       ///    for (auto i : ROOT::TSeqI(1000)) {
0275       ///       // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
0276       ///       objPtr->FastMethod(i);
0277       ///    }
0278       /// }
0279       /// ~~~
0280       std::shared_ptr<T> Get()
0281       {
0282          return GetAtSlot(GetThisSlotNumber());
0283       }
0284 
0285       /// Access the wrapped object and allow to call its methods.
0286       T *operator->()
0287       {
0288          return Get().get();
0289       }
0290 
0291       /// Merge all the thread private objects. Can be called once: it does not
0292       /// create any new object but destroys the present bookkeping collapsing
0293       /// all objects into the one at slot 0.
0294       std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
0295       {
0296          // We do not return if we already merged.
0297          if (fIsMerged) {
0298             Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
0299             return fObjPointers[0];
0300          }
0301          // need to convert to std::vector because historically mergeFunction requires a vector
0302          auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
0303          mergeFunction(fObjPointers[0], vecOfObjPtrs);
0304          fIsMerged = true;
0305          return fObjPointers[0];
0306       }
0307 
0308       /// Merge all the thread private objects. Can be called many times. It
0309       /// does create a new instance of class T to represent the "Sum" object.
0310       /// This method is not thread safe: correct or acceptable behaviours
0311       /// depend on the nature of T and of the merging function.
0312       std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
0313       {
0314          if (fIsMerged) {
0315             Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
0316             return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
0317          }
0318          auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
0319          std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
0320          // need to convert to std::vector because historically mergeFunction requires a vector
0321          auto vecOfObjPtrs = std::vector<std::shared_ptr<T>>(fObjPointers.begin(), fObjPointers.end());
0322          mergeFunction(targetPtrShared, vecOfObjPtrs);
0323          return std::unique_ptr<T>(targetPtr);
0324       }
0325 
0326    private:
0327       std::unique_ptr<T> fModel;                         ///< Use to store a "model" of the object
0328       // std::deque's guarantee that references to the elements are not invalidated when appending new slots
0329       std::deque<std::shared_ptr<T>> fObjPointers;       ///< An object pointer per slot
0330       // If the object is a histogram, we also create dummy directories that the histogram associates with
0331       // so we do not pollute gDirectory
0332       std::deque<TDirectory*> fDirectories;              ///< A TDirectory per slot
0333       std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
0334       mutable ROOT::TSpinMutex fSpinMutex;               ///< Protects concurrent access to fThrIDSlotMap, fObjPointers
0335       bool fIsMerged : 1;                                ///< Remember if the objects have been merged already
0336 
0337       /// Get the slot number for this threadID, make a slot if needed
0338       unsigned GetThisSlotNumber()
0339       {
0340          const auto thisThreadID = std::this_thread::get_id();
0341          std::lock_guard<ROOT::TSpinMutex> lg(fSpinMutex);
0342          const auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
0343          if (thisSlotNumIt != fThrIDSlotMap.end())
0344             return thisSlotNumIt->second;
0345          const auto newIndex = fThrIDSlotMap.size();
0346          fThrIDSlotMap[thisThreadID] = newIndex;
0347          R__ASSERT(newIndex <= fObjPointers.size() && "This should never happen, we should create new slots as needed");
0348          if (newIndex == fObjPointers.size()) {
0349             fDirectories.emplace_back(Internal::TThreadedObjectUtils::DirCreator<T>::Create());
0350             fObjPointers.emplace_back(nullptr);
0351          }
0352          return newIndex;
0353       }
0354    };
0355 
0356    template<class T>
0357    constexpr const TNumSlots TThreadedObject<T>::fgMaxSlots;
0358 
0359 } // End ROOT namespace
0360 
0361 ////////////////////////////////////////////////////////////////////////////////
0362 /// Print a TThreadedObject at the prompt:
0363 
0364 namespace cling {
0365    template<class T>
0366    std::string printValue(ROOT::TThreadedObject<T> *val)
0367    {
0368       auto model = ((std::unique_ptr<T>*)(val))->get();
0369       std::ostringstream ret;
0370       ret << "A wrapper to make object instances thread private, lazily. "
0371           << "The model which is replicated is " << printValue(model);
0372       return ret.str();
0373    }
0374 }
0375 
0376 
0377 #endif