Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-18 09:41:06

0001 #include <JANA/JEventSource.h>
0002 
0003 void JEventSource::DoInit() {
0004     std::lock_guard<std::mutex> lock(m_mutex);
0005     if (m_status != Status::Uninitialized) {
0006         throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007     }
0008     for (auto* parameter : m_parameters) {
0009         parameter->Init(*(m_app->GetJParameterManager()), m_prefix);
0010     }
0011     for (auto* service : m_services) {
0012         service->Fetch(m_app);
0013     }
0014     CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015     m_status = Status::Initialized;
0016     LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018 
0019 void JEventSource::DoOpen(bool with_lock) {
0020     if (with_lock) {
0021         std::lock_guard<std::mutex> lock(m_mutex);
0022         if (m_status != Status::Initialized) {
0023             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0024         }
0025         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0026         if (GetResourceName().empty()) {
0027             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0028         }
0029         else {
0030             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0031         }
0032         m_status = Status::Opened;
0033     }
0034     else {
0035         if (m_status != Status::Initialized) {
0036             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0037         }
0038         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0039         if (GetResourceName().empty()) {
0040             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0041         }
0042         else {
0043             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0044         }
0045         m_status = Status::Opened;
0046     }
0047 }
0048 
0049 void JEventSource::DoClose(bool with_lock) {
0050     if (with_lock) {
0051         std::lock_guard<std::mutex> lock(m_mutex);
0052 
0053         if (m_status != JEventSource::Status::Opened) return;
0054 
0055         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0056         if (GetResourceName().empty()) {
0057             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0058         }
0059         else {
0060             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0061         }
0062         m_status = Status::Closed;
0063     }
0064     else {
0065         if (m_status != JEventSource::Status::Opened) return;
0066 
0067         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0068         if (GetResourceName().empty()) {
0069             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0070         }
0071         else {
0072             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0073         }
0074         m_status = Status::Closed;
0075     }
0076 }
0077 
0078 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0079 
0080     std::lock_guard<std::mutex> lock(m_mutex); // In general, DoNext must be synchronized.
0081 
0082     if (m_status == Status::Uninitialized) {
0083         throw JException("JEventSource has not been initialized!");
0084     }
0085     if (m_status == Status::Initialized) {
0086         DoOpen(false);
0087     }
0088     if (m_status == Status::Opened) {
0089 
0090         // First we check whether there are events to skip. If so, we skip as many as possible
0091         if (m_nskip > 0) {
0092             auto [result, remaining_events] = Skip(*event.get(), m_nskip);
0093             m_events_skipped += (m_nskip - remaining_events);
0094             m_nskip = remaining_events;
0095 
0096             LOG_DEBUG(GetLogger()) << "Finished with Skip: " << m_events_skipped << " events skipped, " << m_nskip << " events to skip remain";
0097 
0098             // If we encountered a problem, exit and let the arrow figure out when and whether to resume.
0099             // Note that Skip() will call Close() on our behalf.
0100             if (result != Result::Success) return result;
0101         }
0102 
0103         // Next we check whether we are limited by jana:nevents
0104         if (m_nevents != 0 && m_events_emitted >= m_nevents) {
0105             LOG_DEBUG(GetLogger()) << "Closing EventSource due to reaching nevent limit";
0106             DoClose(false);
0107             return Result::FailureFinished;
0108         }
0109 
0110         // By this point we know that we are done skipping events and are ready to emit them
0111 
0112         // We configure the event
0113         event->SetEventNumber(m_events_emitted); // Default event number to event count
0114         event->SetJEventSource(this);
0115         event->SetSequential(false);
0116         event->GetJCallGraphRecorder()->Reset();
0117 
0118 
0119         JEventSource::Result result = Result::Success;
0120         try {
0121             auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0122             if (m_callback_style == CallbackStyle::LegacyMode) {
0123                 GetEvent(event);
0124             }
0125             else {
0126                 result = Emit(*event);
0127             }
0128             event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0129         }
0130         catch (RETURN_STATUS rs) {
0131 
0132             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0133                 DoClose(false);
0134                 result = Result::FailureFinished;
0135             }
0136             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0137                 result = Result::FailureTryAgain;
0138             }
0139             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0140                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0141                 ex.plugin_name = m_plugin_name;
0142                 ex.type_name = m_type_name;
0143                 ex.function_name = "JEventSource::GetEvent";
0144                 ex.instance_name = m_resource_name;
0145                 throw ex;
0146             }
0147         }
0148         catch (JException& ex) {
0149             if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0150             if (ex.type_name.empty()) ex.type_name = m_type_name;
0151             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0152             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0153             throw ex;
0154         }
0155         catch (std::exception& e){
0156             auto ex = JException(e.what());
0157             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0158             ex.nested_exception = std::current_exception();
0159             ex.function_name = "JEventSource::GetEvent";
0160             ex.type_name = m_type_name;
0161             ex.instance_name = m_prefix;
0162             ex.plugin_name = m_plugin_name;
0163             throw ex;
0164         }
0165         catch (...) {
0166             auto ex = JException("Unknown exception");
0167             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0168             ex.nested_exception = std::current_exception();
0169             ex.function_name = "JEventSource::GetEvent";
0170             ex.type_name = m_type_name;
0171             ex.instance_name = m_prefix;
0172             ex.plugin_name = m_plugin_name;
0173             throw ex;
0174         }
0175 
0176         if (result == Result::Success) {
0177             m_events_emitted += 1;
0178             // We end up here if we read an entry in our file or retrieved a message from our socket,
0179             // and believe we could obtain another one immediately if we wanted to
0180             for (auto* output : GetOutputs()) {
0181                 output->EulerianStore(*event->GetFactorySet());
0182             }
0183             for (auto* output : GetVariadicOutputs()) {
0184                 output->EulerianStore(*event->GetFactorySet());
0185             }
0186             return Result::Success;
0187         }
0188         else if (result == Result::FailureFinished) {
0189             // We end up here if we tried to read an entry in a file, but found EOF
0190             // or if we received a message from a socket that contained no data and indicated no more data will be coming
0191             DoClose(false);
0192             return Result::FailureFinished;
0193         }
0194         else if (result == Result::FailureTryAgain) {
0195             // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet
0196             // or if we polled the socket, found no new messages, but still expect messages later
0197             return Result::FailureTryAgain;
0198         }
0199         else {
0200             throw JException("Invalid JEventSource::Result value!");
0201         }
0202     }
0203     else { // status == Closed
0204         return Result::FailureFinished;
0205     }
0206 }
0207 
0208 
0209 void JEventSource::DoFinishEvent(JEvent& event) {
0210 
0211     m_events_processed.fetch_add(1);
0212     if (m_enable_finish_event) {
0213         std::lock_guard<std::mutex> lock(m_mutex);
0214         CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0215             FinishEvent(event);
0216         });
0217     }
0218 }
0219 
0220 void JEventSource::Summarize(JComponentSummary& summary) const {
0221 
0222     auto* result = new JComponentSummary::Component(
0223         "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0224 
0225     SummarizeOutputs(*result);
0226     summary.Add(result);
0227 }
0228 
0229 
0230 std::pair<JEventSource::Result, size_t> JEventSource::Skip(JEvent& event, size_t events_to_skip) {
0231 
0232     // Return values
0233     Result result = Result::Success;
0234 
0235     while (events_to_skip > 0 && result == Result::Success) {
0236         try {
0237             auto previous_origin = event.GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0238             if (m_callback_style == CallbackStyle::LegacyMode) {
0239                 GetEvent(event.shared_from_this());
0240             }
0241             else {
0242                 result = Emit(event);
0243             }
0244             event.GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0245             // We need to call FinishEvent, but we don't want to call it from inside JEvent::Clear() because we are already inside the lock.
0246             // So instead, we call it ourselves out here. This has the added benefit of letting us avoid updating m_events_processed.
0247             if (m_enable_finish_event) {
0248                 CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ FinishEvent(event); });
0249             }
0250             event.Clear(false);
0251             events_to_skip -= 1;
0252         }
0253         catch (RETURN_STATUS rs) {
0254 
0255             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0256                 DoClose(false);
0257                 result = Result::FailureFinished;
0258             }
0259             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0260                 result = Result::FailureTryAgain;
0261             }
0262             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0263                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0264                 ex.plugin_name = m_plugin_name;
0265                 ex.type_name = m_type_name;
0266                 ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0267                 ex.instance_name = m_resource_name;
0268                 throw ex;
0269             }
0270         }
0271         catch (JException& ex) {
0272             if (ex.function_name.empty()) ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0273             if (ex.type_name.empty()) ex.type_name = m_type_name;
0274             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0275             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0276             throw ex;
0277         }
0278         catch (std::exception& e){
0279             auto ex = JException(e.what());
0280             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0281             ex.nested_exception = std::current_exception();
0282             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0283             ex.type_name = m_type_name;
0284             ex.instance_name = m_prefix;
0285             ex.plugin_name = m_plugin_name;
0286             throw ex;
0287         }
0288         catch (...) {
0289             auto ex = JException("Unknown exception");
0290             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0291             ex.nested_exception = std::current_exception();
0292             ex.function_name = (m_callback_style == CallbackStyle::LegacyMode) ? "JEventSource::GetEvent" : "JEventSource::Emit";
0293             ex.type_name = m_type_name;
0294             ex.instance_name = m_prefix;
0295             ex.plugin_name = m_plugin_name;
0296             throw ex;
0297         }
0298     }
0299 
0300     return {result, events_to_skip};
0301 }
0302 
0303 
0304 
0305