|
||||
File indexing completed on 2025-01-18 10:17:38
0001 0002 #pragma once 0003 #include <mutex> 0004 0005 #include <JANA/JEventProcessor.h> 0006 #include <JANA/Services/JGlobalRootLock.h> 0007 0008 /// This class can be used to safely implement sequential code while ensuring 0009 /// the factory algorithms are run in parallel using the JANA ROOT lock. 0010 /// 0011 /// n.b. If you need to run sequentially, but for reasons other than ROOT, 0012 /// then please look at the JEventProcessorSequential class. 0013 /// 0014 /// This is a specialized version of JEventProcessor that allows data types 0015 /// that will be needed to be specified in the subclass definition. The 0016 /// types will be prefetched via event->Get() calls and then a mutex locked 0017 /// prior to calling the ProcessSequential method. The types are specified 0018 /// by adding PrefetchT data members to the class as illustrated in the 0019 /// example below. 0020 /// 0021 /// Note that this will automatically lock the 0022 /// 0023 /// class DaveTestProcessor: public JEventProcessorSequentialRoot { 0024 /// 0025 /// public: 0026 /// 0027 /// // These declare object types that should be automatically fetched 0028 /// // from the event before ProcessSequential is called. 0029 /// PrefetchT<Hit> hits = {this}; 0030 /// PrefetchT<Cluster> clusters = {this, "MyTag"}; 0031 /// 0032 /// // This will be run sequentially 0033 /// void ProcessSequential(const std::shared_ptr<const JEvent>& event) override { 0034 /// 0035 /// // Do NOT call event->Get() here! 0036 /// 0037 /// // The hits and clusters objects will already be filled by calls 0038 /// // to event->Get(). Just use them here via their operator(). 0039 /// for( auto h : hits() ){ 0040 /// // h is const Hit* 0041 /// } 0042 /// } 0043 /// 0044 /// // Boilerplate stuff 0045 /// DaveTestProcessor() { SetTypeName(NAME_OF_THIS); } 0046 /// void Init() override {} 0047 /// void Finish() override {} 0048 /// }; 0049 /// 0050 class JEventProcessorSequentialRoot : public JEventProcessor { 0051 // This works in the following way: 0052 // 0053 // Two utility classes are defined here: Prefetch and PrefetchT with the 0054 // latter being a template. Subclasses can add any number of data members 0055 // of type PrefetchT. Using initializers, a list of the Prefetch members 0056 // is stored in the JEventProcessorSequentialRoot object itself. 0057 // 0058 // The Process method here loops over the Prefetch members calling 0059 // event->Get() for each and storing the resulting vector<const *T> 0060 // in the Prefetch member object. It then locks its own mutex before 0061 // calling ProcessSequential(). 0062 // 0063 // The Process method is defined here and marked as "final" so subclasses 0064 // may not overide it. Instead, a new virtual method, ProcessSequential, 0065 // is added that the user may override. 0066 // 0067 // The tricky business here is capturing a list of the Prefetch data 0068 // members so they can be looped over at run time. Thus, the strange syntax 0069 // of passing the "this" argument to the PrefetchT constructors. 0070 // Similarly, each Prefetch object passes its "this" pointer back to the 0071 // JEventProcessorSequentialRoot object that owns it. 0072 0073 public: 0074 0075 JEventProcessorSequentialRoot() {} 0076 virtual ~JEventProcessorSequentialRoot() = default; 0077 0078 // non-templated base class 0079 class Prefetch { 0080 public: 0081 virtual void Get(const std::shared_ptr<const JEvent> &event) = 0; 0082 virtual void Fill(const std::shared_ptr<const JEvent> &event) = 0; 0083 }; 0084 0085 // typed class 0086 template<typename T> 0087 class PrefetchT : public Prefetch { 0088 public: 0089 // This constructor gets called by the {this} initializer for the data member. 0090 PrefetchT(JEventProcessorSequentialRoot *jeps, const std::string &tag = "") : mTag( 0091 tag) { jeps->mPrefetch.push_back(this); } 0092 0093 std::vector<const T*> &operator()() { return mObjs; } 0094 void Get(const std::shared_ptr<const JEvent> &event) { event->Get<T>(mTag); } 0095 void Fill(const std::shared_ptr<const JEvent> &event) { mObjs = event->Get<T>(mTag); } 0096 0097 private: 0098 PrefetchT() = default; // user must pass "this" so member can be added to our mPrefetch 0099 0100 std::string mTag; 0101 std::vector<const T *> mObjs; 0102 }; 0103 0104 0105 // JEventProcessorSequentialRoot takes control of Init, Finish, and Process. The user overrides 0106 // ProcessSequential, InitWithGlobalRootLock, and FinishWithGlobalRootLock instead. 0107 0108 void Init() final { 0109 mGlobalWriteLock = GetApplication()->GetService<JGlobalRootLock>(); 0110 mGlobalWriteLock->acquire_write_lock(); 0111 try { 0112 InitWithGlobalRootLock(); 0113 mGlobalWriteLock->release_lock(); 0114 } 0115 catch (...) { 0116 mGlobalWriteLock->release_lock(); 0117 // Ideally we'd use the STL read-write lock (std::shared_mutex/lock) intead of the pthreads one. 0118 // However for now we are limited to C++14 (we would need C++17). 0119 throw; 0120 // It is important to re-throw exceptions so that the framework can handle them correctly 0121 } 0122 }; 0123 0124 void Finish() final { 0125 mGlobalWriteLock->acquire_write_lock(); 0126 try { 0127 FinishWithGlobalRootLock(); 0128 mGlobalWriteLock->release_lock(); 0129 } 0130 catch (...) { 0131 mGlobalWriteLock->release_lock(); 0132 // Ideally we'd use the STL read-write lock (std::shared_mutex/lock) intead of the pthreads one. 0133 // However for now we are limited to C++14 (we would need C++17). 0134 throw; 0135 // It is important to re-throw exceptions so that the framework can handle them correctly 0136 } 0137 }; 0138 0139 void Process(const std::shared_ptr<const JEvent> &event) override final { 0140 for (auto p: mPrefetch) p->Get(event); // make sure all factories have been activated 0141 0142 std::lock_guard<std::mutex> lock(mFillMutex); 0143 for (auto p: mPrefetch) p->Fill(event); // Copy object pointers into members 0144 ProcessSequential(event); 0145 } 0146 0147 // These are what the user implements (in lieu of Process, Init, and Finish) 0148 0149 virtual void ProcessSequential(const std::shared_ptr<const JEvent>&) {}; 0150 0151 virtual void InitWithGlobalRootLock() {}; 0152 0153 virtual void FinishWithGlobalRootLock() {}; 0154 0155 private: 0156 0157 std::vector<Prefetch *> mPrefetch; 0158 std::shared_ptr<JGlobalRootLock> mGlobalWriteLock; 0159 std::mutex mFillMutex; // We don't need the global root lock to fill a histogram as long as all reads and writes happen inside ProcessSequential! 0160 }; 0161 0162 0163
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |