File indexing completed on 2025-07-01 08:58:14
0001 #include <JANA/JEventSource.h>
0002
0003 void JEventSource::DoInit() {
0004 std::lock_guard<std::mutex> lock(m_mutex);
0005 if (m_status != Status::Uninitialized) {
0006 throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007 }
0008 for (auto* parameter : m_parameters) {
0009 parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0010 }
0011 for (auto* service : m_services) {
0012 service->Fetch(m_app);
0013 }
0014 CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015 m_status = Status::Initialized;
0016 LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018
0019 void JEventSource::DoOpen(bool with_lock) {
0020 if (with_lock) {
0021 std::lock_guard<std::mutex> lock(m_mutex);
0022 if (m_status != Status::Initialized) {
0023 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0024 }
0025 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0026 if (GetResourceName().empty()) {
0027 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0028 }
0029 else {
0030 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0031 }
0032 m_status = Status::Opened;
0033 }
0034 else {
0035 if (m_status != Status::Initialized) {
0036 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0037 }
0038 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0039 if (GetResourceName().empty()) {
0040 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0041 }
0042 else {
0043 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0044 }
0045 m_status = Status::Opened;
0046 }
0047 }
0048
0049 void JEventSource::DoClose(bool with_lock) {
0050 if (with_lock) {
0051 std::lock_guard<std::mutex> lock(m_mutex);
0052
0053 if (m_status != JEventSource::Status::Opened) return;
0054
0055 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0056 if (GetResourceName().empty()) {
0057 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0058 }
0059 else {
0060 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0061 }
0062 m_status = Status::Closed;
0063 }
0064 else {
0065 if (m_status != JEventSource::Status::Opened) return;
0066
0067 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0068 if (GetResourceName().empty()) {
0069 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0070 }
0071 else {
0072 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0073 }
0074 m_status = Status::Closed;
0075 }
0076 }
0077
0078 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0079
0080 std::lock_guard<std::mutex> lock(m_mutex);
0081
0082 if (m_status == Status::Uninitialized) {
0083 throw JException("JEventSource has not been initialized!");
0084 }
0085 if (m_status == Status::Initialized) {
0086 DoOpen(false);
0087 }
0088 if (m_status == Status::Opened) {
0089
0090
0091 if (m_nskip > 0) {
0092 auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0093 m_events_skipped += (m_nskip - remaining_events);
0094 m_nskip = remaining_events;
0095
0096 LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0097
0098
0099
0100 if (result != Result::Success) return result;
0101 }
0102
0103
0104 if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0105 LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0106 DoClose(false);
0107 return Result::FailureFinished;
0108 }
0109
0110
0111
0112
0113 event->SetEventNumber(m_events_emitted);
0114 event->SetJEventSource(this);
0115 event->SetSequential(false);
0116 event->GetJCallGraphRecorder()->Reset();
0117
0118
0119 JEventSource::Result result = Result::Success;
0120 try {
0121 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0122 if (m_callback_style == CallbackStyle::LegacyMode) {
0123 GetEvent(event);
0124 }
0125 else {
0126 result = Emit(*event);
0127 }
0128 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0129 }
0130 catch (RETURN_STATUS rs) {
0131
0132 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0133 DoClose(false);
0134 result = Result::FailureFinished;
0135 }
0136 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0137 result = Result::FailureTryAgain;
0138 }
0139 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0140 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0141 ex.plugin_name = m_plugin_name;
0142 ex.type_name = m_type_name;
0143 ex.function_name = "JEventSource::GetEvent";
0144 ex.instance_name = m_resource_name;
0145 throw ex;
0146 }
0147 }
0148 catch (JException& ex) {
0149 if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0150 if (ex.type_name.empty()) ex.type_name = m_type_name;
0151 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0152 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0153 throw ex;
0154 }
0155 catch (std::exception& e){
0156 auto ex = JException(e.what());
0157 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0158 ex.nested_exception = std::current_exception();
0159 ex.function_name = "JEventSource::GetEvent";
0160 ex.type_name = m_type_name;
0161 ex.instance_name = m_prefix;
0162 ex.plugin_name = m_plugin_name;
0163 throw ex;
0164 }
0165 catch (...) {
0166 auto ex = JException("Unknown exception");
0167 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0168 ex.nested_exception = std::current_exception();
0169 ex.function_name = "JEventSource::GetEvent";
0170 ex.type_name = m_type_name;
0171 ex.instance_name = m_prefix;
0172 ex.plugin_name = m_plugin_name;
0173 throw ex;
0174 }
0175
0176 if (result == Result::Success) {
0177 m_events_emitted += 1;
0178
0179
0180 for (auto* output : m_outputs) {
0181 output->InsertCollection(*event);
0182 }
0183 return Result::Success;
0184 }
0185 else if (result == Result::FailureFinished) {
0186
0187
0188 DoClose(false);
0189 return Result::FailureFinished;
0190 }
0191 else if (result == Result::FailureTryAgain) {
0192
0193
0194 return Result::FailureTryAgain;
0195 }
0196 else {
0197 throw JException("Invalid JEventSource::Result value!");
0198 }
0199 }
0200 else {
0201 return Result::FailureFinished;
0202 }
0203 }
0204
0205
0206 void JEventSource::DoFinishEvent(JEvent& event) {
0207
0208 m_events_processed.fetch_add(1);
0209 if (m_enable_finish_event) {
0210 std::lock_guard<std::mutex> lock(m_mutex);
0211 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0212 FinishEvent(event);
0213 });
0214 }
0215 }
0216
0217 void JEventSource::Summarize(JComponentSummary& summary) const {
0218
0219 auto* result = new JComponentSummary::Component(
0220 "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0221
0222 for (const auto* output : m_outputs) {
0223 size_t suboutput_count = output->collection_names.size();
0224 for (size_t i=0; i<suboutput_count; ++i) {
0225 result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0226 }
0227 }
0228
0229 summary.Add(result);
0230 }
0231
0232
0233 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0234
0235
0236 Result result = Result::Success;
0237
0238 while (events_to_skip > 0 && result == Result::Success) {
0239 try {
0240 auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0241 if (m_callback_style == CallbackStyle::LegacyMode) {
0242 GetEvent(event.shared_from_this());
0243 }
0244 else {
0245 result = Emit(event);
0246 }
0247 event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0248
0249
0250 if (m_enable_finish_event) {
0251 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0252 }
0253 event.Clear(false);
0254 events_to_skip -= 1;
0255 }
0256 catch (RETURN_STATUS rs) {
0257
0258 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0259 DoClose(false);
0260 result = Result::FailureFinished;
0261 }
0262 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0263 result = Result::FailureTryAgain;
0264 }
0265 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0266 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0267 ex.plugin_name = m_plugin_name;
0268 ex.type_name = m_type_name;
0269 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0270 ex.instance_name = m_resource_name;
0271 throw ex;
0272 }
0273 }
0274 catch (JException& ex) {
0275 if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0276 if (ex.type_name.empty()) ex.type_name = m_type_name;
0277 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0278 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0279 throw ex;
0280 }
0281 catch (std::exception& e){
0282 auto ex = JException(e.what());
0283 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0284 ex.nested_exception = std::current_exception();
0285 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0286 ex.type_name = m_type_name;
0287 ex.instance_name = m_prefix;
0288 ex.plugin_name = m_plugin_name;
0289 throw ex;
0290 }
0291 catch (...) {
0292 auto ex = JException("Unknown exception");
0293 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0294 ex.nested_exception = std::current_exception();
0295 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0296 ex.type_name = m_type_name;
0297 ex.instance_name = m_prefix;
0298 ex.plugin_name = m_plugin_name;
0299 throw ex;
0300 }
0301 }
0302
0303 return {result, events_to_skip};
0304 }
0305
0306
0307
0308