Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-06 08:57:22

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012 
0013 class JApplication;
0014 
0015 
0016 class JEventProcessor : public jana::components::JComponent, 
0017                         public jana::components::JHasRunCallbacks, 
0018                         public jana::components::JHasInputs {
0019 public:
0020 
0021     JEventProcessor() = default;
0022     virtual ~JEventProcessor() = default;
0023 
0024     [[deprecated]]
0025     explicit JEventProcessor(JApplication* app) {
0026         m_app = app;
0027     }
0028 
0029 
0030     std::string GetResourceName() const { return m_resource_name; }
0031 
0032     uint64_t GetEventCount() const { return m_event_count; };
0033 
0034 
0035     virtual void DoInitialize() {
0036         std::lock_guard<std::mutex> lock(m_mutex);
0037         for (auto* parameter : m_parameters) {
0038             parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0039         }
0040         for (auto* service : m_services) {
0041             service->Fetch(m_app);
0042         }
0043         CallWithJExceptionWrapper("JEventProcessor::Init", [&](){ Init(); });
0044         m_status = Status::Initialized;
0045     }
0046 
0047 
0048     virtual void DoMap(const JEvent& event) {
0049 
0050         if (m_callback_style == CallbackStyle::LegacyMode) {
0051             throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0052         }
0053         for (auto* input : m_inputs) {
0054             input->PrefetchCollection(event);
0055         }
0056         for (auto* variadic_input : m_variadic_inputs) {
0057             variadic_input->PrefetchCollection(event);
0058         }
0059         if (m_callback_style == CallbackStyle::ExpertMode) {
0060             ProcessParallel(event);
0061         }
0062         else {
0063             ProcessParallel(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0064         }
0065     }
0066 
0067 
0068     virtual void DoTap(const JEvent& event) {
0069 
0070         if (m_callback_style == CallbackStyle::LegacyMode) {
0071             throw JException("Called DoReduce() on a legacy-mode JEventProcessor");
0072         }
0073         std::lock_guard<std::mutex> lock(m_mutex);
0074         // In principle DoReduce() is being called by one thread at a time, but we hold a lock anyway 
0075         // so that this runs correctly even if that isn't happening. This lock shouldn't experience
0076         // any contention.
0077 
0078         if (m_status == Status::Uninitialized) {
0079             throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0080         }
0081         else if (m_status == Status::Finalized) {
0082             throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0083         }
0084         for (auto* input : m_inputs) {
0085             // This collection should have already been computed during DoMap()
0086             // We do this before ChangeRun() just in case we will need to pull data out of
0087             // a begin-of-run event.
0088             input->GetCollection(event);
0089         }
0090         for (auto* variadic_input : m_variadic_inputs) {
0091             variadic_input->GetCollection(event);
0092         }
0093         auto run_number = event.GetRunNumber();
0094         if (m_last_run_number != run_number) {
0095             for (auto* resource : m_resources) {
0096                 resource->ChangeRun(event.GetRunNumber(), m_app);
0097             }
0098             m_last_run_number = run_number;
0099             CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0100         }
0101         if (m_callback_style == CallbackStyle::DeclarativeMode) {
0102             CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){ 
0103                 ProcessSequential(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0104             });
0105         }
0106         else if (m_callback_style == CallbackStyle::ExpertMode) {
0107             CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){ ProcessSequential(event); });
0108         }
0109         m_event_count += 1;
0110     }
0111 
0112 
0113     virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0114 
0115         // DoLegacyProcess holds a lock to make sure that {Begin,Change,End}Run() are always called before Process(). 
0116         // Note that in LegacyMode, Process() requires the user to manage a _separate_ lock for its critical section.
0117         // This arrangement means that {Begin,Change,End}Run() will definitely be called at least once before `Process`, but there
0118         // may be races when there are multiple run numbers present in the stream. This isn't a problem in practice for now, 
0119         // but future work should use ExpertMode or DeclarativeMode for this reason (but also for the usability improvements!)
0120 
0121         if (m_callback_style != CallbackStyle::LegacyMode) {
0122             throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0123         }
0124 
0125         auto run_number = event->GetRunNumber();
0126 
0127         {
0128             // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
0129             std::lock_guard<std::mutex> lock(m_mutex);
0130 
0131             if (m_status == Status::Uninitialized) {
0132                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0133             }
0134             else if (m_status == Status::Finalized) {
0135                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0136             }
0137             if (m_last_run_number != run_number) {
0138                 if (m_last_run_number != -1) {
0139                     CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0140                 }
0141                 for (auto* resource : m_resources) {
0142                     resource->ChangeRun(event->GetRunNumber(), m_app);
0143                 }
0144                 m_last_run_number = run_number;
0145                 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0146             }
0147         }
0148         CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0149         m_event_count += 1;
0150     }
0151 
0152 
0153     virtual void DoFinalize() {
0154         std::lock_guard<std::mutex> lock(m_mutex);
0155         if (m_status != Status::Finalized) {
0156             if (m_last_run_number != -1) {
0157                 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0158             }
0159             CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0160             m_status = Status::Finalized;
0161         }
0162     }
0163 
0164 
0165     void Summarize(JComponentSummary& summary) const override {
0166         auto* result = new JComponentSummary::Component(
0167             "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0168 
0169         for (const auto* input : m_inputs) {
0170             result->AddInput(new JComponentSummary::Collection("", input->GetDatabundleName(), input->GetTypeName(), input->GetLevel()));
0171         }
0172         for (const auto* input : m_variadic_inputs) {
0173             size_t subinput_count = input->GetRequestedDatabundleNames().size();
0174             for (size_t i=0; i<subinput_count; ++i) {
0175                 result->AddInput(new JComponentSummary::Collection("", input->GetRequestedDatabundleNames()[i], input->GetTypeName(), input->GetLevel()));
0176             }
0177         }
0178         summary.Add(result);
0179     }
0180 
0181 
0182     virtual void Init() {}
0183 
0184 
0185     // LegacyMode-specific callbacks
0186 
0187     virtual void Process(const std::shared_ptr<const JEvent>& /*event*/) {
0188     }
0189 
0190     // ExpertMode-specific callbacks
0191 
0192     virtual void ProcessParallel(const JEvent& /*event*/) {
0193     }
0194 
0195     virtual void ProcessSequential(const JEvent& /*event*/) {
0196     }
0197 
0198     // DeclarativeMode-specific callbacks
0199 
0200     virtual void ProcessParallel(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
0201     }
0202 
0203     virtual void ProcessSequential(int64_t /*run_nr*/, uint64_t /*event_nr*/, uint64_t /*event_idx*/) {
0204     }
0205 
0206 
0207     virtual void Finish() {}
0208 
0209 
0210 protected:
0211 
0212     // The following are meant to be called by the user from the constructor in order to
0213     // configure their JEventProcessor instance.
0214 
0215     /// Resource name lets the user tell the parallelization engine to synchronize different EventProcessors
0216     /// which write to the same shared resource; e.g. if you have two EventProcessors
0217     /// which both write to a ROOT tree, they should both set the resource name 'ROOT'. On the flip side,
0218     /// if you have two EventProcessors which write to different resources, e.g. ROOT and a CSV file, and
0219     /// you set different resource names, the parallelization engine will know that it is safe to pipeline
0220     /// these two processors. If you don't set a resource name at all, the parallelization engine will
0221     /// assume that you are manually synchronizing access via your own mutex, which will be safe if and only
0222     /// if you use your locks correctly, and also may result in a performance penalty.
0223 
0224     // void SetResourceName(std::string resource_name) { m_resource_name = std::move(resource_name); }
0225 
0226 
0227 private:
0228     std::string m_resource_name;
0229     std::atomic_ullong m_event_count {0};
0230 
0231 };
0232 
0233