File indexing completed on 2026-05-07 08:56:23
0001 #include <catch.hpp>
0002
0003 #include <JANA/JEventProcessor.h>
0004 #include <JANA/Topology/JTopologyBuilder.h>
0005 #include <JANA/Topology/JArrow.h>
0006 #include <JANA/Topology/JSourceArrow.h>
0007 #include <JANA/Topology/JTapArrow.h>
0008 #include <deque>
0009
0010 class BatchedArrow : public JArrow {
0011
0012 size_t m_batch_size = 5;
0013 std::deque<JEvent*> m_batched_events;
0014
0015 public:
0016 BatchedArrow(JEventLevel level) {
0017 SetName("BatchedArrow");
0018 SetIsParallel(false);
0019 AddPort("in", level);
0020 AddPort("out", level);
0021 }
0022
0023 void SetBatchSize(int batch_size) { m_batch_size = batch_size; }
0024
0025 virtual void Batch(const JEvent& evt) {
0026 GetLogger() << "Batching event " << evt.GetEventNumber();
0027 }
0028
0029 virtual void Process() {
0030 GetLogger() << "Processing batch containing:";
0031 for (auto* evt : m_batched_events) {
0032 GetLogger() << " " << evt->GetEventNumber();
0033 }
0034 }
0035
0036 virtual void Unbatch(const JEvent& evt) {
0037 GetLogger() << "Unbatching event " << evt.GetEventNumber();
0038 }
0039
0040 void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override {
0041
0042 bool releasing_batch = (event == nullptr);
0043
0044 if (event != nullptr) {
0045
0046
0047 Batch(*event);
0048
0049 m_batched_events.push_back(event);
0050
0051 if (m_batched_events.size() == m_batch_size) {
0052 releasing_batch = true;
0053 Process();
0054 for (JEvent* event : m_batched_events) {
0055 Unbatch(*event);
0056
0057 }
0058 }
0059 }
0060 if (!releasing_batch) {
0061 GetLogger() << "NOT releasing batch yet";
0062 m_next_input_port = 0;
0063 output_count = 0;
0064 status = JArrow::FireResult::KeepGoing;
0065 }
0066 else {
0067 for (int i=0; i<2 && !m_batched_events.empty(); ++i) {
0068 JEvent* event = m_batched_events.front();
0069 m_batched_events.pop_front();
0070 GetLogger() << "Releasing event " << event->GetEventNumber();
0071 outputs[i] = {event, 1};
0072 output_count = i+1;
0073 status = JArrow::FireResult::KeepGoing;
0074 }
0075
0076 if (m_batched_events.empty()) {
0077 m_next_input_port = 0;
0078 }
0079 else {
0080 m_next_input_port = -1;
0081 }
0082 }
0083 }
0084 };
0085
0086 struct BatchedProc : public JEventProcessor {
0087 std::vector<int> observed_event_numbers;
0088 BatchedProc() {
0089 SetCallbackStyle(JFactory::CallbackStyle::ExpertMode);
0090 }
0091 void ProcessSequential(const JEvent& event) override {
0092 LOG_INFO(GetLogger()) << "JEP found event" << event.GetEventNumber();
0093 observed_event_numbers.push_back(event.GetEventNumber());
0094 }
0095 void Finish() override {
0096 LOG_INFO(GetLogger()) << "BatchedProc observed event numbers";
0097 for (int x: observed_event_numbers) {
0098 LOG_INFO(GetLogger()) << " " << x;
0099 }
0100 }
0101 };
0102
0103
0104 void configure_batched_topology(JTopologyBuilder& builder, JComponentManager& component_manager) {
0105
0106 auto* src_arrow = new JSourceArrow("PhysicsEventSource", JEventLevel::PhysicsEvent, component_manager.get_evt_srces());
0107
0108 BatchedArrow* batched_arrow = new BatchedArrow(JEventLevel::PhysicsEvent);
0109
0110 JTapArrow* tap_arrow = new JTapArrow("PhysicsEventTap", JEventLevel::PhysicsEvent);
0111 for (auto proc : component_manager.get_evt_procs()) {
0112 tap_arrow->AddProcessor(proc);
0113 }
0114
0115 builder.AddArrow(src_arrow);
0116 builder.AddArrow(batched_arrow);
0117 builder.AddArrow(tap_arrow);
0118 builder.ConnectPool("PhysicsEventSource", "in", JEventLevel::PhysicsEvent);
0119 builder.ConnectQueue("PhysicsEventSource", "out", "BatchedArrow", "in");
0120 builder.ConnectQueue("BatchedArrow", "out", "PhysicsEventTap", "in");
0121 builder.ConnectPool("PhysicsEventTap", "out", JEventLevel::PhysicsEvent);
0122 }
0123
0124
0125
0126 TEST_CASE("BatchedArrow") {
0127 JApplication app;
0128 app.Add(new JEventSource);
0129 app.Add(new BatchedProc);
0130 app.SetParameterValue("jana:nevents", 49);
0131 app.SetParameterValue("nthreads", 1);
0132 app.SetParameterValue("jana:max_inflight_events", 20);
0133 app.SetParameterValue("jana:log:show_threadstamp", 1);
0134 app.SetParameterValue("jana:loglevel", "TRACE");
0135
0136 app.GetService<JTopologyBuilder>()->SetConfigureFn(configure_batched_topology);
0137 app.Run();
0138 }
0139
0140
0141