File indexing completed on 2025-12-05 10:24:26
0001 #include <JANA/JEventSource.h>
0002
0003 void JEventSource::DoOpen(bool with_lock) {
0004 if (with_lock) {
0005 std::lock_guard<std::mutex> lock(m_mutex);
0006 if (!m_is_initialized) {
0007 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0008 }
0009 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0010 if (GetResourceName().empty()) {
0011 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0012 }
0013 else {
0014 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0015 }
0016 m_status = Status::Opened;
0017 }
0018 else {
0019 if (!m_is_initialized) {
0020 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0021 }
0022 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0023 if (GetResourceName().empty()) {
0024 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0025 }
0026 else {
0027 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0028 }
0029 m_status = Status::Opened;
0030 }
0031 }
0032
0033 void JEventSource::DoClose(bool with_lock) {
0034 if (with_lock) {
0035 std::lock_guard<std::mutex> lock(m_mutex);
0036
0037 if (m_status != JEventSource::Status::Opened) return;
0038
0039 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0040 if (GetResourceName().empty()) {
0041 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0042 }
0043 else {
0044 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0045 }
0046 m_status = Status::Closed;
0047 }
0048 else {
0049 if (m_status != JEventSource::Status::Opened) return;
0050
0051 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0052 if (GetResourceName().empty()) {
0053 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0054 }
0055 else {
0056 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0057 }
0058 m_status = Status::Closed;
0059 }
0060 }
0061
0062 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0063
0064 std::lock_guard<std::mutex> lock(m_mutex);
0065
0066 if (!m_is_initialized) {
0067 throw JException("JEventSource has not been initialized!");
0068 }
0069 if (m_status == Status::Unopened) {
0070 DoOpen(false);
0071 }
0072 if (m_status == Status::Opened) {
0073
0074
0075 if (m_nskip > 0) {
0076 auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0077 m_events_skipped += (m_nskip - remaining_events);
0078 m_nskip = remaining_events;
0079
0080 LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0081
0082
0083
0084 if (result != Result::Success) return result;
0085 }
0086
0087
0088 if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0089 LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0090 DoClose(false);
0091 return Result::FailureFinished;
0092 }
0093
0094
0095
0096
0097 event->SetEventNumber(m_events_emitted);
0098 event->SetJEventSource(this);
0099 event->SetSequential(false);
0100 event->GetJCallGraphRecorder()->Reset();
0101
0102
0103 JEventSource::Result result = Result::Success;
0104 try {
0105 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0106 if (m_callback_style == CallbackStyle::LegacyMode) {
0107 GetEvent(event);
0108 }
0109 else {
0110 result = Emit(*event);
0111 }
0112 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0113 }
0114 catch (RETURN_STATUS rs) {
0115
0116 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0117 DoClose(false);
0118 result = Result::FailureFinished;
0119 }
0120 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0121 result = Result::FailureTryAgain;
0122 }
0123 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0124 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0125 ex.plugin_name = m_plugin_name;
0126 ex.type_name = m_type_name;
0127 ex.function_name = "JEventSource::GetEvent";
0128 ex.instance_name = m_resource_name;
0129 throw ex;
0130 }
0131 }
0132 catch (JException& ex) {
0133 if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0134 if (ex.type_name.empty()) ex.type_name = m_type_name;
0135 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0136 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0137 throw ex;
0138 }
0139 catch (std::exception& e){
0140 auto ex = JException(e.what());
0141 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0142 ex.nested_exception = std::current_exception();
0143 ex.function_name = "JEventSource::GetEvent";
0144 ex.type_name = m_type_name;
0145 ex.instance_name = m_prefix;
0146 ex.plugin_name = m_plugin_name;
0147 throw ex;
0148 }
0149 catch (...) {
0150 auto ex = JException("Unknown exception");
0151 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0152 ex.nested_exception = std::current_exception();
0153 ex.function_name = "JEventSource::GetEvent";
0154 ex.type_name = m_type_name;
0155 ex.instance_name = m_prefix;
0156 ex.plugin_name = m_plugin_name;
0157 throw ex;
0158 }
0159
0160 if (result == Result::Success) {
0161 m_events_emitted += 1;
0162
0163
0164 for (auto* output : GetOutputs()) {
0165 output->EulerianStore(*event->GetFactorySet());
0166 }
0167 for (auto* output : GetVariadicOutputs()) {
0168 output->EulerianStore(*event->GetFactorySet());
0169 }
0170 return Result::Success;
0171 }
0172 else if (result == Result::FailureFinished) {
0173
0174
0175 DoClose(false);
0176 return Result::FailureFinished;
0177 }
0178 else if (result == Result::FailureTryAgain) {
0179
0180
0181 return Result::FailureTryAgain;
0182 }
0183 else {
0184 throw JException("Invalid JEventSource::Result value!");
0185 }
0186 }
0187 else {
0188 return Result::FailureFinished;
0189 }
0190 }
0191
0192
0193 void JEventSource::DoFinishEvent(JEvent& event) {
0194
0195 m_events_processed.fetch_add(1);
0196 if (m_enable_finish_event) {
0197 std::lock_guard<std::mutex> lock(m_mutex);
0198 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0199 FinishEvent(event);
0200 });
0201 }
0202 }
0203
0204 void JEventSource::Summarize(JComponentSummary& summary) const {
0205
0206 auto* result = new JComponentSummary::Component(
0207 "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0208
0209 SummarizeOutputs(*result);
0210 summary.Add(result);
0211 }
0212
0213
0214 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0215
0216
0217 Result result = Result::Success;
0218
0219 while (events_to_skip > 0 && result == Result::Success) {
0220 try {
0221 auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0222 if (m_callback_style == CallbackStyle::LegacyMode) {
0223 GetEvent(event.shared_from_this());
0224 }
0225 else {
0226 result = Emit(event);
0227 }
0228 event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0229
0230
0231 if (m_enable_finish_event) {
0232 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0233 }
0234 event.Clear(false);
0235 events_to_skip -= 1;
0236 }
0237 catch (RETURN_STATUS rs) {
0238
0239 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0240 DoClose(false);
0241 result = Result::FailureFinished;
0242 }
0243 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0244 result = Result::FailureTryAgain;
0245 }
0246 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0247 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0248 ex.plugin_name = m_plugin_name;
0249 ex.type_name = m_type_name;
0250 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0251 ex.instance_name = m_resource_name;
0252 throw ex;
0253 }
0254 }
0255 catch (JException& ex) {
0256 if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0257 if (ex.type_name.empty()) ex.type_name = m_type_name;
0258 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0259 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0260 throw ex;
0261 }
0262 catch (std::exception& e){
0263 auto ex = JException(e.what());
0264 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0265 ex.nested_exception = std::current_exception();
0266 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0267 ex.type_name = m_type_name;
0268 ex.instance_name = m_prefix;
0269 ex.plugin_name = m_plugin_name;
0270 throw ex;
0271 }
0272 catch (...) {
0273 auto ex = JException("Unknown exception");
0274 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0275 ex.nested_exception = std::current_exception();
0276 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0277 ex.type_name = m_type_name;
0278 ex.instance_name = m_prefix;
0279 ex.plugin_name = m_plugin_name;
0280 throw ex;
0281 }
0282 }
0283
0284 return {result, events_to_skip};
0285 }
0286
0287
0288
0289