File indexing completed on 2025-01-18 10:17:38
0001 #include <JANA/JEventSource.h>
0002
0003 void JEventSource::DoInit() {
0004 std::lock_guard<std::mutex> lock(m_mutex);
0005 if (m_status != Status::Uninitialized) {
0006 throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007 }
0008 for (auto* parameter : m_parameters) {
0009 parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
0010 }
0011 for (auto* service : m_services) {
0012 service->Fetch(m_app);
0013 }
0014 CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015 m_status = Status::Initialized;
0016 LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018
0019 void JEventSource::DoInitialize() {
0020 DoOpen();
0021 }
0022
0023 void JEventSource::DoOpen(bool with_lock) {
0024 if (with_lock) {
0025 std::lock_guard<std::mutex> lock(m_mutex);
0026 if (m_status != Status::Initialized) {
0027 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0028 }
0029 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0030 if (GetResourceName().empty()) {
0031 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0032 }
0033 else {
0034 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0035 }
0036 m_status = Status::Opened;
0037 }
0038 else {
0039 if (m_status != Status::Initialized) {
0040 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0041 }
0042 CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0043 if (GetResourceName().empty()) {
0044 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0045 }
0046 else {
0047 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0048 }
0049 m_status = Status::Opened;
0050 }
0051 }
0052
0053 void JEventSource::DoClose(bool with_lock) {
0054 if (with_lock) {
0055 std::lock_guard<std::mutex> lock(m_mutex);
0056
0057 if (m_status != JEventSource::Status::Opened) return;
0058
0059 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0060 if (GetResourceName().empty()) {
0061 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0062 }
0063 else {
0064 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0065 }
0066 m_status = Status::Closed;
0067 }
0068 else {
0069 if (m_status != JEventSource::Status::Opened) return;
0070
0071 CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0072 if (GetResourceName().empty()) {
0073 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0074 }
0075 else {
0076 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0077 }
0078 m_status = Status::Closed;
0079 }
0080 }
0081
0082 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0083
0084 std::lock_guard<std::mutex> lock(m_mutex);
0085
0086 if (m_status == Status::Uninitialized) {
0087 throw JException("JEventSource has not been initialized!");
0088 }
0089
0090 if (m_callback_style == CallbackStyle::LegacyMode) {
0091 return DoNextCompatibility(event);
0092 }
0093
0094 auto first_evt_nr = m_nskip;
0095 auto last_evt_nr = m_nevents + m_nskip;
0096
0097 if (m_status == Status::Initialized) {
0098 DoOpen(false);
0099 }
0100 if (m_status == Status::Opened) {
0101 if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) {
0102
0103 DoClose(false);
0104 return Result::FailureFinished;
0105 }
0106
0107
0108
0109 event->SetEventNumber(m_events_emitted);
0110 event->SetJEventSource(this);
0111 event->SetSequential(false);
0112 event->GetJCallGraphRecorder()->Reset();
0113
0114
0115 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0116 JEventSource::Result result;
0117 CallWithJExceptionWrapper("JEventSource::Emit", [&](){
0118 result = Emit(*event);
0119 });
0120 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0121
0122 if (result == Result::Success) {
0123 m_events_emitted += 1;
0124
0125
0126 for (auto* output : m_outputs) {
0127 output->InsertCollection(*event);
0128 }
0129 if (m_events_emitted <= first_evt_nr) {
0130
0131
0132 return Result::FailureTryAgain;
0133 }
0134 return Result::Success;
0135 }
0136 else if (result == Result::FailureFinished) {
0137
0138
0139 DoClose(false);
0140 return Result::FailureFinished;
0141 }
0142 else if (result == Result::FailureTryAgain) {
0143
0144
0145 return Result::FailureTryAgain;
0146 }
0147 else {
0148 throw JException("Invalid JEventSource::Result value!");
0149 }
0150 }
0151 else {
0152 return Result::FailureFinished;
0153 }
0154 }
0155
0156 JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr<JEvent> event) {
0157
0158 auto first_evt_nr = m_nskip;
0159 auto last_evt_nr = m_nevents + m_nskip;
0160
0161 try {
0162 if (m_status == Status::Initialized) {
0163 DoOpen(false);
0164 }
0165 if (m_status == Status::Opened) {
0166 if (m_events_emitted < first_evt_nr) {
0167
0168 event->SetEventNumber(m_events_emitted);
0169 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0170 GetEvent(event);
0171 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0172 m_events_emitted += 1;
0173 return Result::FailureTryAgain;
0174 } else if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) {
0175
0176 DoClose(false);
0177 return Result::FailureFinished;
0178 } else {
0179
0180
0181 event->SetEventNumber(m_events_emitted);
0182 event->SetJApplication(m_app);
0183 event->SetJEventSource(this);
0184 event->SetSequential(false);
0185 event->GetJCallGraphRecorder()->Reset();
0186 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);
0187 GetEvent(event);
0188 for (auto* output : m_outputs) {
0189 output->InsertCollection(*event);
0190 }
0191 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0192 m_events_emitted += 1;
0193 return Result::Success;
0194 }
0195 } else if (m_status == Status::Closed) {
0196 return Result::FailureFinished;
0197 } else {
0198 throw JException("Invalid m_status");
0199 }
0200 }
0201 catch (RETURN_STATUS rs) {
0202
0203 if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0204 DoClose(false);
0205 return Result::FailureFinished;
0206 }
0207 else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0208 return Result::FailureTryAgain;
0209 }
0210 else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0211 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0212 ex.plugin_name = m_plugin_name;
0213 ex.type_name = m_type_name;
0214 ex.function_name = "JEventSource::GetEvent";
0215 ex.instance_name = m_resource_name;
0216 throw ex;
0217 }
0218 else {
0219 return Result::Success;
0220 }
0221 }
0222 catch (JException& ex) {
0223 if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0224 if (ex.type_name.empty()) ex.type_name = m_type_name;
0225 if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0226 if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0227 throw ex;
0228 }
0229 catch (std::exception& e){
0230 auto ex = JException(e.what());
0231 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0232 ex.nested_exception = std::current_exception();
0233 ex.function_name = "JEventSource::GetEvent";
0234 ex.type_name = m_type_name;
0235 ex.instance_name = m_prefix;
0236 ex.plugin_name = m_plugin_name;
0237 throw ex;
0238 }
0239 catch (...) {
0240 auto ex = JException("Unknown exception");
0241 ex.exception_type = JTypeInfo::demangle_current_exception_type();
0242 ex.nested_exception = std::current_exception();
0243 ex.function_name = "JEventSource::GetEvent";
0244 ex.type_name = m_type_name;
0245 ex.instance_name = m_prefix;
0246 ex.plugin_name = m_plugin_name;
0247 throw ex;
0248 }
0249 }
0250
0251
0252 void JEventSource::DoFinishEvent(JEvent& event) {
0253
0254 m_events_finished.fetch_add(1);
0255 if (m_enable_finish_event) {
0256 std::lock_guard<std::mutex> lock(m_mutex);
0257 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0258 FinishEvent(event);
0259 });
0260 }
0261 }
0262
0263 void JEventSource::Summarize(JComponentSummary& summary) const {
0264
0265 auto* result = new JComponentSummary::Component(
0266 "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0267
0268 for (const auto* output : m_outputs) {
0269 size_t suboutput_count = output->collection_names.size();
0270 for (size_t i=0; i<suboutput_count; ++i) {
0271 result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0272 }
0273 }
0274
0275 summary.Add(result);
0276 }