Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-01 08:58:14

0001 #include <JANA/JEventSource.h>
0002 
0003 void JEventSource::DoInit() {
0004     std::lock_guard<std::mutex> lock(m_mutex);
0005     if (m_status != Status::Uninitialized) {
0006         throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007     }
0008     for (auto* parameter : m_parameters) {
0009         parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0010     }
0011     for (auto* service : m_services) {
0012         service->Fetch(m_app);
0013     }
0014     CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015     m_status = Status::Initialized;
0016     LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018 
0019 void JEventSource::DoOpen(bool with_lock) {
0020     if (with_lock) {
0021         std::lock_guard<std::mutex> lock(m_mutex);
0022         if (m_status != Status::Initialized) {
0023             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0024         }
0025         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0026         if (GetResourceName().empty()) {
0027             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0028         }
0029         else {
0030             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0031         }
0032         m_status = Status::Opened;
0033     }
0034     else {
0035         if (m_status != Status::Initialized) {
0036             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0037         }
0038         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0039         if (GetResourceName().empty()) {
0040             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0041         }
0042         else {
0043             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0044         }
0045         m_status = Status::Opened;
0046     }
0047 }
0048 
0049 void JEventSource::DoClose(bool with_lock) {
0050     if (with_lock) {
0051         std::lock_guard<std::mutex> lock(m_mutex);
0052 
0053         if (m_status != JEventSource::Status::Opened) return;
0054 
0055         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0056         if (GetResourceName().empty()) {
0057             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0058         }
0059         else {
0060             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0061         }
0062         m_status = Status::Closed;
0063     }
0064     else {
0065         if (m_status != JEventSource::Status::Opened) return;
0066 
0067         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0068         if (GetResourceName().empty()) {
0069             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0070         }
0071         else {
0072             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0073         }
0074         m_status = Status::Closed;
0075     }
0076 }
0077 
0078 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0079 
0080     std::lock_guard<std::mutex> lock(m_mutex); // In general, DoNext must be synchronized.
0081 
0082     if (m_status == Status::Uninitialized) {
0083         throw JException("JEventSource has not been initialized!");
0084     }
0085     if (m_status == Status::Initialized) {
0086         DoOpen(false);
0087     }
0088     if (m_status == Status::Opened) {
0089 
0090         // First we check whether there are events to skip. If so, we skip as many as possible
0091         if (m_nskip > 0) {
0092             auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0093             m_events_skipped += (m_nskip - remaining_events);
0094             m_nskip = remaining_events;
0095 
0096             LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0097 
0098             // If we encountered a problem, exit and let the arrow figure out when and whether to resume.
0099             // Note that Skip() will call Close() on our behalf.
0100             if (result != Result::Success) return result;
0101         }
0102 
0103         // Next we check whether we are limited by jana:nevents
0104         if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0105             LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0106             DoClose(false);
0107             return Result::FailureFinished;
0108         }
0109 
0110         // By this point we know that we are done skipping events and are ready to emit them
0111 
0112         // We configure the event
0113         event->SetEventNumber(m_events_emitted); // Default event number to event count
0114         event->SetJEventSource(this);
0115         event->SetSequential(false);
0116         event->GetJCallGraphRecorder()->Reset();
0117 
0118 
0119         JEventSource::Result result = Result::Success;
0120         try {
0121             auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0122             if (m_callback_style == CallbackStyle::LegacyMode) {
0123                 GetEvent(event);
0124             }
0125             else {
0126                 result = Emit(*event);
0127             }
0128             event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0129         }
0130         catch (RETURN_STATUS rs) {
0131 
0132             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0133                 DoClose(false);
0134                 result = Result::FailureFinished;
0135             }
0136             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0137                 result = Result::FailureTryAgain;
0138             }
0139             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0140                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0141                 ex.plugin_name = m_plugin_name;
0142                 ex.type_name = m_type_name;
0143                 ex.function_name = "JEventSource::GetEvent";
0144                 ex.instance_name = m_resource_name;
0145                 throw ex;
0146             }
0147         }
0148         catch (JException& ex) {
0149             if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0150             if (ex.type_name.empty()) ex.type_name = m_type_name;
0151             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0152             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0153             throw ex;
0154         }
0155         catch (std::exception& e){
0156             auto ex = JException(e.what());
0157             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0158             ex.nested_exception = std::current_exception();
0159             ex.function_name = "JEventSource::GetEvent";
0160             ex.type_name = m_type_name;
0161             ex.instance_name = m_prefix;
0162             ex.plugin_name = m_plugin_name;
0163             throw ex;
0164         }
0165         catch (...) {
0166             auto ex = JException("Unknown exception");
0167             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0168             ex.nested_exception = std::current_exception();
0169             ex.function_name = "JEventSource::GetEvent";
0170             ex.type_name = m_type_name;
0171             ex.instance_name = m_prefix;
0172             ex.plugin_name = m_plugin_name;
0173             throw ex;
0174         }
0175 
0176         if (result == Result::Success) {
0177             m_events_emitted += 1;
0178             // We end up here if we read an entry in our file or retrieved a message from our socket,
0179             // and believe we could obtain another one immediately if we wanted to
0180             for (auto* output : m_outputs) {
0181                 output->InsertCollection(*event);
0182             }
0183             return Result::Success;
0184         }
0185         else if (result == Result::FailureFinished) {
0186             // We end up here if we tried to read an entry in a file, but found EOF
0187             // or if we received a message from a socket that contained no data and indicated no more data will be coming
0188             DoClose(false);
0189             return Result::FailureFinished;
0190         }
0191         else if (result == Result::FailureTryAgain) {
0192             // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet
0193             // or if we polled the socket, found no new messages, but still expect messages later
0194             return Result::FailureTryAgain;
0195         }
0196         else {
0197             throw JException("Invalid JEventSource::Result value!");
0198         }
0199     }
0200     else { // status == Closed
0201         return Result::FailureFinished;
0202     }
0203 }
0204 
0205 
0206 void JEventSource::DoFinishEvent(JEvent& event) {
0207 
0208     m_events_processed.fetch_add(1);
0209     if (m_enable_finish_event) {
0210         std::lock_guard<std::mutex> lock(m_mutex);
0211         CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0212             FinishEvent(event);
0213         });
0214     }
0215 }
0216 
0217 void JEventSource::Summarize(JComponentSummary& summary) const {
0218 
0219     auto* result = new JComponentSummary::Component(
0220         "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0221 
0222     for (const auto* output : m_outputs) {
0223         size_t suboutput_count = output->collection_names.size();
0224         for (size_t i=0; i<suboutput_count; ++i) {
0225             result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0226         }
0227     }
0228 
0229     summary.Add(result);
0230 }
0231 
0232 
0233 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0234 
0235     // Return values
0236     Result result = Result::Success;
0237 
0238     while (events_to_skip > 0 && result == Result::Success) {
0239         try {
0240             auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0241             if (m_callback_style == CallbackStyle::LegacyMode) {
0242                 GetEvent(event.shared_from_this());
0243             }
0244             else {
0245                 result = Emit(event);
0246             }
0247             event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0248             // We need to call FinishEvent, but we don't want to call it from inside JEvent::Clear() because we are already inside the lock.
0249             // So instead, we call it ourselves out here. This has the added benefit of letting us avoid updating m_events_processed.
0250             if (m_enable_finish_event) {
0251                 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0252             }
0253             event.Clear(false);
0254             events_to_skip -= 1;
0255         }
0256         catch (RETURN_STATUS rs) {
0257 
0258             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0259                 DoClose(false);
0260                 result = Result::FailureFinished;
0261             }
0262             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0263                 result = Result::FailureTryAgain;
0264             }
0265             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0266                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0267                 ex.plugin_name = m_plugin_name;
0268                 ex.type_name = m_type_name;
0269                 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0270                 ex.instance_name = m_resource_name;
0271                 throw ex;
0272             }
0273         }
0274         catch (JException& ex) {
0275             if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0276             if (ex.type_name.empty()) ex.type_name = m_type_name;
0277             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0278             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0279             throw ex;
0280         }
0281         catch (std::exception& e){
0282             auto ex = JException(e.what());
0283             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0284             ex.nested_exception = std::current_exception();
0285             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0286             ex.type_name = m_type_name;
0287             ex.instance_name = m_prefix;
0288             ex.plugin_name = m_plugin_name;
0289             throw ex;
0290         }
0291         catch (...) {
0292             auto ex = JException("Unknown exception");
0293             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0294             ex.nested_exception = std::current_exception();
0295             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0296             ex.type_name = m_type_name;
0297             ex.instance_name = m_prefix;
0298             ex.plugin_name = m_plugin_name;
0299             throw ex;
0300         }
0301     }
0302 
0303     return {result, events_to_skip};
0304 }
0305 
0306 
0307 
0308