Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-05-07 08:56:23

0001 #include <catch.hpp>
0002 
0003 #include <JANA/JEventProcessor.h>
0004 #include <JANA/Topology/JTopologyBuilder.h>
0005 #include <JANA/Topology/JArrow.h>
0006 #include <JANA/Topology/JSourceArrow.h>
0007 #include <JANA/Topology/JTapArrow.h>
0008 #include <deque>
0009 
0010 class BatchedArrow : public JArrow {
0011 
0012     size_t m_batch_size = 5;
0013     std::deque<JEvent*> m_batched_events;
0014 
0015 public:
0016     BatchedArrow(JEventLevel level) {
0017         SetName("BatchedArrow");
0018         SetIsParallel(false);
0019         AddPort("in", level);
0020         AddPort("out", level);
0021     }
0022 
0023     void SetBatchSize(int batch_size) { m_batch_size = batch_size; }
0024 
0025     virtual void Batch(const JEvent& evt) {
0026         GetLogger() << "Batching event " << evt.GetEventNumber();
0027     }
0028 
0029     virtual void Process() {
0030         GetLogger() << "Processing batch containing:";
0031         for (auto* evt : m_batched_events) {
0032             GetLogger() << "    " << evt->GetEventNumber();
0033         }
0034     }
0035 
0036     virtual void Unbatch(const JEvent& evt) {
0037         GetLogger() << "Unbatching event " << evt.GetEventNumber();
0038     }
0039 
0040     void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override {
0041 
0042         bool releasing_batch = (event == nullptr);
0043 
0044         if (event != nullptr) {
0045             // In this case, we are _filling_ the batch, and processing it once it gets filled
0046             // Populate inputs
0047             Batch(*event);
0048 
0049             m_batched_events.push_back(event);
0050 
0051             if (m_batched_events.size() == m_batch_size) {
0052                 releasing_batch = true;
0053                 Process();
0054                 for (JEvent* event : m_batched_events) {
0055                     Unbatch(*event);
0056                     // Publish outputs
0057                 }
0058             }
0059         }
0060         if (!releasing_batch) {
0061             GetLogger() << "NOT releasing batch yet";
0062             m_next_input_port = 0;
0063             output_count = 0;
0064             status = JArrow::FireResult::KeepGoing;
0065         }
0066         else {
0067             for (int i=0; i<2 && !m_batched_events.empty(); ++i) {
0068                 JEvent* event = m_batched_events.front();
0069                 m_batched_events.pop_front();
0070                 GetLogger() << "Releasing event " << event->GetEventNumber();
0071                 outputs[i] = {event, 1};
0072                 output_count = i+1;
0073                 status = JArrow::FireResult::KeepGoing;
0074             }
0075 
0076             if (m_batched_events.empty()) {
0077                 m_next_input_port = 0;
0078             }
0079             else {
0080                 m_next_input_port = -1;
0081             }
0082         }
0083     }
0084 };
0085 
0086 struct BatchedProc : public JEventProcessor {
0087   std::vector<int> observed_event_numbers;
0088   BatchedProc() {
0089     SetCallbackStyle(JFactory::CallbackStyle::ExpertMode);
0090   }
0091   void ProcessSequential(const JEvent& event) override {
0092     LOG_INFO(GetLogger()) << "JEP found event" << event.GetEventNumber();
0093         observed_event_numbers.push_back(event.GetEventNumber());
0094   }
0095   void Finish() override {
0096     LOG_INFO(GetLogger()) << "BatchedProc observed event numbers";
0097     for (int x: observed_event_numbers) {
0098       LOG_INFO(GetLogger()) << "    " << x;
0099     }
0100   }
0101 };
0102 
0103 
0104 void configure_batched_topology(JTopologyBuilder& builder, JComponentManager& component_manager) {
0105 
0106     auto* src_arrow = new JSourceArrow("PhysicsEventSource", JEventLevel::PhysicsEvent, component_manager.get_evt_srces());
0107 
0108     BatchedArrow* batched_arrow = new BatchedArrow(JEventLevel::PhysicsEvent);
0109 
0110     JTapArrow* tap_arrow = new JTapArrow("PhysicsEventTap", JEventLevel::PhysicsEvent);
0111     for (auto proc : component_manager.get_evt_procs()) {
0112         tap_arrow->AddProcessor(proc);
0113     }
0114 
0115     builder.AddArrow(src_arrow);
0116     builder.AddArrow(batched_arrow);
0117     builder.AddArrow(tap_arrow);
0118     builder.ConnectPool("PhysicsEventSource", "in", JEventLevel::PhysicsEvent);
0119     builder.ConnectQueue("PhysicsEventSource", "out", "BatchedArrow", "in");
0120     builder.ConnectQueue("BatchedArrow", "out", "PhysicsEventTap", "in");
0121     builder.ConnectPool("PhysicsEventTap", "out", JEventLevel::PhysicsEvent);
0122 }
0123 
0124 
0125 
0126 TEST_CASE("BatchedArrow") {
0127   JApplication app;
0128   app.Add(new JEventSource);
0129   app.Add(new BatchedProc);
0130   app.SetParameterValue("jana:nevents", 49);
0131   app.SetParameterValue("nthreads", 1);
0132   app.SetParameterValue("jana:max_inflight_events", 20);
0133   app.SetParameterValue("jana:log:show_threadstamp", 1);
0134   app.SetParameterValue("jana:loglevel", "TRACE");
0135 
0136   app.GetService<JTopologyBuilder>()->SetConfigureFn(configure_batched_topology);
0137   app.Run();
0138 }
0139 
0140 
0141