File indexing completed on 2025-07-06 08:57:22
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012
0013 class JApplication;
0014
0015
0016 class JEventProcessor : public jana::components::JComponent,
0017 public jana::components::JHasRunCallbacks,
0018 public jana::components::JHasInputs {
0019 public:
0020
0021 JEventProcessor() = default;
0022 virtual ~JEventProcessor() = default;
0023
0024 [[deprecated]]
0025 explicit JEventProcessor(JApplication* app) {
0026 m_app = app;
0027 }
0028
0029
0030 std::string GetResourceName() const { return m_resource_name; }
0031
0032 uint64_t GetEventCount() const { return m_event_count; };
0033
0034
0035 virtual void DoInitialize() {
0036 std::lock_guard<std::mutex> lock(m_mutex);
0037 for (auto* parameter : m_parameters) {
0038 parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0039 }
0040 for (auto* service : m_services) {
0041 service->Fetch(m_app);
0042 }
0043 CallWithJExceptionWrapper("JEventProcessor::Init", [&](){ Init(); });
0044 m_status = Status::Initialized;
0045 }
0046
0047
0048 virtual void DoMap(const JEvent& event) {
0049
0050 if (m_callback_style == CallbackStyle::LegacyMode) {
0051 throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0052 }
0053 for (auto* input : m_inputs) {
0054 input->PrefetchCollection(event);
0055 }
0056 for (auto* variadic_input : m_variadic_inputs) {
0057 variadic_input->PrefetchCollection(event);
0058 }
0059 if (m_callback_style == CallbackStyle::ExpertMode) {
0060 ProcessParallel(event);
0061 }
0062 else {
0063 ProcessParallel(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0064 }
0065 }
0066
0067
0068 virtual void DoTap(const JEvent& event) {
0069
0070 if (m_callback_style == CallbackStyle::LegacyMode) {
0071 throw JException("Called DoReduce() on a legacy-mode JEventProcessor");
0072 }
0073 std::lock_guard<std::mutex> lock(m_mutex);
0074
0075
0076
0077
0078 if (m_status == Status::Uninitialized) {
0079 throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0080 }
0081 else if (m_status == Status::Finalized) {
0082 throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0083 }
0084 for (auto* input : m_inputs) {
0085
0086
0087
0088 input->GetCollection(event);
0089 }
0090 for (auto* variadic_input : m_variadic_inputs) {
0091 variadic_input->GetCollection(event);
0092 }
0093 auto run_number = event.GetRunNumber();
0094 if (m_last_run_number != run_number) {
0095 for (auto* resource : m_resources) {
0096 resource->ChangeRun(event.GetRunNumber(), m_app);
0097 }
0098 m_last_run_number = run_number;
0099 CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0100 }
0101 if (m_callback_style == CallbackStyle::DeclarativeMode) {
0102 CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){
0103 ProcessSequential(event.GetRunNumber(), event.GetEventNumber(), event.GetEventIndex());
0104 });
0105 }
0106 else if (m_callback_style == CallbackStyle::ExpertMode) {
0107 CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){ ProcessSequential(event); });
0108 }
0109 m_event_count += 1;
0110 }
0111
0112
0113 virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0114
0115
0116
0117
0118
0119
0120
0121 if (m_callback_style != CallbackStyle::LegacyMode) {
0122 throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0123 }
0124
0125 auto run_number = event->GetRunNumber();
0126
0127 {
0128
0129 std::lock_guard<std::mutex> lock(m_mutex);
0130
0131 if (m_status == Status::Uninitialized) {
0132 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0133 }
0134 else if (m_status == Status::Finalized) {
0135 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0136 }
0137 if (m_last_run_number != run_number) {
0138 if (m_last_run_number != -1) {
0139 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0140 }
0141 for (auto* resource : m_resources) {
0142 resource->ChangeRun(event->GetRunNumber(), m_app);
0143 }
0144 m_last_run_number = run_number;
0145 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0146 }
0147 }
0148 CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0149 m_event_count += 1;
0150 }
0151
0152
0153 virtual void DoFinalize() {
0154 std::lock_guard<std::mutex> lock(m_mutex);
0155 if (m_status != Status::Finalized) {
0156 if (m_last_run_number != -1) {
0157 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0158 }
0159 CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0160 m_status = Status::Finalized;
0161 }
0162 }
0163
0164
0165 void Summarize(JComponentSummary& summary) const override {
0166 auto* result = new JComponentSummary::Component(
0167 "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0168
0169 for (const auto* input : m_inputs) {
0170 result->AddInput(new JComponentSummary::Collection("", input->GetDatabundleName(), input->GetTypeName(), input->GetLevel()));
0171 }
0172 for (const auto* input : m_variadic_inputs) {
0173 size_t subinput_count = input->GetRequestedDatabundleNames().size();
0174 for (size_t i=0; i<subinput_count; ++i) {
0175 result->AddInput(new JComponentSummary::Collection("", input->GetRequestedDatabundleNames()[i], input->GetTypeName(), input->GetLevel()));
0176 }
0177 }
0178 summary.Add(result);
0179 }
0180
0181
0182 virtual void Init() {}
0183
0184
0185
0186
0187 virtual void Process(const std::shared_ptr<const JEvent>& ) {
0188 }
0189
0190
0191
0192 virtual void ProcessParallel(const JEvent& ) {
0193 }
0194
0195 virtual void ProcessSequential(const JEvent& ) {
0196 }
0197
0198
0199
0200 virtual void ProcessParallel(int64_t , uint64_t , uint64_t ) {
0201 }
0202
0203 virtual void ProcessSequential(int64_t , uint64_t , uint64_t ) {
0204 }
0205
0206
0207 virtual void Finish() {}
0208
0209
0210 protected:
0211
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227 private:
0228 std::string m_resource_name;
0229 std::atomic_ullong m_event_count {0};
0230
0231 };
0232
0233