Warning, file /jana2/src/libraries/JANA/JEventProcessor.h was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasInputs.h>
0009 #include <JANA/Components/JHasRunCallbacks.h>
0010 #include <JANA/JEvent.h>
0011 #include <mutex>
0012
0013 class JApplication; // Forward declaration: this header only handles JApplication through pointers (the deprecated constructor argument and the m_app member passed to resources).
0014
0015
0016 class JEventProcessor : public jana::components::JComponent,
0017 public jana::components::JHasRunCallbacks,
0018 public jana::components::JHasInputs {
0019 public:
0020
0021 JEventProcessor() {
0022 m_type_name = "JEventProcessor";
0023 }
0024 virtual ~JEventProcessor() = default;
0025
0026 [[deprecated]]
0027 explicit JEventProcessor(JApplication* app) {
0028 m_app = app;
0029 }
0030
0031
0032 std::string GetResourceName() const { return m_resource_name; }
0033
0034 uint64_t GetEventCount() const { return m_event_count; };
0035
0036 bool IsOrderingEnabled() const { return m_enable_ordering; }
0037
0038 void EnableOrdering(bool enable=true) { m_enable_ordering = enable; }
0039
0040
0041 virtual void DoMap(const JEvent& event) {
0042
0043 if (m_callback_style == CallbackStyle::LegacyMode) {
0044 throw JException("Called DoMap() on a legacy-mode JEventProcessor");
0045 }
0046 for (auto* input : m_inputs) {
0047 input->TriggerFactoryCreate(event);
0048 }
0049 for (auto* variadic_input : m_variadic_inputs) {
0050 variadic_input->TriggerFactoryCreate(event);
0051 }
0052 ProcessParallel(event);
0053 }
0054
0055
0056 virtual void DoTap(const JEvent& event) {
0057
0058 if (m_callback_style == CallbackStyle::LegacyMode) {
0059 throw JException("Called DoTap() on a legacy-mode JEventProcessor");
0060 }
0061 std::lock_guard<std::mutex> lock(m_mutex);
0062
0063
0064
0065
0066 if (!m_is_initialized) {
0067 throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
0068 }
0069 else if (m_is_finalized) {
0070 throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
0071 }
0072 for (auto* input : m_inputs) {
0073
0074
0075
0076 input->Populate(event);
0077 }
0078 for (auto* variadic_input : m_variadic_inputs) {
0079 variadic_input->Populate(event);
0080 }
0081 auto run_number = event.GetRunNumber();
0082 if (m_last_run_number != run_number) {
0083 for (auto* resource : m_resources) {
0084 resource->ChangeRun(event.GetRunNumber(), m_app);
0085 }
0086 m_last_run_number = run_number;
0087 CallWithJExceptionWrapper("JEventProcessor::ChangeRun", [&](){ ChangeRun(event); });
0088 }
0089 CallWithJExceptionWrapper("JEventProcessor::ProcessSequential", [&](){ ProcessSequential(event); });
0090 m_event_count += 1;
0091 }
0092
0093
0094 virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {
0095
0096
0097
0098
0099
0100
0101
0102 if (m_callback_style != CallbackStyle::LegacyMode) {
0103 throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
0104 }
0105
0106 auto run_number = event->GetRunNumber();
0107
0108 {
0109
0110 std::lock_guard<std::mutex> lock(m_mutex);
0111
0112 if (!m_is_initialized) {
0113 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
0114 }
0115 else if (m_is_finalized) {
0116 throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
0117 }
0118 if (m_last_run_number != run_number) {
0119 if (m_last_run_number != -1) {
0120 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0121 }
0122 for (auto* resource : m_resources) {
0123 resource->ChangeRun(event->GetRunNumber(), m_app);
0124 }
0125 m_last_run_number = run_number;
0126 CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
0127 }
0128 }
0129 CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
0130 m_event_count += 1;
0131 }
0132
0133
0134 virtual void DoFinalize() {
0135 std::lock_guard<std::mutex> lock(m_mutex);
0136 if (!m_is_finalized) {
0137 if (m_last_run_number != -1) {
0138 CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
0139 }
0140 CallWithJExceptionWrapper("JEventProcessor::Finish", [&](){ Finish(); });
0141 m_is_finalized = true;
0142 }
0143 }
0144
0145
0146 void Summarize(JComponentSummary& summary) const override {
0147 auto* result = new JComponentSummary::Component(
0148 "Processor", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0149
0150 for (const auto* input : m_inputs) {
0151 result->AddInput(new JComponentSummary::Collection("", input->GetDatabundleName(), input->GetTypeName(), input->GetLevel()));
0152 }
0153 for (const auto* input : m_variadic_inputs) {
0154 size_t subinput_count = input->GetRequestedDatabundleNames().size();
0155 for (size_t i=0; i<subinput_count; ++i) {
0156 result->AddInput(new JComponentSummary::Collection("", input->GetRequestedDatabundleNames()[i], input->GetTypeName(), input->GetLevel()));
0157 }
0158 }
0159 summary.Add(result);
0160 }
0161
0162
0163
0164
0165 virtual void Process(const std::shared_ptr<const JEvent>& ) {
0166 }
0167
0168
0169
0170 virtual void ProcessParallel(const JEvent& ) {
0171 }
0172
0173 virtual void ProcessSequential(const JEvent& ) {
0174 }
0175
0176
0177 virtual void Finish() {}
0178
0179
0180 protected:
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190
0191
0192
0193
0194
0195
0196
0197 private:
0198 bool m_enable_ordering = false;
0199 std::string m_resource_name;
0200 std::atomic_ullong m_event_count {0};
0201
0202 };
0203
0204