Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:38

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 
0007 #ifndef JANA2_TRIDASEVENTSOURCE_H
0008 #define JANA2_TRIDASEVENTSOURCE_H
0009 
0010 #include <JANA/JEventSource.h>
0011 #include <JANA/Services/JEventGroupTracker.h>
0012 
0013 #include "TridasEvent.h"
0014 
0015 #include <queue>
0016 
0017 class BlockingGroupedEventSource : public JEventSource {
0018 
0019     JEventGroupManager m_egm;
0020 
0021     int m_pending_group_id;
0022     std::mutex m_pending_mutex;
0023     std::queue<std::pair<TridasEvent*, JEventGroup*>> m_pending_events;
0024 
0025 public:
0026 
0027     BlockingGroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) {
0028         // TODO: Get EventGroupManager from ServiceLocator instead
0029         SetCallbackStyle(CallbackStyle::ExpertMode);
0030         m_pending_group_id = 1;
0031     };
0032 
0033 
0034     /// SubmitAndWait provides a blocking interface for pushing groups of TridasEvents into JANA.
0035     /// JANA does NOT assume ownership of the events vector, nor does it clear it.
0036     void SubmitAndWait(std::vector<TridasEvent*>& events) {
0037         auto group = m_egm.GetEventGroup(m_pending_group_id++);
0038         {
0039             std::lock_guard<std::mutex> lock(m_pending_mutex);
0040             for (auto event : events) {
0041                 group->StartEvent();   // We have to call this immediately in order to 'open' the group
0042                 m_pending_events.push(std::make_pair(event, group));
0043             }
0044         }
0045         group->CloseGroup();
0046         group->WaitUntilGroupFinished();
0047     }
0048 
0049 
0050     /// GetEvent polls the queue of submitted TridasEvents and feeds them into JEvents along with a
0051     /// JEventGroup. A downstream EventProcessor may report the event as being finished. Once all
0052     /// events in the eventgroup are finished, the corresponding call to SubmitAndWait will unblock.
0053     Result Emit(JEvent& event) override {
0054 
0055         std::pair<TridasEvent*, JEventGroup*> next_event;
0056         {
0057             std::lock_guard<std::mutex> lock(m_pending_mutex);
0058             if (m_pending_events.empty()) {
0059                 return Result::FailureTryAgain;
0060             }
0061             else {
0062                 next_event = m_pending_events.front();
0063                 m_pending_events.pop();
0064             }
0065         }
0066 
0067         // Hydrate JEvent with both the TridasEvent and the group pointer.
0068         event.Insert(next_event.first);  // TridasEvent
0069         event.Insert(next_event.second); // JEventGroup
0070 
0071         // Tell JANA not to assume ownership of these objects!
0072         event.GetFactory<TridasEvent>()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER);
0073         event.GetFactory<JEventGroup>()->SetFactoryFlag(JFactory::JFactory_Flags_t::NOT_OBJECT_OWNER);
0074 
0075         // JANA always needs an event number and a run number, so extract these from the Tridas data somehow
0076         event.SetEventNumber(next_event.first->event_number);
0077         event.SetRunNumber(next_event.first->run_number);
0078 
0079         return Result::Success;
0080     }
0081 };
0082 
0083 
0084 #endif //JANA2_TRIDASEVENTSOURCE_H