Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:38

0001 #include <JANA/JEventSource.h>
0002 
0003 void JEventSource::DoInit() {
0004     std::lock_guard<std::mutex> lock(m_mutex);
0005     if (m_status != Status::Uninitialized) {
0006         throw JException("Attempted to initialize a JEventSource that is already initialized!");
0007     }
0008     for (auto* parameter : m_parameters) {
0009         parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
0010     }
0011     for (auto* service : m_services) {
0012         service->Fetch(m_app);
0013     }
0014     CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init(); });
0015     m_status = Status::Initialized;
0016     LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0017 }
0018 
0019 void JEventSource::DoInitialize() {
0020     DoOpen();
0021 }
0022 
0023 void JEventSource::DoOpen(bool with_lock) {
0024     if (with_lock) {
0025         std::lock_guard<std::mutex> lock(m_mutex);
0026         if (m_status != Status::Initialized) {
0027             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0028         }
0029         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0030         if (GetResourceName().empty()) {
0031             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0032         }
0033         else {
0034             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0035         }
0036         m_status = Status::Opened;
0037     }
0038     else {
0039         if (m_status != Status::Initialized) {
0040             throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0041         }
0042         CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0043         if (GetResourceName().empty()) {
0044             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0045         }
0046         else {
0047             LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0048         }
0049         m_status = Status::Opened;
0050     }
0051 }
0052 
0053 void JEventSource::DoClose(bool with_lock) {
0054     if (with_lock) {
0055         std::lock_guard<std::mutex> lock(m_mutex);
0056 
0057         if (m_status != JEventSource::Status::Opened) return;
0058 
0059         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0060         if (GetResourceName().empty()) {
0061             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0062         }
0063         else {
0064             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0065         }
0066         m_status = Status::Closed;
0067     }
0068     else {
0069         if (m_status != JEventSource::Status::Opened) return;
0070 
0071         CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0072         if (GetResourceName().empty()) {
0073             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0074         }
0075         else {
0076             LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0077         }
0078         m_status = Status::Closed;
0079     }
0080 }
0081 
0082 JEventSource::Result JEventSource::DoNext(std::shared_ptr<JEvent> event) {
0083 
0084     std::lock_guard<std::mutex> lock(m_mutex); // In general, DoNext must be synchronized.
0085     
0086     if (m_status == Status::Uninitialized) {
0087         throw JException("JEventSource has not been initialized!");
0088     }
0089 
0090     if (m_callback_style == CallbackStyle::LegacyMode) {
0091         return DoNextCompatibility(event);
0092     }
0093 
0094     auto first_evt_nr = m_nskip;
0095     auto last_evt_nr = m_nevents + m_nskip;
0096 
0097     if (m_status == Status::Initialized) {
0098         DoOpen(false);
0099     }
0100     if (m_status == Status::Opened) {
0101         if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) {
0102             // We exit early (and recycle) because we hit our jana:nevents limit
0103             DoClose(false);
0104             return Result::FailureFinished;
0105         }
0106         // If we reach this point, we will need to actually read an event
0107 
0108         // We configure the event
0109         event->SetEventNumber(m_events_emitted); // Default event number to event count
0110         event->SetJEventSource(this);
0111         event->SetSequential(false);
0112         event->GetJCallGraphRecorder()->Reset();
0113 
0114         // Now we call the new-style interface
0115         auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0116         JEventSource::Result result;
0117         CallWithJExceptionWrapper("JEventSource::Emit", [&](){
0118             result = Emit(*event);
0119         });
0120         event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0121 
0122         if (result == Result::Success) {
0123             m_events_emitted += 1; 
0124             // We end up here if we read an entry in our file or retrieved a message from our socket,
0125             // and believe we could obtain another one immediately if we wanted to
0126             for (auto* output : m_outputs) {
0127                 output->InsertCollection(*event);
0128             }
0129             if (m_events_emitted <= first_evt_nr) {
0130                 // We immediately throw away this whole event because of nskip 
0131                 // (although really we should be handling this with Seek())
0132                 return Result::FailureTryAgain;
0133             }
0134             return Result::Success;
0135         }
0136         else if (result == Result::FailureFinished) {
0137             // We end up here if we tried to read an entry in a file, but found EOF
0138             // or if we received a message from a socket that contained no data and indicated no more data will be coming
0139             DoClose(false);
0140             return Result::FailureFinished;
0141         }
0142         else if (result == Result::FailureTryAgain) {
0143             // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet
0144             // or if we polled the socket, found no new messages, but still expect messages later
0145             return Result::FailureTryAgain;
0146         }
0147         else {
0148             throw JException("Invalid JEventSource::Result value!");
0149         }
0150     }
0151     else { // status == Closed
0152         return Result::FailureFinished;
0153     }
0154 }
0155 
0156 JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr<JEvent> event) {
0157 
0158     auto first_evt_nr = m_nskip;
0159     auto last_evt_nr = m_nevents + m_nskip;
0160 
0161     try {
0162         if (m_status == Status::Initialized) {
0163             DoOpen(false);
0164         }
0165         if (m_status == Status::Opened) {
0166             if (m_events_emitted < first_evt_nr) {
0167                 // Skip these events due to nskip
0168                 event->SetEventNumber(m_events_emitted); // Default event number to event count
0169                 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0170                 GetEvent(event);
0171                 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0172                 m_events_emitted += 1;
0173                 return Result::FailureTryAgain;  // Reject this event and recycle it
0174             } else if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) {
0175                 // Declare ourselves finished due to nevents
0176                 DoClose(false); // Close out the event source as soon as it declares itself finished
0177                 return Result::FailureFinished;
0178             } else {
0179                 // Actually emit an event.
0180                 // GetEvent() expects the following things from its incoming JEvent
0181                 event->SetEventNumber(m_events_emitted);
0182                 event->SetJApplication(m_app);
0183                 event->SetJEventSource(this);
0184                 event->SetSequential(false);
0185                 event->GetJCallGraphRecorder()->Reset();
0186                 auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0187                 GetEvent(event);
0188                 for (auto* output : m_outputs) {
0189                     output->InsertCollection(*event);
0190                 }
0191                 event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0192                 m_events_emitted += 1;
0193                 return Result::Success; // Don't reject this event!
0194             }
0195         } else if (m_status == Status::Closed) {
0196             return Result::FailureFinished;
0197         } else {
0198             throw JException("Invalid m_status");
0199         }
0200     }
0201     catch (RETURN_STATUS rs) {
0202 
0203         if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0204             DoClose(false);
0205             return Result::FailureFinished;
0206         }
0207         else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0208             return Result::FailureTryAgain;
0209         }
0210         else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0211             JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0212             ex.plugin_name = m_plugin_name;
0213             ex.type_name = m_type_name;
0214             ex.function_name = "JEventSource::GetEvent";
0215             ex.instance_name = m_resource_name;
0216             throw ex;
0217         }
0218         else {
0219             return Result::Success;
0220         }
0221     }
0222     catch (JException& ex) {
0223         if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0224         if (ex.type_name.empty()) ex.type_name = m_type_name;
0225         if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0226         if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0227         throw ex;
0228     }
0229     catch (std::exception& e){
0230         auto ex = JException(e.what());
0231         ex.exception_type = JTypeInfo::demangle_current_exception_type();
0232         ex.nested_exception = std::current_exception();
0233         ex.function_name = "JEventSource::GetEvent";
0234         ex.type_name = m_type_name;
0235         ex.instance_name = m_prefix;
0236         ex.plugin_name = m_plugin_name;
0237         throw ex;
0238     }
0239     catch (...) {
0240         auto ex = JException("Unknown exception");
0241         ex.exception_type = JTypeInfo::demangle_current_exception_type();
0242         ex.nested_exception = std::current_exception();
0243         ex.function_name = "JEventSource::GetEvent";
0244         ex.type_name = m_type_name;
0245         ex.instance_name = m_prefix;
0246         ex.plugin_name = m_plugin_name;
0247         throw ex;
0248     }
0249 }
0250 
0251 
0252 void JEventSource::DoFinishEvent(JEvent& event) {
0253 
0254     m_events_finished.fetch_add(1);
0255     if (m_enable_finish_event) {
0256         std::lock_guard<std::mutex> lock(m_mutex);
0257         CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0258             FinishEvent(event);
0259         });
0260     }
0261 }
0262 
0263 void JEventSource::Summarize(JComponentSummary& summary) const {
0264 
0265     auto* result = new JComponentSummary::Component(
0266         "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0267 
0268     for (const auto* output : m_outputs) {
0269         size_t suboutput_count = output->collection_names.size();
0270         for (size_t i=0; i<suboutput_count; ++i) {
0271             result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0272         }
0273     }
0274 
0275     summary.Add(result);
0276 }