Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /jana2/src/libraries/JANA/JEventProcessor.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012 
0013 class JApplication;
0014 
0015 
0016 class JEventProcessor : public jana::components::JComponent, 
0017                         public jana::components::JHasRunCallbacks, 
0018                         public jana::components::JHasInputs {
0019 public:
0020 
0021     JEventProcessor() {
0022         m_type_name = "JEventProcessor";
0023     }
0024     virtual ~JEventProcessor() = default;
0025 
0026     [[deprecated]]
0027     explicit JEventProcessor(JApplication* app) {
0028         m_app = app;
0029     }
0030 
0031 
0032     std::string GetResourceName() const { return m_resource_name; }
0033 
0034     uint64_t GetEventCount() const { return m_event_count; };
0035 
0036     bool IsOrderingEnabled() const { return m_enable_ordering; }
0037 
0038     void EnableOrdering(bool enable=true) { m_enable_ordering = enable; }
0039 
0040 
0041     virtual void DoMap(const JEvent& event) {
0042 
0043         if (m_callback_style == CallbackStyle::LegacyMode) {
0044             throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0045         }
0046         for (auto* input : m_inputs) {
0047             input->TriggerFactoryCreate(event);
0048         }
0049         for (auto* variadic_input : m_variadic_inputs) {
0050             variadic_input->TriggerFactoryCreate(event);
0051         }
0052         ProcessParallel(event);
0053     }
0054 
0055 
0056     virtual void DoTap(const JEvent& event) {
0057 
0058         if (m_callback_style == CallbackStyle::LegacyMode) {
0059             throw JException("Called DoTap() on a legacy-mode JEventProcessor");
0060         }
0061         std::lock_guard<std::mutex> lock(m_mutex);
0062         // In principle DoReduce() is being called by one thread at a time, but we hold a lock anyway 
0063         // so that this runs correctly even if that isn't happening. This lock shouldn't experience
0064         // any contention.
0065 
0066         if (!m_is_initialized) {
0067             throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0068         }
0069         else if (m_is_finalized) {
0070             throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0071         }
0072         for (auto* input : m_inputs) {
0073             // This collection should have already been computed during DoMap()
0074             // We do this before ChangeRun() just in case we will need to pull data out of
0075             // a begin-of-run event.
0076             input->Populate(event);
0077         }
0078         for (auto* variadic_input : m_variadic_inputs) {
0079             variadic_input->Populate(event);
0080         }
0081         auto run_number = event.GetRunNumber();
0082         if (m_last_run_number != run_number) {
0083             for (auto* resource : m_resources) {
0084                 resource->ChangeRun(event.GetRunNumber(), m_app);
0085             }
0086             m_last_run_number = run_number;
0087             CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0088         }
0089         CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){ ProcessSequential(event); });
0090         m_event_count += 1;
0091     }
0092 
0093 
0094     virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0095 
0096         // DoLegacyProcess holds a lock to make sure that {Begin,Change,End}Run() are always called before Process(). 
0097         // Note that in LegacyMode, Process() requires the user to manage a _separate_ lock for its critical section.
0098         // This arrangement means that {Begin,Change,End}Run() will definitely be called at least once before `Process`, but there
0099         // may be races when there are multiple run numbers present in the stream. This isn't a problem in practice for now, 
0100         // but future work should use ExpertMode for this reason (but also for the usability improvements!)
0101 
0102         if (m_callback_style != CallbackStyle::LegacyMode) {
0103             throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0104         }
0105 
0106         auto run_number = event->GetRunNumber();
0107 
0108         {
0109             // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
0110             std::lock_guard<std::mutex> lock(m_mutex);
0111 
0112             if (!m_is_initialized) {
0113                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0114             }
0115             else if (m_is_finalized) {
0116                 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0117             }
0118             if (m_last_run_number != run_number) {
0119                 if (m_last_run_number != -1) {
0120                     CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0121                 }
0122                 for (auto* resource : m_resources) {
0123                     resource->ChangeRun(event->GetRunNumber(), m_app);
0124                 }
0125                 m_last_run_number = run_number;
0126                 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0127             }
0128         }
0129         CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0130         m_event_count += 1;
0131     }
0132 
0133 
0134     virtual void DoFinalize() {
0135         std::lock_guard<std::mutex> lock(m_mutex);
0136         if (!m_is_finalized) {
0137             if (m_last_run_number != -1) {
0138                 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0139             }
0140             CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0141             m_is_finalized = true;
0142         }
0143     }
0144 
0145 
0146     void Summarize(JComponentSummary& summary) const override {
0147         auto* result = new JComponentSummary::Component(
0148             "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0149 
0150         for (const auto* input : m_inputs) {
0151             result->AddInput(new JComponentSummary::Collection("", input->GetDatabundleName(), input->GetTypeName(), input->GetLevel()));
0152         }
0153         for (const auto* input : m_variadic_inputs) {
0154             size_t subinput_count = input->GetRequestedDatabundleNames().size();
0155             for (size_t i=0; i<subinput_count; ++i) {
0156                 result->AddInput(new JComponentSummary::Collection("", input->GetRequestedDatabundleNames()[i], input->GetTypeName(), input->GetLevel()));
0157             }
0158         }
0159         summary.Add(result);
0160     }
0161 
0162 
0163     // LegacyMode-specific callbacks
0164 
0165     virtual void Process(const std::shared_ptr<const JEvent>& /*event*/) {
0166     }
0167 
0168     // ExpertMode-specific callbacks
0169 
0170     virtual void ProcessParallel(const JEvent& /*event*/) {
0171     }
0172 
0173     virtual void ProcessSequential(const JEvent& /*event*/) {
0174     }
0175 
0176 
0177     virtual void Finish() {}
0178 
0179 
0180 protected:
0181 
0182     // The following are meant to be called by the user from the constructor in order to
0183     // configure their JEventProcessor instance.
0184 
0185     /// Resource name lets the user tell the parallelization engine to synchronize different EventProcessors
0186     /// which write to the same shared resource; e.g. if you have two EventProcessors
0187     /// which both write to a ROOT tree, they should both set the resource name 'ROOT'. On the flip side,
0188     /// if you have two EventProcessors which write to different resources, e.g. ROOT and a CSV file, and
0189     /// you set different resource names, the parallelization engine will know that it is safe to pipeline
0190     /// these two processors. If you don't set a resource name at all, the parallelization engine will
0191     /// assume that you are manually synchronizing access via your own mutex, which will be safe if and only
0192     /// if you use your locks correctly, and also may result in a performance penalty.
0193 
0194     // void SetResourceName(std::string resource_name) { m_resource_name = std::move(resource_name); }
0195 
0196 
0197 private:
0198     bool m_enable_ordering = false;
0199     std::string m_resource_name;
0200     std::atomic_ullong m_event_count {0};
0201 
0202 };
0203 
0204