Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:41

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasOutputs.h>
0009 #include <JANA/JEvent.h>
0010 #include <JANA/JException.h>
0011 #include <JANA/JFactoryGenerator.h>
0012 
0013 
0014 class JFactoryGenerator;
0015 class JApplication;
0016 class JFactory;
0017 
0018 
0019 class JEventSource : public jana::components::JComponent, 
0020                      public jana::components::JHasOutputs {
0021 
0022 public:
0023     
0024     /// Result describes what happened the last time a GetEvent() was attempted.
0025     /// If Emit() or GetEvent() reaches an error state, it should throw a JException instead.
0026     enum class Result { Success, FailureTryAgain, FailureFinished };
0027 
0028     // TODO: Deprecate me!
0029     /// The user is supposed to _throw_ RETURN_STATUS::kNO_MORE_EVENTS or kBUSY from GetEvent()
0030     enum class RETURN_STATUS { kSUCCESS, kNO_MORE_EVENTS, kBUSY, kTRY_AGAIN, kERROR, kUNKNOWN };
0031 
0032 
0033     // Constructor
0034     // TODO: Deprecate me!
0035     explicit JEventSource(std::string resource_name, JApplication* app = nullptr)
0036         : m_resource_name(std::move(resource_name))
0037         , m_event_count{0}
0038         {
0039             m_app = app;
0040         }
0041 
0042     JEventSource() = default;
0043     virtual ~JEventSource() = default;
0044 
0045 
0046     // `Init` is where the user requests parameters and services. If the user requests all parameters and services here,
0047     // JANA can report them back to the user without having to open the resource and run the topology.
0048 
0049     virtual void Init() {}
0050 
0051     // To be implemented by the user
0052     /// `Open` is called by JANA when it is ready to accept events from this event source. The implementor should open
0053     /// file pointers or sockets here, instead of in the constructor. This is because the implementor won't know how many
0054     /// or which event sources the user will decide to activate within one job. Thus the implementor can avoid problems
0055     /// such as running out of file pointers, or multiple event sources attempting to bind to the same socket.
0056 
0057     virtual void Open() {}
0058 
0059 
0060     // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::ExpertMode. 
0061     // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception.
0062     // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another 
0063     // entry in the file, or another message waiting at the socket, the user reads the data into the JEvent and returns
0064     // Result::Success, at which point JANA pushes the JEvent onto the downstream queue. If there is no data waiting yet,
0065     // the user returns Result::FailureTryAgain, at which point JANA recycles the JEvent to the pool. If there is no more
0066     // data, the user returns Result::FailureFinished, at which point JANA recycles the JEvent to the pool and calls Close().
0067 
0068     virtual Result Emit(JEvent&) { return Result::Success; };
0069 
0070 
0071     /// `Close` is called by JANA when it is finished accepting events from this event source. Here is where you should
0072     /// cleanly close files, sockets, etc. Although GetEvent() knows when (for instance) there are no more events in a
0073     /// file, the logic for closing needs to live here because there are other ways a computation may end besides
0074     /// running out of events in a file. For instance, the user may want to process a limited number of events using
0075     /// the `jana:nevents` parameter, or they may want to terminate the computation manually using Ctrl-C.
0076 
0077     virtual void Close() {}
0078 
0079 
0080     /// `GetEvent` is called by JANA in order to emit a fresh event into the stream. JANA manages the entire lifetime of
0081     /// the JEvent. The `JEvent` being passed in is empty and merely needs to hydrated as follows:
0082 
0083     ///  1. `SetRunNumber()`
0084     ///  2. `SetEventNumber()`
0085     ///  3. `Insert<T>()` raw data of type `T` into the `JEvent` container. Note that `T` should be a child class, such as
0086     ///     `FADC250DigiHit`, not a parent class such as `JObject` or `TObject`. Also note that `Insert` transfers
0087     ///     ownership of the data to that JEvent. If the data is shared among multiple `JEvents`, e.g. BOR data,
0088     ///     use `SetFactoryFlags(JFactory::NOT_OBJECT_OWNER)`
0089 
0090     /// Note that JEvents are usually recycled. Although all reconstruction data is cleared before `GetEvent` is called,
0091     /// the constituent `JFactories` may retain some state, e.g. statistics, or calibration data keyed off of run number.
0092 
0093     /// If an event cannot be emitted, either because the resource is not ready or because we have reached the end of
0094     /// the event stream, the implementor should throw the corresponding `RETURN_STATUS`. The user should NEVER throw
0095     /// `RETURN_STATUS SUCCESS` because this will hurt performance. Instead, they should simply return normally.
0096 
0097     virtual void GetEvent(std::shared_ptr<JEvent>) {};
0098 
0099     virtual void Preprocess(const JEvent&) {};
0100 
0101 
0102     /// `FinishEvent` is used to notify the `JEventSource` that an event has been completely processed. This is the final
0103     /// chance to interact with the `JEvent` before it is either cleared and recycled, or deleted. Although it is
0104     /// possible to use this for freeing JObjects stored in the JEvent , this is strongly discouraged in favor of putting
0105     /// that logic on the destructor, RAII-style. Instead, this callback should be used for updating and freeing state
0106     /// owned by the JEventSource, e.g. raw data which is keyed off of run number and therefore shared among multiple
0107     /// JEvents. `FinishEvent` is also well-suited for use with `EventGroup`s, e.g. to notify someone that a batch of
0108     /// events has finished, or to implement "barrier events".
0109     virtual void FinishEvent(JEvent&) {};
0110 
0111 
0112     /// `GetObjects` was historically used for lazily unpacking data from a JEvent and putting it into a "dummy" JFactory.
0113     /// This mechanism has been replaced by `JEvent::Insert`. All lazy evaluation should happen in a (non-dummy)
0114     /// JFactory, whereas eager evaluation should happen in `JEventSource::GetEvent` via `JEvent::Insert`.
0115     virtual bool GetObjects(const std::shared_ptr<const JEvent>&, JFactory*) {
0116         return false;
0117     }
0118 
0119 
0120     virtual void DoInit() {
0121         if (m_status == Status::Uninitialized) {
0122             CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init();});
0123             m_status = Status::Initialized;
0124             LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0125         }
0126         else {
0127             throw JException("Attempted to initialize a JEventSource that is not uninitialized!");
0128         }
0129     }
0130 
0131     [[deprecated("Replaced by JEventSource::DoOpen()")]]
0132     virtual void DoInitialize() {
0133         DoOpen();
0134     }
0135 
0136     virtual void DoOpen(bool with_lock=true) {
0137         if (with_lock) {
0138             std::lock_guard<std::mutex> lock(m_mutex);
0139             if (m_status != Status::Initialized) {
0140                 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0141             }
0142             CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0143             if (GetResourceName().empty()) {
0144                 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0145             }
0146             else {
0147                 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0148             }
0149             m_status = Status::Opened;
0150         }
0151         else {
0152             if (m_status != Status::Initialized) {
0153                 throw JException("Attempted to open a JEventSource that hasn't been initialized!");
0154             }
0155             CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();});
0156             if (GetResourceName().empty()) {
0157                 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END;
0158             }
0159             else {
0160                 LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0161             }
0162             m_status = Status::Opened;
0163         }
0164     }
0165 
0166     virtual void DoClose(bool with_lock=true) {
0167         if (with_lock) {
0168             std::lock_guard<std::mutex> lock(m_mutex);
0169 
0170             if (m_status != JEventSource::Status::Opened) return;
0171 
0172             CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0173             if (GetResourceName().empty()) {
0174                 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0175             }
0176             else {
0177                 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0178             }
0179             m_status = Status::Closed;
0180         }
0181         else {
0182             if (m_status != JEventSource::Status::Opened) return;
0183 
0184             CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();});
0185             if (GetResourceName().empty()) {
0186                 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END;
0187             }
0188             else {
0189                 LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END;
0190             }
0191             m_status = Status::Closed;
0192         }
0193     }
0194     
0195     Result DoNext(std::shared_ptr<JEvent> event) {
0196 
0197         std::lock_guard<std::mutex> lock(m_mutex); // In general, DoNext must be synchronized.
0198         
0199         if (m_status == Status::Uninitialized) {
0200             throw JException("JEventSource has not been initialized!");
0201         }
0202 
0203         if (m_callback_style == CallbackStyle::LegacyMode) {
0204             return DoNextCompatibility(event);
0205         }
0206 
0207         auto first_evt_nr = m_nskip;
0208         auto last_evt_nr = m_nevents + m_nskip;
0209 
0210         if (m_status == Status::Initialized) {
0211             DoOpen(false);
0212         }
0213         if (m_status == Status::Opened) {
0214             if (m_nevents != 0 && (m_event_count == last_evt_nr)) {
0215                 // We exit early (and recycle) because we hit our jana:nevents limit
0216                 DoClose(false);
0217                 return Result::FailureFinished;
0218             }
0219             // If we reach this point, we will need to actually read an event
0220 
0221             // We configure the event
0222             event->SetEventNumber(m_event_count); // Default event number to event count
0223             event->SetJEventSource(this);
0224             event->SetSequential(false);
0225             event->GetJCallGraphRecorder()->Reset();
0226 
0227             // Now we call the new-style interface
0228             auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0229             JEventSource::Result result;
0230             CallWithJExceptionWrapper("JEventSource::Emit", [&](){
0231                 result = Emit(*event);
0232             });
0233             event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0234 
0235             if (result == Result::Success) {
0236                 m_event_count += 1; 
0237                 // We end up here if we read an entry in our file or retrieved a message from our socket,
0238                 // and believe we could obtain another one immediately if we wanted to
0239                 for (auto* output : m_outputs) {
0240                     output->InsertCollection(*event);
0241                 }
0242                 if (m_event_count <= first_evt_nr) {
0243                     // We immediately throw away this whole event because of nskip 
0244                     // (although really we should be handling this with Seek())
0245                     return Result::FailureTryAgain;
0246                 }
0247                 return Result::Success;
0248             }
0249             else if (result == Result::FailureFinished) {
0250                 // We end up here if we tried to read an entry in a file, but found EOF
0251                 // or if we received a message from a socket that contained no data and indicated no more data will be coming
0252                 DoClose(false);
0253                 return Result::FailureFinished;
0254             }
0255             else if (result == Result::FailureTryAgain) {
0256                 // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet
0257                 // or if we polled the socket, found no new messages, but still expect messages later
0258                 return Result::FailureTryAgain;
0259             }
0260             else {
0261                 throw JException("Invalid JEventSource::Result value!");
0262             }
0263         }
0264         else { // status == Closed
0265             return Result::FailureFinished;
0266         }
0267     }
0268 
0269     Result DoNextCompatibility(std::shared_ptr<JEvent> event) {
0270 
0271         auto first_evt_nr = m_nskip;
0272         auto last_evt_nr = m_nevents + m_nskip;
0273 
0274         try {
0275             if (m_status == Status::Initialized) {
0276                 DoOpen(false);
0277             }
0278             if (m_status == Status::Opened) {
0279                 if (m_event_count < first_evt_nr) {
0280                     // Skip these events due to nskip
0281                     event->SetEventNumber(m_event_count); // Default event number to event count
0282                     auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0283                     GetEvent(event);
0284                     event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0285                     m_event_count += 1;
0286                     return Result::FailureTryAgain;  // Reject this event and recycle it
0287                 } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) {
0288                     // Declare ourselves finished due to nevents
0289                     DoClose(false); // Close out the event source as soon as it declares itself finished
0290                     return Result::FailureFinished;
0291                 } else {
0292                     // Actually emit an event.
0293                     // GetEvent() expects the following things from its incoming JEvent
0294                     event->SetEventNumber(m_event_count);
0295                     event->SetJApplication(m_app);
0296                     event->SetJEventSource(this);
0297                     event->SetSequential(false);
0298                     event->GetJCallGraphRecorder()->Reset();
0299                     auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE);  // (see note at top of JCallGraphRecorder.h)
0300                     GetEvent(event);
0301                     for (auto* output : m_outputs) {
0302                         output->InsertCollection(*event);
0303                     }
0304                     event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
0305                     m_event_count += 1;
0306                     return Result::Success; // Don't reject this event!
0307                 }
0308             } else if (m_status == Status::Closed) {
0309                 return Result::FailureFinished;
0310             } else {
0311                 throw JException("Invalid m_status");
0312             }
0313         }
0314         catch (RETURN_STATUS rs) {
0315 
0316             if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
0317                 DoClose(false);
0318                 return Result::FailureFinished;
0319             }
0320             else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
0321                 return Result::FailureTryAgain;
0322             }
0323             else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) {
0324                 JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN");
0325                 ex.plugin_name = m_plugin_name;
0326                 ex.type_name = m_type_name;
0327                 ex.function_name = "JEventSource::GetEvent";
0328                 ex.instance_name = m_resource_name;
0329                 throw ex;
0330             }
0331             else {
0332                 return Result::Success;
0333             }
0334         }
0335         catch (JException& ex) {
0336             if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent";
0337             if (ex.type_name.empty()) ex.type_name = m_type_name;
0338             if (ex.instance_name.empty()) ex.instance_name = m_prefix;
0339             if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name;
0340             throw ex;
0341         }
0342         catch (std::exception& e){
0343             auto ex = JException(e.what());
0344             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0345             ex.nested_exception = std::current_exception();
0346             ex.function_name = "JEventSource::GetEvent";
0347             ex.type_name = m_type_name;
0348             ex.instance_name = m_prefix;
0349             ex.plugin_name = m_plugin_name;
0350             throw ex;
0351         }
0352         catch (...) {
0353             auto ex = JException("Unknown exception");
0354             ex.exception_type = JTypeInfo::demangle_current_exception_type();
0355             ex.nested_exception = std::current_exception();
0356             ex.function_name = "JEventSource::GetEvent";
0357             ex.type_name = m_type_name;
0358             ex.instance_name = m_prefix;
0359             ex.plugin_name = m_plugin_name;
0360             throw ex;
0361         }
0362     }
0363 
0364     /// Calls the optional-and-discouraged user-provided FinishEvent virtual method, enforcing
0365     /// 1. Thread safety
0366     /// 2. The m_enable_finish_event flag
0367 
0368     void DoFinish(JEvent& event) {
0369         if (m_enable_finish_event) {
0370             std::lock_guard<std::mutex> lock(m_mutex);
0371             CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){
0372                 FinishEvent(event);
0373             });
0374         }
0375     }
0376 
0377     void Summarize(JComponentSummary& summary) const override {
0378 
0379         auto* result = new JComponentSummary::Component(
0380             "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
0381 
0382         for (const auto* output : m_outputs) {
0383             size_t suboutput_count = output->collection_names.size();
0384             for (size_t i=0; i<suboutput_count; ++i) {
0385                 result->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
0386             }
0387         }
0388 
0389         summary.Add(result);
0390     }
0391 
0392     // Getters and setters
0393     
0394     void SetResourceName(std::string resource_name) { m_resource_name = resource_name; }
0395 
0396     std::string GetResourceName() const { return m_resource_name; }
0397 
0398     uint64_t GetEventCount() const { return m_event_count; };
0399 
0400     // TODO: Deprecate me
0401     virtual std::string GetType() const { return m_type_name; }
0402 
0403     // TODO: Deprecate me
0404     std::string GetName() const { return m_resource_name; }
0405 
0406     bool IsGetObjectsEnabled() const { return m_enable_get_objects; }
0407     bool IsFinishEventEnabled() const { return m_enable_finish_event; }
0408 
0409     // TODO: Deprecate me
0410     virtual std::string GetVDescription() const {
0411         return "<description unavailable>";
0412     } ///< Optional for getting description via source rather than JEventSourceGenerator
0413 
0414 
0415     uint64_t GetNSkip() { return m_nskip; }
0416     uint64_t GetNEvents() { return m_nevents; }
0417 
0418     // Meant to be called by user
0419     /// EnableFinishEvent() is intended to be called by the user in the constructor in order to
0420     /// tell JANA to call the provided FinishEvent method after all JEventProcessors
0421     /// have finished with a given event. This should only be enabled when absolutely necessary
0422     /// (e.g. for backwards compatibility) because it introduces contention for the JEventSource mutex,
0423     /// which will hurt performance. Conceptually, FinishEvent isn't great, and so should be avoided when possible.
0424     void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; }
0425     void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; }
0426 
0427     // Meant to be called by JANA
0428     void SetNEvents(uint64_t nevents) { m_nevents = nevents; };
0429 
0430     // Meant to be called by JANA
0431     void SetNSkip(uint64_t nskip) { m_nskip = nskip; };
0432 
0433 
0434 private:
0435     std::string m_resource_name;
0436     std::atomic_ullong m_event_count {0};
0437     uint64_t m_nskip = 0;
0438     uint64_t m_nevents = 0;
0439     bool m_enable_finish_event = false;
0440     bool m_enable_get_objects = false;
0441 
0442 };
0443 
0444