Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:38

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012 
0013 class JApplication;
0014 
0015 
0016 class JEventProcessor : public jana::components::JComponent, 
0017                         public jana::components::JHasRunCallbacks, 
0018                         public jana::components::JHasInputs {
0019 public:
0020 
0021     JEventProcessor() = default;
0022     virtual ~JEventProcessor() = default;
0023 
0024     // TODO: Deprecate
0025     explicit JEventProcessor(JApplication* app) {
0026         m_app = app;
0027     }
0028 
0029 
0030     std::string GetResourceName() const { return m_resource_name; }
0031 
0032     uint64_t GetEventCount() const { return m_event_count; };
0033 
0034 
0035     virtual void DoInitialize() {
0036         std::lock_guard<std::mutex> lock(m_mutex);
0037         for (auto* parameter : m_parameters) {
0038             parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
0039         }
0040         for (auto* service : m_services) {
0041             service->Fetch(m_app);
0042         }
0043         CallWithJExceptionWrapper("JEventProcessor::Init", [&](){ Init(); });
0044         m_status = Status::Initialized;
0045     }
0046 
0047 
0048     virtual void DoMap(const JEvent& event) {
0049 
0050         if (m_callback_style == CallbackStyle::LegacyMode) {
0051             throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0052         }
0053         for (auto* input : m_inputs) {
0054             input->PrefetchCollection(event);
0055         }
0056         if (m_callback_style == CallbackStyle::ExpertMode) {
0057             ProcessParallel(event);
0058         }
0059         else {
0060             ProcessParallel(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0061         }
0062     }
0063 
0064 
0065     virtual void DoTap(const JEvent& event) {
0066 
0067         if (m_callback_style == CallbackStyle::LegacyMode) {
0068             throw JException("Called DoReduce() on a legacy-mode JEventProcessor");
0069         }
0070         std::lock_guard<std::mutex> lock(m_mutex);
0071         // In principle DoReduce() is being called by one thread at a time, but we hold a lock anyway 
0072         // so that this runs correctly even if that isn't happening. This lock shouldn't experience
0073         // any contention.
0074 
0075         if (m_status == Status::Uninitialized) {
0076             throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0077         }
0078         else if (m_status == Status::Finalized) {
0079             throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0080         }
0081         for (auto* input : m_inputs) {
0082             // This collection should have already been computed during DoMap()
0083             // We do this before ChangeRun() just in case we will need to pull data out of
0084             // a begin-of-run event.
0085             input->GetCollection(event);
0086         }
0087         auto run_number = event.GetRunNumber();
0088         if (m_last_run_number != run_number) {
0089             for (auto* resource : m_resources) {
0090                 resource->ChangeRun(event.GetRunNumber(), m_app);
0091             }
0092             m_last_run_number = run_number;
0093             CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0094         }
0095         if (m_callback_style == CallbackStyle::DeclarativeMode) {
0096             CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ 
0097                 Process(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0098             });
0099         }
0100         else if (m_callback_style == CallbackStyle::ExpertMode) {
0101             CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0102         }
0103         m_event_count += 1;
0104     }
0105 
0106 
0107     virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0108 
0109         // DoLegacyProcess holds a lock to make sure that {Begin,Change,End}Run() are always called before Process(). 
0110         // Note that in LegacyMode, Process() requires the user to manage a _separate_ lock for its critical section.
0111         // This arrangement means that {Begin,Change,End}Run() will definitely be called at least once before `Process`, but there
0112         // may be races when there are multiple run numbers present in the stream. This isn't a problem in practice for now, 
0113         // but future work should use ExpertMode or DeclarativeMode for this reason (but also for the usability improvements!)
0114 
0115         if (m_callback_style != CallbackStyle::LegacyMode) {
0116             throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0117         }
0118 
0119         auto run_number = event->GetRunNumber();
0120 
0121         {
0122             // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
0123             std::lock_guard<std::mutex> lock(m_mutex);
0124 
0125             if (m_status == Status::Uninitialized) {
0126                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0127             }
0128             else if (m_status == Status::Finalized) {
0129                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0130             }
0131             if (m_last_run_number != run_number) {
0132                 if (m_last_run_number != -1) {
0133                     CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0134                 }
0135                 for (auto* resource : m_resources) {
0136                     resource->ChangeRun(event->GetRunNumber(), m_app);
0137                 }
0138                 m_last_run_number = run_number;
0139                 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0140             }
0141         }
0142         CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0143         m_event_count += 1;
0144     }
0145 
0146 
0147     virtual void DoFinalize() {
0148         std::lock_guard<std::mutex> lock(m_mutex);
0149         if (m_status != Status::Finalized) {
0150             if (m_last_run_number != -1) {
0151                 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0152             }
0153             CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0154             m_status = Status::Finalized;
0155         }
0156     }
0157 
0158 
0159     void Summarize(JComponentSummary& summary) const override {
0160         auto* result = new JComponentSummary::Component(
0161             "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0162 
0163         for (const auto* input : m_inputs) {
0164             size_t subinput_count = input->names.size();
0165             for (size_t i=0; i<subinput_count; ++i) {
0166                 result->AddInput(new JComponentSummary::Collection("", input->names[i], input->type_name, input->levels[i]));
0167             }
0168         }
0169         summary.Add(result);
0170     }
0171 
0172 
0173     virtual void Init() {}
0174 
0175 
0176     // LegacyMode-specific callbacks
0177 
0178     virtual void Process(const std::shared_ptr<const JEvent>& /*event*/) {
0179     }
0180 
0181     // ExpertMode-specific callbacks
0182 
0183     virtual void ProcessParallel(const JEvent& /*event*/) {
0184     }
0185 
0186     virtual void Process(const JEvent& /*event*/) {
0187     }
0188 
0189     // DeclarativeMode-specific callbacks
0190 
0191     virtual void ProcessParallel(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
0192     }
0193 
0194     virtual void Process(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
0195     }
0196 
0197 
0198     virtual void Finish() {}
0199 
0200 
0201     [[deprecated]]
0202     virtual std::string GetType() const {
0203         return m_type_name;
0204     }
0205 
0206 protected:
0207 
0208     // The following are meant to be called by the user from the constructor in order to
0209     // configure their JEventProcessor instance.
0210 
0211     /// Resource name lets the user tell the parallelization engine to synchronize different EventProcessors
0212     /// which write to the same shared resource; e.g. if you have two EventProcessors
0213     /// which both write to a ROOT tree, they should both set the resource name 'ROOT'. On the flip side,
0214     /// if you have two EventProcessors which write to different resources, e.g. ROOT and a CSV file, and
0215     /// you set different resource names, the parallelization engine will know that it is safe to pipeline
0216     /// these two processors. If you don't set a resource name at all, the parallelization engine will
0217     /// assume that you are manually synchronizing access via your own mutex, which will be safe if and only
0218     /// if you use your locks correctly, and also may result in a performance penalty.
0219 
0220     // void SetResourceName(std::string resource_name) { m_resource_name = std::move(resource_name); }
0221 
0222 
0223 private:
0224     std::string m_resource_name;
0225     std::atomic_ullong m_event_count {0};
0226 
0227 };
0228 
0229