Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /jana2/src/libraries/JANA/JEventSource.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Components/JComponent.h>
0008 #include <JANA/Components/JHasOutputs.h>
0009 #include <JANA/JEvent.h>
0010 #include <JANA/JException.h>
0011 #include <JANA/JFactoryGenerator.h>
0012 
0013 
0014 class JFactoryGenerator;
0015 class JApplication;
0016 class JFactory;
0017 
0018 
0019 class JEventSource : public jana::components::JComponent,
0020                      public jana::components::JHasOutputs {
0021 
0022 public:
0023     /// Result describes what happened the last time a GetEvent() was attempted.
0024     /// If Emit() or GetEvent() reaches an error state, it should throw a JException instead.
0025     enum class Result { Success, FailureTryAgain, FailureFinished, FailureLevelChange };
0026 
0027     /// The user is supposed to _throw_ RETURN_STATUS::kNO_MORE_EVENTS or kBUSY from GetEvent()
0028     enum class RETURN_STATUS { kSUCCESS, kNO_MORE_EVENTS, kBUSY, kTRY_AGAIN, kERROR, kUNKNOWN };
0029 
0030     enum class Status { Unopened, Opened, Closed };
0031 
0032 private:
0033     std::string m_resource_name;
0034     std::atomic_ullong m_events_emitted {0};
0035     std::atomic_ullong m_events_skipped {0};
0036     std::atomic_ullong m_events_processed {0};
0037     uint64_t m_nskip = 0;
0038     uint64_t m_nevents = 0;
0039     bool m_enable_finish_event = false;
0040     bool m_enable_get_objects = false;
0041     bool m_enable_process_parallel = false;
0042     Status m_status = Status::Unopened;
0043 
0044     std::vector<JEventLevel> m_event_levels;
0045     JEventLevel m_next_level = JEventLevel::None;
0046 
0047 
0048 public:
0049     explicit JEventSource(std::string resource_name, JApplication* app = nullptr)
0050         : m_resource_name(std::move(resource_name)) {
0051             m_app = app;
0052         }
0053 
0054     JEventSource() {
0055         m_type_name = "JEventSource";
0056     }
0057     virtual ~JEventSource() = default;
0058 
0059 
0060 
0061     /// `Open` is called by JANA when it is ready to accept events from this event source. The implementor should open
0062     /// file pointers or sockets here, instead of in the constructor. This is because the implementor won't know how many
0063     /// or which event sources the user will decide to activate within one job. Thus the implementor can avoid problems
0064     /// such as running out of file pointers, or multiple event sources attempting to bind to the same socket.
0065 
0066     virtual void Open() {}
0067 
0068 
0069     // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::ExpertMode. 
0070     // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception.
0071     // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another 
0072     // entry in the file, or another message waiting at the socket, the user reads the data into the JEvent and returns
0073     // Result::Success, at which point JANA pushes the JEvent onto the downstream queue. If there is no data waiting yet,
0074     // the user returns Result::FailureTryAgain, at which point JANA recycles the JEvent to the pool. If there is no more
0075     // data, the user returns Result::FailureFinished, at which point JANA recycles the JEvent to the pool and calls Close().
0076 
0077     virtual Result Emit(JEvent&) { return Result::Success; };
0078 
0079 
0080     /// For work that should be done in parallel on a JEvent, but is tightly coupled to the JEventSource for some reason.
0081     /// Called after Emit() by JEventMapArrow, but only if EnableProcessParallel(true) is set. Note that the JEvent& is not
0082     /// const here, because we need to be able to call event.Insert() from here. Also note that `this` IS const, because
0083     /// it is not safe to access any state in parallel from here. Note that this includes things like calibration constants.
0084     /// If you need to safely access state, put use a JFactory instead.
0085     virtual void ProcessParallel(JEvent&) const {};
0086 
0087 
0088     /// `FinishEvent` is used to notify the `JEventSource` that an event has been completely processed. This is the final
0089     /// chance to interact with the `JEvent` before it is either cleared and recycled, or deleted. Although it is
0090     /// possible to use this for freeing JObjects stored in the JEvent , this is strongly discouraged in favor of putting
0091     /// that logic on the destructor, RAII-style. Instead, this callback should be used for updating and freeing state
0092     /// owned by the JEventSource, e.g. raw data which is keyed off of run number and therefore shared among multiple
0093     /// JEvents. `FinishEvent` is also well-suited for use with `EventGroup`s, e.g. to notify someone that a batch of
0094     /// events has finished, or to implement "barrier events".
0095 
0096     virtual void FinishEvent(JEvent&) {};
0097 
0098 
0099     /// `Close` is called by JANA when it is finished accepting events from this event source. Here is where you should
0100     /// cleanly close files, sockets, etc. Although GetEvent() knows when (for instance) there are no more events in a
0101     /// file, the logic for closing needs to live here because there are other ways a computation may end besides
0102     /// running out of events in a file. For instance, the user may want to process a limited number of events using
0103     /// the `jana:nevents` parameter, or they may want to terminate the computation manually using Ctrl-C.
0104 
0105     virtual void Close() {}
0106 
0107 
0108     /// `GetEvent` is called by JANA in order to emit a fresh event into the stream. JANA manages the entire lifetime of
0109     /// the JEvent. The `JEvent` being passed in is empty and merely needs to hydrated as follows:
0110 
0111     ///  1. `SetRunNumber()`
0112     ///  2. `SetEventNumber()`
0113     ///  3. `Insert<T>()` raw data of type `T` into the `JEvent` container. Note that `T` should be a child class, such as
0114     ///     `FADC250DigiHit`, not a parent class such as `JObject` or `TObject`. Also note that `Insert` transfers
0115     ///     ownership of the data to that JEvent. If the data is shared among multiple `JEvents`, e.g. BOR data,
0116     ///     use `SetFactoryFlags(JFactory::NOT_OBJECT_OWNER)`
0117 
0118     /// Note that JEvents are usually recycled. Although all reconstruction data is cleared before `GetEvent` is called,
0119     /// the constituent `JFactories` may retain some state, e.g. statistics, or calibration data keyed off of run number.
0120 
0121     /// If an event cannot be emitted, either because the resource is not ready or because we have reached the end of
0122     /// the event stream, the implementor should throw the corresponding `RETURN_STATUS`. The user should NEVER throw
0123     /// `RETURN_STATUS SUCCESS` because this will hurt performance. Instead, they should simply return normally.
0124 
0125     virtual void GetEvent(std::shared_ptr<JEvent>) {};
0126 
0127 
0128     /// `GetObjects` was historically used for lazily unpacking data from a JEvent and putting it into a "dummy" JFactory.
0129     /// This mechanism has been replaced by `JEvent::Insert`. All lazy evaluation should happen in a (non-dummy)
0130     /// JFactory, whereas eager evaluation should happen in `JEventSource::GetEvent` via `JEvent::Insert`.
0131 
0132     virtual bool GetObjects(const std::shared_ptr<const JEvent>&, JFactory*) {
0133         return false;
0134     }
0135 
0136     /// `Skip` allows the user to move forward in the file without having to read and discard entire events. It takes
0137     /// as inputs an event object and the number of events to skip, and returns a pair containing a JEventSource::Result
0138     /// and the number of events that still need to be skipped. The event object can be completely ignored. If it is
0139     /// populated, however, it should be cleared before Skip() returns.
0140     virtual std::pair<JEventSource::Result, size_t> Skip(JEvent& event, size_t events_to_skip);
0141 
0142 
0143     // Getters
0144     
0145     std::string GetResourceName() const { return m_resource_name; }
0146 
0147     uint64_t GetEmittedEventCount() const { return m_events_emitted; };
0148     uint64_t GetSkippedEventCount() const { return m_events_skipped; };
0149     uint64_t GetProcessedEventCount() const { return m_events_processed; };
0150 
0151     const std::vector<JEventLevel> GetEventLevels() { return m_event_levels; }
0152 
0153     bool IsGetObjectsEnabled() const { return m_enable_get_objects; }
0154     bool IsFinishEventEnabled() const { return m_enable_finish_event; }
0155     bool IsProcessParallelEnabled() const { return m_enable_process_parallel; }
0156 
0157     uint64_t GetNSkip() { return m_nskip; }
0158     uint64_t GetNEvents() { return m_nevents; }
0159 
0160     virtual std::string GetVDescription() const {
0161         return "<description unavailable>";
0162     } ///< Optional for getting description via source rather than JEventSourceGenerator
0163 
0164     Status GetStatus() const { return m_status; }
0165 
0166 
0167     // Setters
0168 
0169     void SetResourceName(std::string resource_name) { m_resource_name = resource_name; }
0170 
0171     /// EnableFinishEvent() is intended to be called by the user in the constructor in order to
0172     /// tell JANA to call the provided FinishEvent method after all JEventProcessors
0173     /// have finished with a given event. This should only be enabled when absolutely necessary
0174     /// (e.g. for backwards compatibility) because it introduces contention for the JEventSource mutex,
0175     /// which will hurt performance. Conceptually, FinishEvent isn't great, and so should be avoided when possible.
0176     void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; }
0177     void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; }
0178     void EnableProcessParallel(bool enable=true) { m_enable_process_parallel = enable; }
0179 
0180     void SetNEvents(uint64_t nevents) { m_nevents = nevents; };
0181     void SetNSkip(uint64_t nskip) { m_nskip = nskip; };
0182 
0183     void SetNextEventLevel(JEventLevel level) { m_next_level = level; }
0184     void SetEventLevels(std::vector<JEventLevel> levels) { m_event_levels = levels; }
0185     JEventLevel GetNextInputLevel() const { return m_next_level; }
0186 
0187 
0188     // Internal
0189 
0190     void DoOpen(bool with_lock=true);
0191 
0192     void DoClose(bool with_lock=true);
0193 
0194     Result DoNext(std::shared_ptr<JEvent> event);
0195 
0196     Result DoNextCompatibility(std::shared_ptr<JEvent> event);
0197 
0198     void DoFinishEvent(JEvent& event);
0199 
0200     void Summarize(JComponentSummary& summary) const override;
0201 
0202 
0203 };
0204 
0205