File indexing completed on 2025-09-18 09:41:06
0001 #include <JANA/JEventSource.h>
0002
0003 void JEventSource::DoInit() {
0004 std::lock_guard<std::mutex> lock(m_mutex);
0005 if (m_status != Status::Uninitialized) {
0006 throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007 }
0008 for (auto* parameter : m_parameters) {
0009 parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0010 }
0011 for (auto* service : m_services) {
0012 service->Fetch(m_app);
0013 }
0014 CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015 m_status = Status::Initialized;
0016 LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018
0019 void JEventSource::DoOpen(bool with_lock) {
0020 if (with_lock) {
0021 std::lock_guard<std::mutex> lock(m_mutex);
0022 if (m_status != Status::Initialized) {
0023 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0024 }
0025 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0026 if (GetResourceName().empty()) {
0027 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0028 }
0029 else {
0030 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0031 }
0032 m_status = Status::Opened;
0033 }
0034 else {
0035 if (m_status != Status::Initialized) {
0036 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0037 }
0038 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0039 if (GetResourceName().empty()) {
0040 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0041 }
0042 else {
0043 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0044 }
0045 m_status = Status::Opened;
0046 }
0047 }
0048
0049 void JEventSource::DoClose(bool with_lock) {
0050 if (with_lock) {
0051 std::lock_guard<std::mutex> lock(m_mutex);
0052
0053 if (m_status != JEventSource::Status::Opened) return;
0054
0055 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0056 if (GetResourceName().empty()) {
0057 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0058 }
0059 else {
0060 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0061 }
0062 m_status = Status::Closed;
0063 }
0064 else {
0065 if (m_status != JEventSource::Status::Opened) return;
0066
0067 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0068 if (GetResourceName().empty()) {
0069 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0070 }
0071 else {
0072 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0073 }
0074 m_status = Status::Closed;
0075 }
0076 }
0077
0078 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0079
0080 std::lock_guard<std::mutex> lock(m_mutex);
0081
0082 if (m_status == Status::Uninitialized) {
0083 throw JException("JEventSource has not been initialized!");
0084 }
0085 if (m_status == Status::Initialized) {
0086 DoOpen(false);
0087 }
0088 if (m_status == Status::Opened) {
0089
0090
0091 if (m_nskip > 0) {
0092 auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0093 m_events_skipped += (m_nskip - remaining_events);
0094 m_nskip = remaining_events;
0095
0096 LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0097
0098
0099
0100 if (result != Result::Success) return result;
0101 }
0102
0103
0104 if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0105 LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0106 DoClose(false);
0107 return Result::FailureFinished;
0108 }
0109
0110
0111
0112
0113 event->SetEventNumber(m_events_emitted);
0114 event->SetJEventSource(this);
0115 event->SetSequential(false);
0116 event->GetJCallGraphRecorder()->Reset();
0117
0118
0119 JEventSource::Result result = Result::Success;
0120 try {
0121 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0122 if (m_callback_style == CallbackStyle::LegacyMode) {
0123 GetEvent(event);
0124 }
0125 else {
0126 result = Emit(*event);
0127 }
0128 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0129 }
0130 catch (RETURN_STATUS rs) {
0131
0132 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0133 DoClose(false);
0134 result = Result::FailureFinished;
0135 }
0136 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0137 result = Result::FailureTryAgain;
0138 }
0139 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0140 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0141 ex.plugin_name = m_plugin_name;
0142 ex.type_name = m_type_name;
0143 ex.function_name = "JEventSource::GetEvent";
0144 ex.instance_name = m_resource_name;
0145 throw ex;
0146 }
0147 }
0148 catch (JException& ex) {
0149 if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0150 if (ex.type_name.empty()) ex.type_name = m_type_name;
0151 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0152 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0153 throw ex;
0154 }
0155 catch (std::exception& e){
0156 auto ex = JException(e.what());
0157 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0158 ex.nested_exception = std::current_exception();
0159 ex.function_name = "JEventSource::GetEvent";
0160 ex.type_name = m_type_name;
0161 ex.instance_name = m_prefix;
0162 ex.plugin_name = m_plugin_name;
0163 throw ex;
0164 }
0165 catch (...) {
0166 auto ex = JException("Unknown exception");
0167 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0168 ex.nested_exception = std::current_exception();
0169 ex.function_name = "JEventSource::GetEvent";
0170 ex.type_name = m_type_name;
0171 ex.instance_name = m_prefix;
0172 ex.plugin_name = m_plugin_name;
0173 throw ex;
0174 }
0175
0176 if (result == Result::Success) {
0177 m_events_emitted += 1;
0178
0179
0180 for (auto* output : GetOutputs()) {
0181 output->EulerianStore(*event->GetFactorySet());
0182 }
0183 for (auto* output : GetVariadicOutputs()) {
0184 output->EulerianStore(*event->GetFactorySet());
0185 }
0186 return Result::Success;
0187 }
0188 else if (result == Result::FailureFinished) {
0189
0190
0191 DoClose(false);
0192 return Result::FailureFinished;
0193 }
0194 else if (result == Result::FailureTryAgain) {
0195
0196
0197 return Result::FailureTryAgain;
0198 }
0199 else {
0200 throw JException("Invalid JEventSource::Result value!");
0201 }
0202 }
0203 else {
0204 return Result::FailureFinished;
0205 }
0206 }
0207
0208
0209 void JEventSource::DoFinishEvent(JEvent& event) {
0210
0211 m_events_processed.fetch_add(1);
0212 if (m_enable_finish_event) {
0213 std::lock_guard<std::mutex> lock(m_mutex);
0214 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0215 FinishEvent(event);
0216 });
0217 }
0218 }
0219
0220 void JEventSource::Summarize(JComponentSummary& summary) const {
0221
0222 auto* result = new JComponentSummary::Component(
0223 "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0224
0225 SummarizeOutputs(*result);
0226 summary.Add(result);
0227 }
0228
0229
0230 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0231
0232
0233 Result result = Result::Success;
0234
0235 while (events_to_skip > 0 && result == Result::Success) {
0236 try {
0237 auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0238 if (m_callback_style == CallbackStyle::LegacyMode) {
0239 GetEvent(event.shared_from_this());
0240 }
0241 else {
0242 result = Emit(event);
0243 }
0244 event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0245
0246
0247 if (m_enable_finish_event) {
0248 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0249 }
0250 event.Clear(false);
0251 events_to_skip -= 1;
0252 }
0253 catch (RETURN_STATUS rs) {
0254
0255 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0256 DoClose(false);
0257 result = Result::FailureFinished;
0258 }
0259 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0260 result = Result::FailureTryAgain;
0261 }
0262 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0263 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0264 ex.plugin_name = m_plugin_name;
0265 ex.type_name = m_type_name;
0266 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0267 ex.instance_name = m_resource_name;
0268 throw ex;
0269 }
0270 }
0271 catch (JException& ex) {
0272 if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0273 if (ex.type_name.empty()) ex.type_name = m_type_name;
0274 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0275 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0276 throw ex;
0277 }
0278 catch (std::exception& e){
0279 auto ex = JException(e.what());
0280 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0281 ex.nested_exception = std::current_exception();
0282 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0283 ex.type_name = m_type_name;
0284 ex.instance_name = m_prefix;
0285 ex.plugin_name = m_plugin_name;
0286 throw ex;
0287 }
0288 catch (...) {
0289 auto ex = JException("Unknown exception");
0290 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0291 ex.nested_exception = std::current_exception();
0292 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0293 ex.type_name = m_type_name;
0294 ex.instance_name = m_prefix;
0295 ex.plugin_name = m_plugin_name;
0296 throw ex;
0297 }
0298 }
0299
0300 return {result, events_to_skip};
0301 }
0302
0303
0304
0305