File indexing completed on 2025-01-18 10:01:41
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasOutputs.h>
0009 #include <JANA/JEvent.h>
0010 #include <JANA/JException.h>
0011 #include <JANA/JFactoryGenerator.h>
0012
0013
0014 class JFactoryGenerator;
0015 class JApplication;
0016 class JFactory;
0017
0018
0019 class JEventSource : public jana::components::JComponent,
0020 public jana::components::JHasOutputs {
0021
0022 public:
0023
0024
0025
0026 enum class Result { Success, FailureTryAgain, FailureFinished };
0027
0028
0029
0030 enum class RETURN_STATUS { kSUCCESS, kNO_MORE_EVENTS, kBUSY, kTRY_AGAIN, kERROR, kUNKNOWN };
0031
0032
0033
0034
0035 explicit JEventSource(std::string resource_name, JApplication* app = nullptr)
0036 : m_resource_name(std::move(resource_name))
0037 , m_event_count{0}
0038 {
0039 m_app = app;
0040 }
0041
0042 JEventSource() = default;
0043 virtual ~JEventSource() = default;
0044
0045
0046
0047
0048
0049 virtual void Init() {}
0050
0051
0052
0053
0054
0055
0056
0057 virtual void Open() {}
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068 virtual Result Emit(JEvent&) { return Result::Success; };
0069
0070
0071
0072
0073
0074
0075
0076
0077 virtual void Close() {}
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096
0097 virtual void GetEvent(std::shared_ptr<JEvent>) {};
0098
0099 virtual void Preprocess(const JEvent&) {};
0100
0101
0102
0103
0104
0105
0106
0107
0108
0109 virtual void FinishEvent(JEvent&) {};
0110
0111
0112
0113
0114
0115 virtual bool GetObjects(const std::shared_ptr<const JEvent>&, JFactory*) {
0116 return false;
0117 }
0118
0119
0120 virtual void DoInit() {
0121 if (m_status == Status::Uninitialized) {
0122 CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init();});
0123 m_status = Status::Initialized;
0124 LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0125 }
0126 else {
0127 throw JException("Attempted to initialize a JEventSource that is not uninitialized!");
0128 }
0129 }
0130
0131 [[deprecated("Replaced by JEventSource::DoOpen()")]]
0132 virtual void DoInitialize() {
0133 DoOpen();
0134 }
0135
0136 virtual void DoOpen(bool with_lock=true) {
0137 if (with_lock) {
0138 std::lock_guard<std::mutex> lock(m_mutex);
0139 if (m_status != Status::Initialized) {
0140 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0141 }
0142 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0143 if (GetResourceName().empty()) {
0144 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0145 }
0146 else {
0147 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0148 }
0149 m_status = Status::Opened;
0150 }
0151 else {
0152 if (m_status != Status::Initialized) {
0153 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0154 }
0155 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0156 if (GetResourceName().empty()) {
0157 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0158 }
0159 else {
0160 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0161 }
0162 m_status = Status::Opened;
0163 }
0164 }
0165
0166 virtual void DoClose(bool with_lock=true) {
0167 if (with_lock) {
0168 std::lock_guard<std::mutex> lock(m_mutex);
0169
0170 if (m_status != JEventSource::Status::Opened) return;
0171
0172 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0173 if (GetResourceName().empty()) {
0174 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0175 }
0176 else {
0177 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0178 }
0179 m_status = Status::Closed;
0180 }
0181 else {
0182 if (m_status != JEventSource::Status::Opened) return;
0183
0184 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0185 if (GetResourceName().empty()) {
0186 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0187 }
0188 else {
0189 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0190 }
0191 m_status = Status::Closed;
0192 }
0193 }
0194
0195 Result DoNext(std::shared_ptr<JEvent> event) {
0196
0197 std::lock_guard<std::mutex> lock(m_mutex);
0198
0199 if (m_status == Status::Uninitialized) {
0200 throw JException("JEventSource has not been initialized!");
0201 }
0202
0203 if (m_callback_style == CallbackStyle::LegacyMode) {
0204 return DoNextCompatibility(event);
0205 }
0206
0207 auto first_evt_nr = m_nskip;
0208 auto last_evt_nr = m_nevents + m_nskip;
0209
0210 if (m_status == Status::Initialized) {
0211 DoOpen(false);
0212 }
0213 if (m_status == Status::Opened) {
0214 if (m_nevents != 0 && (m_event_count == last_evt_nr)) {
0215
0216 DoClose(false);
0217 return Result::FailureFinished;
0218 }
0219
0220
0221
0222 event->SetEventNumber(m_event_count);
0223 event->SetJEventSource(this);
0224 event->SetSequential(false);
0225 event->GetJCallGraphRecorder()->Reset();
0226
0227
0228 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0229 JEventSource::Result result;
0230 CallWithJExceptionWrapper("JEventSource::Emit", [&](){
0231 result = Emit(*event);
0232 });
0233 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0234
0235 if (result == Result::Success) {
0236 m_event_count += 1;
0237
0238
0239 for (auto* output : m_outputs) {
0240 output->InsertCollection(*event);
0241 }
0242 if (m_event_count <= first_evt_nr) {
0243
0244
0245 return Result::FailureTryAgain;
0246 }
0247 return Result::Success;
0248 }
0249 else if (result == Result::FailureFinished) {
0250
0251
0252 DoClose(false);
0253 return Result::FailureFinished;
0254 }
0255 else if (result == Result::FailureTryAgain) {
0256
0257
0258 return Result::FailureTryAgain;
0259 }
0260 else {
0261 throw JException("Invalid JEventSource::Result value!");
0262 }
0263 }
0264 else {
0265 return Result::FailureFinished;
0266 }
0267 }
0268
0269 Result DoNextCompatibility(std::shared_ptr<JEvent> event) {
0270
0271 auto first_evt_nr = m_nskip;
0272 auto last_evt_nr = m_nevents + m_nskip;
0273
0274 try {
0275 if (m_status == Status::Initialized) {
0276 DoOpen(false);
0277 }
0278 if (m_status == Status::Opened) {
0279 if (m_event_count < first_evt_nr) {
0280
0281 event->SetEventNumber(m_event_count);
0282 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0283 GetEvent(event);
0284 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0285 m_event_count += 1;
0286 return Result::FailureTryAgain;
0287 } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) {
0288
0289 DoClose(false);
0290 return Result::FailureFinished;
0291 } else {
0292
0293
0294 event->SetEventNumber(m_event_count);
0295 event->SetJApplication(m_app);
0296 event->SetJEventSource(this);
0297 event->SetSequential(false);
0298 event->GetJCallGraphRecorder()->Reset();
0299 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0300 GetEvent(event);
0301 for (auto* output : m_outputs) {
0302 output->InsertCollection(*event);
0303 }
0304 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0305 m_event_count += 1;
0306 return Result::Success;
0307 }
0308 } else if (m_status == Status::Closed) {
0309 return Result::FailureFinished;
0310 } else {
0311 throw JException("Invalid m_status");
0312 }
0313 }
0314 catch (RETURN_STATUS rs) {
0315
0316 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0317 DoClose(false);
0318 return Result::FailureFinished;
0319 }
0320 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0321 return Result::FailureTryAgain;
0322 }
0323 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0324 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0325 ex.plugin_name = m_plugin_name;
0326 ex.type_name = m_type_name;
0327 ex.function_name = "JEventSource::GetEvent";
0328 ex.instance_name = m_resource_name;
0329 throw ex;
0330 }
0331 else {
0332 return Result::Success;
0333 }
0334 }
0335 catch (JException& ex) {
0336 if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0337 if (ex.type_name.empty()) ex.type_name = m_type_name;
0338 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0339 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0340 throw ex;
0341 }
0342 catch (std::exception& e){
0343 auto ex = JException(e.what());
0344 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0345 ex.nested_exception = std::current_exception();
0346 ex.function_name = "JEventSource::GetEvent";
0347 ex.type_name = m_type_name;
0348 ex.instance_name = m_prefix;
0349 ex.plugin_name = m_plugin_name;
0350 throw ex;
0351 }
0352 catch (...) {
0353 auto ex = JException("Unknown exception");
0354 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0355 ex.nested_exception = std::current_exception();
0356 ex.function_name = "JEventSource::GetEvent";
0357 ex.type_name = m_type_name;
0358 ex.instance_name = m_prefix;
0359 ex.plugin_name = m_plugin_name;
0360 throw ex;
0361 }
0362 }
0363
0364
0365
0366
0367
0368 void DoFinish(JEvent& event) {
0369 if (m_enable_finish_event) {
0370 std::lock_guard<std::mutex> lock(m_mutex);
0371 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0372 FinishEvent(event);
0373 });
0374 }
0375 }
0376
0377 void Summarize(JComponentSummary& summary) const override {
0378
0379 auto* result = new JComponentSummary::Component(
0380 "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0381
0382 for (const auto* output : m_outputs) {
0383 size_t suboutput_count = output->collection_names.size();
0384 for (size_t i=0; i<suboutput_count; ++i) {
0385 result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0386 }
0387 }
0388
0389 summary.Add(result);
0390 }
0391
0392
0393
0394 void SetResourceName(std::string resource_name) { m_resource_name = resource_name; }
0395
0396 std::string GetResourceName() const { return m_resource_name; }
0397
0398 uint64_t GetEventCount() const { return m_event_count; };
0399
0400
0401 virtual std::string GetType() const { return m_type_name; }
0402
0403
0404 std::string GetName() const { return m_resource_name; }
0405
0406 bool IsGetObjectsEnabled() const { return m_enable_get_objects; }
0407 bool IsFinishEventEnabled() const { return m_enable_finish_event; }
0408
0409
0410 virtual std::string GetVDescription() const {
0411 return "<description unavailable>";
0412 }
0413
0414
0415 uint64_t GetNSkip() { return m_nskip; }
0416 uint64_t GetNEvents() { return m_nevents; }
0417
0418
0419
0420
0421
0422
0423
0424 void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; }
0425 void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; }
0426
0427
0428 void SetNEvents(uint64_t nevents) { m_nevents = nevents; };
0429
0430
0431 void SetNSkip(uint64_t nskip) { m_nskip = nskip; };
0432
0433
0434 private:
0435 std::string m_resource_name;
0436 std::atomic_ullong m_event_count {0};
0437 uint64_t m_nskip = 0;
0438 uint64_t m_nevents = 0;
0439 bool m_enable_finish_event = false;
0440 bool m_enable_get_objects = false;
0441
0442 };
0443
0444