Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-01-08 10:35:28

0001 
0002 #define JANA2_TESTCASE
0003 
0004 #include "JANA/Engine/JExecutionEngine.h"
0005 #include "JANA/Utils/JBenchUtils.h"
0006 #include <catch.hpp>
0007 
0008 #include <JANA/JApplication.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/JEventProcessor.h>
0011 #include <chrono>
0012 #include <thread>
0013 
0014 namespace jana::engine::tests {
0015 
/// Minimal payload type exchanged between TestSource and TestProc.
/// Kept as a plain aggregate so it can be brace-initialized.
struct TestData {
    int x;  ///< Integer value carried by the event
};
0017 struct TestSource : public JEventSource {
0018     JBenchUtils bench;
0019     TestSource() {
0020         SetCallbackStyle(CallbackStyle::ExpertMode);
0021     }
0022     Result Emit(JEvent& event) override {
0023         event.Insert<TestData>(new TestData {.x=(int) GetEmittedEventCount() * 2}, "src");
0024         bench.consume_cpu_ms(100);
0025         return Result::Success;
0026     }
0027 };
0028 struct TestProc : public JEventProcessor {
0029     JBenchUtils bench;
0030     TestProc() {
0031         SetCallbackStyle(CallbackStyle::ExpertMode);
0032     }
0033     void ProcessParallel(const JEvent& event) override {
0034         JBenchUtils local_bench;
0035         local_bench.consume_cpu_ms(300);
0036         auto src_x = event.Get<TestData>("src").at(0)->x;
0037         event.Insert<TestData>(new TestData {.x=src_x + 1}, "map");
0038     }
0039     void ProcessSequential(const JEvent& event) override {
0040         bench.consume_cpu_ms(200);
0041         auto map_x = event.Get<TestData>("map").at(0)->x;
0042         REQUIRE(map_x == (int)event.GetEventNumber()*2 + 1);
0043     }
0044 };
0045 
0046 
0047 TEST_CASE("JExecutionEngine_StateMachine") {
0048 
0049     JApplication app;
0050     app.Add(new TestSource());
0051     app.Add(new TestProc());
0052     app.Initialize();
0053     auto sut = app.GetService<JExecutionEngine>();
0054 
0055     SECTION("ManualFinish") {
0056         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0057         sut->ScaleWorkers(0);
0058         sut->RunTopology();
0059         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0060         sut->PauseTopology();
0061         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0062         
0063         // Need to trigger the Pause since there are no workers to do so
0064         auto worker = sut->RegisterWorker();
0065         JExecutionEngine::Task task;
0066         sut->ExchangeTask(task, worker.worker_id, true);
0067 
0068         sut->RunSupervisor();
0069         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0070         sut->FinishTopology();
0071         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0072     }
0073 
0074     SECTION("AutoFinish") {
0075         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0076         sut->ScaleWorkers(0);
0077         sut->RunTopology();
0078         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0079         sut->PauseTopology();
0080         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0081 
0082         // Need to trigger the Pause since there are no workers to do so
0083         auto worker = sut->RegisterWorker();
0084         JExecutionEngine::Task task;
0085         sut->ExchangeTask(task, worker.worker_id, true);
0086 
0087         sut->RunSupervisor();
0088         sut->FinishTopology();
0089 
0090         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0091     }
0092 }
0093 
0094 
0095 TEST_CASE("JExecutionEngine_ExternalWorkers") {
0096     JApplication app;
0097     app.SetParameterValue("jana:nevents", 1);
0098     app.SetParameterValue("jana:max_inflight_events", 2);
0099 
0100     app.Add(new TestSource());
0101     app.Add(new TestProc());
0102     app.Initialize();
0103     auto sut = app.GetService<JExecutionEngine>();
0104 
0105 
0106     SECTION("SelfTermination") {
0107 
0108         auto worker = sut->RegisterWorker();
0109         REQUIRE(worker.worker_id == 0); // Not threadsafe doing this
0110 
0111         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0112         REQUIRE(sut->GetPerf().thread_count == 1);
0113         sut->RunTopology();
0114         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0115 
0116         JExecutionEngine::Task task;
0117 
0118         sut->ExchangeTask(task, worker.worker_id);
0119         REQUIRE(task.arrow != nullptr);
0120         REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // Only task available at this point!
0121         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0122         REQUIRE(sut->GetPerf().event_count == 0);
0123 
0124         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0125         REQUIRE(task.output_count == 1);
0126         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0127 
0128         sut->ExchangeTask(task, worker.worker_id);
0129         REQUIRE(task.arrow != nullptr);
0130         REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // This will fail due to jana:nevents
0131         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0132         REQUIRE(sut->GetPerf().event_count == 0);
0133 
0134         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0135         REQUIRE(task.output_count == 1);
0136         REQUIRE(task.outputs[0].second == 0); // Failure => return to pool
0137         REQUIRE(task.status == JArrow::FireResult::Finished);
0138 
0139         sut->ExchangeTask(task, worker.worker_id);
0140         REQUIRE(task.arrow != nullptr);
0141         REQUIRE(task.arrow->get_name() == "PhysicsEventMap2");
0142         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0143         REQUIRE(sut->GetPerf().event_count == 0);
0144 
0145         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0146         REQUIRE(task.output_count == 1);
0147         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0148 
0149         sut->ExchangeTask(task, worker.worker_id);
0150         REQUIRE(task.arrow != nullptr);
0151         REQUIRE(task.arrow->get_name() == "PhysicsEventTap");
0152         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0153         REQUIRE(sut->GetPerf().event_count == 0);
0154 
0155         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0156         REQUIRE(task.output_count == 1);
0157         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0158 
0159         sut->ExchangeTask(task, worker.worker_id, true);
0160         REQUIRE(task.arrow == nullptr);
0161         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0162         REQUIRE(sut->GetPerf().event_count == 1);
0163 
0164         sut->RunSupervisor();
0165         sut->FinishTopology();
0166         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0167     }
0168 }
0169 
0170 
0171 TEST_CASE("JExecutionEngine_ScaleWorkers") {
0172     JApplication app;
0173     app.SetParameterValue("jana:nevents", 10);
0174     app.SetParameterValue("jana:loglevel", "debug");
0175     app.Add(new TestSource());
0176     app.Add(new TestProc());
0177     app.Initialize();
0178     auto sut = app.GetService<JExecutionEngine>();
0179 
0180     SECTION("SingleWorker") {
0181         REQUIRE(sut->GetPerf().thread_count == 0);
0182         sut->ScaleWorkers(1);
0183         REQUIRE(sut->GetPerf().thread_count == 1);
0184         sut->ScaleWorkers(0);
0185         REQUIRE(sut->GetPerf().thread_count == 0);
0186     }
0187 }
0188 
0189 
0190 TEST_CASE("JExecutionEngine_RunSingleEvent") {
0191     JApplication app;
0192     app.SetParameterValue("jana:nevents", 3);
0193     app.SetParameterValue("jana:loglevel", "debug");
0194     app.Add(new TestSource());
0195     app.Add(new TestProc());
0196     app.Initialize();
0197     auto sut = app.GetService<JExecutionEngine>();
0198 
0199     SECTION("SingleWorker") {
0200         REQUIRE(sut->GetPerf().thread_count == 0);
0201         sut->ScaleWorkers(1);
0202         sut->RunTopology();
0203 
0204         sut->RunSupervisor();
0205         REQUIRE(sut->GetPerf().thread_count == 1);
0206         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0207 
0208         sut->FinishTopology();
0209         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0210         REQUIRE(sut->GetPerf().event_count == 3);
0211 
0212         REQUIRE(sut->GetPerf().thread_count == 1);
0213         sut->ScaleWorkers(0);
0214         REQUIRE(sut->GetPerf().thread_count == 0);
0215     }
0216     
0217     SECTION("MultipleWorker") {
0218         REQUIRE(sut->GetPerf().thread_count == 0);
0219         sut->ScaleWorkers(4);
0220         REQUIRE(sut->GetPerf().thread_count == 4);
0221 
0222         sut->RunTopology();
0223 
0224         sut->RunSupervisor();
0225         REQUIRE(sut->GetPerf().thread_count == 4);
0226         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0227 
0228         sut->FinishTopology();
0229         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0230         REQUIRE(sut->GetPerf().event_count == 3);
0231 
0232         REQUIRE(sut->GetPerf().thread_count == 4);
0233         sut->ScaleWorkers(0);
0234         REQUIRE(sut->GetPerf().thread_count == 0);
0235     }
0236 }
0237 
0238 TEST_CASE("JExecutionEngine_ExternalPause") {
0239     JApplication app;
0240     app.SetParameterValue("jana:loglevel", "info");
0241     app.SetParameterValue("jana:max_inflight_events", 4);
0242     app.Add(new TestSource());
0243     app.Add(new TestProc());
0244     app.Initialize();
0245     auto sut = app.GetService<JExecutionEngine>();
0246 
0247     REQUIRE(sut->GetPerf().thread_count == 0);
0248     sut->ScaleWorkers(4);
0249     REQUIRE(sut->GetPerf().thread_count == 4);
0250 
0251     SECTION("PauseImmediately") {
0252         sut->RunTopology();
0253         sut->PauseTopology();
0254         sut->RunSupervisor();
0255 
0256         auto perf = sut->GetPerf();
0257         REQUIRE(perf.thread_count == 4);
0258         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0259 
0260         sut->FinishTopology();
0261 
0262         perf = sut->GetPerf();
0263         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Finished);
0264         REQUIRE(perf.event_count == 0);
0265         REQUIRE(perf.thread_count == 4);
0266 
0267         sut->ScaleWorkers(0);
0268 
0269         perf = sut->GetPerf();
0270         REQUIRE(perf.thread_count == 0);
0271     }
0272 
0273     SECTION("RunForABit") {
0274         sut->RunTopology();
0275         std::thread t([&](){ 
0276             std::this_thread::sleep_for(std::chrono::seconds(3)); 
0277             sut->PauseTopology();
0278         });
0279 
0280         sut->RunSupervisor();
0281         t.join();
0282 
0283         auto perf = sut->GetPerf();
0284         REQUIRE(perf.thread_count == 4);
0285         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0286         REQUIRE(perf.event_count > 5);
0287 
0288         sut->ScaleWorkers(0);
0289         REQUIRE(sut->GetPerf().thread_count == 0);
0290     }
0291 }
0292 
0293 } // jana::engine::tests
0294 
0295 
0296