File indexing completed on 2025-01-18 10:17:38
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012
0013 class JApplication;
0014
0015
0016 class JEventProcessor : public jana::components::JComponent,
0017 public jana::components::JHasRunCallbacks,
0018 public jana::components::JHasInputs {
0019 public:
0020
0021 JEventProcessor() = default;
0022 virtual ~JEventProcessor() = default;
0023
0024
0025 explicit JEventProcessor(JApplication* app) {
0026 m_app = app;
0027 }
0028
0029
0030 std::string GetResourceName() const { return m_resource_name; }
0031
0032 uint64_t GetEventCount() const { return m_event_count; };
0033
0034
0035 virtual void DoInitialize() {
0036 std::lock_guard<std::mutex> lock(m_mutex);
0037 for (auto* parameter : m_parameters) {
0038 parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
0039 }
0040 for (auto* service : m_services) {
0041 service->Fetch(m_app);
0042 }
0043 CallWithJExceptionWrapper("JEventProcessor::Init", [&](){ Init(); });
0044 m_status = Status::Initialized;
0045 }
0046
0047
0048 virtual void DoMap(const JEvent& event) {
0049
0050 if (m_callback_style == CallbackStyle::LegacyMode) {
0051 throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0052 }
0053 for (auto* input : m_inputs) {
0054 input->PrefetchCollection(event);
0055 }
0056 if (m_callback_style == CallbackStyle::ExpertMode) {
0057 ProcessParallel(event);
0058 }
0059 else {
0060 ProcessParallel(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0061 }
0062 }
0063
0064
0065 virtual void DoTap(const JEvent& event) {
0066
0067 if (m_callback_style == CallbackStyle::LegacyMode) {
0068 throw JException("Called DoReduce() on a legacy-mode JEventProcessor");
0069 }
0070 std::lock_guard<std::mutex> lock(m_mutex);
0071
0072
0073
0074
0075 if (m_status == Status::Uninitialized) {
0076 throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0077 }
0078 else if (m_status == Status::Finalized) {
0079 throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0080 }
0081 for (auto* input : m_inputs) {
0082
0083
0084
0085 input->GetCollection(event);
0086 }
0087 auto run_number = event.GetRunNumber();
0088 if (m_last_run_number != run_number) {
0089 for (auto* resource : m_resources) {
0090 resource->ChangeRun(event.GetRunNumber(), m_app);
0091 }
0092 m_last_run_number = run_number;
0093 CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0094 }
0095 if (m_callback_style == CallbackStyle::DeclarativeMode) {
0096 CallWithJExceptionWrapper("JEventProcessor::Process", [&](){
0097 Process(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0098 });
0099 }
0100 else if (m_callback_style == CallbackStyle::ExpertMode) {
0101 CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0102 }
0103 m_event_count += 1;
0104 }
0105
0106
0107 virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0108
0109
0110
0111
0112
0113
0114
0115 if (m_callback_style != CallbackStyle::LegacyMode) {
0116 throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0117 }
0118
0119 auto run_number = event->GetRunNumber();
0120
0121 {
0122
0123 std::lock_guard<std::mutex> lock(m_mutex);
0124
0125 if (m_status == Status::Uninitialized) {
0126 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0127 }
0128 else if (m_status == Status::Finalized) {
0129 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0130 }
0131 if (m_last_run_number != run_number) {
0132 if (m_last_run_number != -1) {
0133 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0134 }
0135 for (auto* resource : m_resources) {
0136 resource->ChangeRun(event->GetRunNumber(), m_app);
0137 }
0138 m_last_run_number = run_number;
0139 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0140 }
0141 }
0142 CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0143 m_event_count += 1;
0144 }
0145
0146
0147 virtual void DoFinalize() {
0148 std::lock_guard<std::mutex> lock(m_mutex);
0149 if (m_status != Status::Finalized) {
0150 if (m_last_run_number != -1) {
0151 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0152 }
0153 CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0154 m_status = Status::Finalized;
0155 }
0156 }
0157
0158
0159 void Summarize(JComponentSummary& summary) const override {
0160 auto* result = new JComponentSummary::Component(
0161 "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0162
0163 for (const auto* input : m_inputs) {
0164 size_t subinput_count = input->names.size();
0165 for (size_t i=0; i<subinput_count; ++i) {
0166 result->AddInput(new JComponentSummary::Collection("", input->names[i], input->type_name, input->levels[i]));
0167 }
0168 }
0169 summary.Add(result);
0170 }
0171
0172
0173 virtual void Init() {}
0174
0175
0176
0177
0178 virtual void Process(const std::shared_ptr<const JEvent>& ) {
0179 }
0180
0181
0182
0183 virtual void ProcessParallel(const JEvent& ) {
0184 }
0185
0186 virtual void Process(const JEvent& ) {
0187 }
0188
0189
0190
0191 virtual void ProcessParallel(int64_t , uint64_t , uint64_t ) {
0192 }
0193
0194 virtual void Process(int64_t , uint64_t , uint64_t ) {
0195 }
0196
0197
0198 virtual void Finish() {}
0199
0200
0201 [[deprecated]]
0202 virtual std::string GetType() const {
0203 return m_type_name;
0204 }
0205
0206 protected:
0207
0208
0209
0210
0211
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223 private:
0224 std::string m_resource_name;
0225 std::atomic_ullong m_event_count {0};
0226
0227 };
0228
0229