Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-05 10:24:26

0001 #include <JANA/JEventSource.h>
0002 
0003 void JEventSource::DoOpen(bool with_lock) {
0004     if (with_lock) {
0005         std::lock_guard<std::mutex> lock(m_mutex);
0006         if (!m_is_initialized) {
0007             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0008         }
0009         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0010         if (GetResourceName().empty()) {
0011             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0012         }
0013         else {
0014             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0015         }
0016         m_status = Status::Opened;
0017     }
0018     else {
0019         if (!m_is_initialized) {
0020             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0021         }
0022         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0023         if (GetResourceName().empty()) {
0024             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0025         }
0026         else {
0027             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0028         }
0029         m_status = Status::Opened;
0030     }
0031 }
0032 
0033 void JEventSource::DoClose(bool with_lock) {
0034     if (with_lock) {
0035         std::lock_guard<std::mutex> lock(m_mutex);
0036 
0037         if (m_status != JEventSource::Status::Opened) return;
0038 
0039         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0040         if (GetResourceName().empty()) {
0041             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0042         }
0043         else {
0044             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0045         }
0046         m_status = Status::Closed;
0047     }
0048     else {
0049         if (m_status != JEventSource::Status::Opened) return;
0050 
0051         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0052         if (GetResourceName().empty()) {
0053             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0054         }
0055         else {
0056             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0057         }
0058         m_status = Status::Closed;
0059     }
0060 }
0061 
0062 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0063 
0064     std::lock_guard<std::mutex> lock(m_mutex); // In general, DoNext must be synchronized.
0065 
0066     if (!m_is_initialized) {
0067         throw JException("JEventSource has not been initialized!");
0068     }
0069     if (m_status == Status::Unopened) {
0070         DoOpen(false);
0071     }
0072     if (m_status == Status::Opened) {
0073 
0074         // First we check whether there are events to skip. If so, we skip as many as possible
0075         if (m_nskip > 0) {
0076             auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0077             m_events_skipped += (m_nskip - remaining_events);
0078             m_nskip = remaining_events;
0079 
0080             LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0081 
0082             // If we encountered a problem, exit and let the arrow figure out when and whether to resume.
0083             // Note that Skip() will call Close() on our behalf.
0084             if (result != Result::Success) return result;
0085         }
0086 
0087         // Next we check whether we are limited by jana:nevents
0088         if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0089             LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0090             DoClose(false);
0091             return Result::FailureFinished;
0092         }
0093 
0094         // By this point we know that we are done skipping events and are ready to emit them
0095 
0096         // We configure the event
0097         event->SetEventNumber(m_events_emitted); // Default event number to event count
0098         event->SetJEventSource(this);
0099         event->SetSequential(false);
0100         event->GetJCallGraphRecorder()->Reset();
0101 
0102 
0103         JEventSource::Result result = Result::Success;
0104         try {
0105             auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0106             if (m_callback_style == CallbackStyle::LegacyMode) {
0107                 GetEvent(event);
0108             }
0109             else {
0110                 result = Emit(*event);
0111             }
0112             event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0113         }
0114         catch (RETURN_STATUS rs) {
0115 
0116             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0117                 DoClose(false);
0118                 result = Result::FailureFinished;
0119             }
0120             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0121                 result = Result::FailureTryAgain;
0122             }
0123             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0124                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0125                 ex.plugin_name = m_plugin_name;
0126                 ex.type_name = m_type_name;
0127                 ex.function_name = "JEventSource::GetEvent";
0128                 ex.instance_name = m_resource_name;
0129                 throw ex;
0130             }
0131         }
0132         catch (JException& ex) {
0133             if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0134             if (ex.type_name.empty()) ex.type_name = m_type_name;
0135             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0136             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0137             throw ex;
0138         }
0139         catch (std::exception& e){
0140             auto ex = JException(e.what());
0141             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0142             ex.nested_exception = std::current_exception();
0143             ex.function_name = "JEventSource::GetEvent";
0144             ex.type_name = m_type_name;
0145             ex.instance_name = m_prefix;
0146             ex.plugin_name = m_plugin_name;
0147             throw ex;
0148         }
0149         catch (...) {
0150             auto ex = JException("Unknown exception");
0151             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0152             ex.nested_exception = std::current_exception();
0153             ex.function_name = "JEventSource::GetEvent";
0154             ex.type_name = m_type_name;
0155             ex.instance_name = m_prefix;
0156             ex.plugin_name = m_plugin_name;
0157             throw ex;
0158         }
0159 
0160         if (result == Result::Success) {
0161             m_events_emitted += 1;
0162             // We end up here if we read an entry in our file or retrieved a message from our socket,
0163             // and believe we could obtain another one immediately if we wanted to
0164             for (auto* output : GetOutputs()) {
0165                 output->EulerianStore(*event->GetFactorySet());
0166             }
0167             for (auto* output : GetVariadicOutputs()) {
0168                 output->EulerianStore(*event->GetFactorySet());
0169             }
0170             return Result::Success;
0171         }
0172         else if (result == Result::FailureFinished) {
0173             // We end up here if we tried to read an entry in a file, but found EOF
0174             // or if we received a message from a socket that contained no data and indicated no more data will be coming
0175             DoClose(false);
0176             return Result::FailureFinished;
0177         }
0178         else if (result == Result::FailureTryAgain) {
0179             // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet
0180             // or if we polled the socket, found no new messages, but still expect messages later
0181             return Result::FailureTryAgain;
0182         }
0183         else {
0184             throw JException("Invalid JEventSource::Result value!");
0185         }
0186     }
0187     else { // status == Closed
0188         return Result::FailureFinished;
0189     }
0190 }
0191 
0192 
0193 void JEventSource::DoFinishEvent(JEvent& event) {
0194 
0195     m_events_processed.fetch_add(1);
0196     if (m_enable_finish_event) {
0197         std::lock_guard<std::mutex> lock(m_mutex);
0198         CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0199             FinishEvent(event);
0200         });
0201     }
0202 }
0203 
0204 void JEventSource::Summarize(JComponentSummary& summary) const {
0205 
0206     auto* result = new JComponentSummary::Component(
0207         "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0208 
0209     SummarizeOutputs(*result);
0210     summary.Add(result);
0211 }
0212 
0213 
0214 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0215 
0216     // Return values
0217     Result result = Result::Success;
0218 
0219     while (events_to_skip > 0 && result == Result::Success) {
0220         try {
0221             auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0222             if (m_callback_style == CallbackStyle::LegacyMode) {
0223                 GetEvent(event.shared_from_this());
0224             }
0225             else {
0226                 result = Emit(event);
0227             }
0228             event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0229             // We need to call FinishEvent, but we don't want to call it from inside JEvent::Clear() because we are already inside the lock.
0230             // So instead, we call it ourselves out here. This has the added benefit of letting us avoid updating m_events_processed.
0231             if (m_enable_finish_event) {
0232                 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0233             }
0234             event.Clear(false);
0235             events_to_skip -= 1;
0236         }
0237         catch (RETURN_STATUS rs) {
0238 
0239             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0240                 DoClose(false);
0241                 result = Result::FailureFinished;
0242             }
0243             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0244                 result = Result::FailureTryAgain;
0245             }
0246             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0247                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0248                 ex.plugin_name = m_plugin_name;
0249                 ex.type_name = m_type_name;
0250                 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0251                 ex.instance_name = m_resource_name;
0252                 throw ex;
0253             }
0254         }
0255         catch (JException& ex) {
0256             if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0257             if (ex.type_name.empty()) ex.type_name = m_type_name;
0258             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0259             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0260             throw ex;
0261         }
0262         catch (std::exception& e){
0263             auto ex = JException(e.what());
0264             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0265             ex.nested_exception = std::current_exception();
0266             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0267             ex.type_name = m_type_name;
0268             ex.instance_name = m_prefix;
0269             ex.plugin_name = m_plugin_name;
0270             throw ex;
0271         }
0272         catch (...) {
0273             auto ex = JException("Unknown exception");
0274             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0275             ex.nested_exception = std::current_exception();
0276             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0277             ex.type_name = m_type_name;
0278             ex.instance_name = m_prefix;
0279             ex.plugin_name = m_plugin_name;
0280             throw ex;
0281         }
0282     }
0283 
0284     return {result, events_to_skip};
0285 }
0286 
0287 
0288 
0289