File indexing completed on 2026-01-08 10:35:28
0001
0002 #define JANA2_TESTCASE
0003
0004 #include "JANA/Engine/JExecutionEngine.h"
0005 #include "JANA/Utils/JBenchUtils.h"
0006 #include <catch.hpp>
0007
0008 #include <JANA/JApplication.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/JEventProcessor.h>
0011 #include <chrono>
0012 #include <thread>
0013
0014 namespace jana::engine::tests {
0015
/// Minimal payload carried through the test topology: a single integer.
/// Kept as a plain aggregate so it can be brace-initialized directly.
struct TestData {
    int x;
};
0017 struct TestSource : public JEventSource {
0018 JBenchUtils bench;
0019 TestSource() {
0020 SetCallbackStyle(CallbackStyle::ExpertMode);
0021 }
0022 Result Emit(JEvent& event) override {
0023 event.Insert<TestData>(new TestData {.x=(int) GetEmittedEventCount() * 2}, "src");
0024 bench.consume_cpu_ms(100);
0025 return Result::Success;
0026 }
0027 };
0028 struct TestProc : public JEventProcessor {
0029 JBenchUtils bench;
0030 TestProc() {
0031 SetCallbackStyle(CallbackStyle::ExpertMode);
0032 }
0033 void ProcessParallel(const JEvent& event) override {
0034 JBenchUtils local_bench;
0035 local_bench.consume_cpu_ms(300);
0036 auto src_x = event.Get<TestData>("src").at(0)->x;
0037 event.Insert<TestData>(new TestData {.x=src_x + 1}, "map");
0038 }
0039 void ProcessSequential(const JEvent& event) override {
0040 bench.consume_cpu_ms(200);
0041 auto map_x = event.Get<TestData>("map").at(0)->x;
0042 REQUIRE(map_x == (int)event.GetEventNumber()*2 + 1);
0043 }
0044 };
0045
0046
0047 TEST_CASE("JExecutionEngine_StateMachine") {
0048
0049 JApplication app;
0050 app.Add(new TestSource());
0051 app.Add(new TestProc());
0052 app.Initialize();
0053 auto sut = app.GetService<JExecutionEngine>();
0054
0055 SECTION("ManualFinish") {
0056 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0057 sut->ScaleWorkers(0);
0058 sut->RunTopology();
0059 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0060 sut->PauseTopology();
0061 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0062
0063
0064 auto worker = sut->RegisterWorker();
0065 JExecutionEngine::Task task;
0066 sut->ExchangeTask(task, worker.worker_id, true);
0067
0068 sut->RunSupervisor();
0069 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0070 sut->FinishTopology();
0071 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0072 }
0073
0074 SECTION("AutoFinish") {
0075 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0076 sut->ScaleWorkers(0);
0077 sut->RunTopology();
0078 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0079 sut->PauseTopology();
0080 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0081
0082
0083 auto worker = sut->RegisterWorker();
0084 JExecutionEngine::Task task;
0085 sut->ExchangeTask(task, worker.worker_id, true);
0086
0087 sut->RunSupervisor();
0088 sut->FinishTopology();
0089
0090 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0091 }
0092 }
0093
0094
0095 TEST_CASE("JExecutionEngine_ExternalWorkers") {
0096 JApplication app;
0097 app.SetParameterValue("jana:nevents", 1);
0098 app.SetParameterValue("jana:max_inflight_events", 2);
0099
0100 app.Add(new TestSource());
0101 app.Add(new TestProc());
0102 app.Initialize();
0103 auto sut = app.GetService<JExecutionEngine>();
0104
0105
0106 SECTION("SelfTermination") {
0107
0108 auto worker = sut->RegisterWorker();
0109 REQUIRE(worker.worker_id == 0);
0110
0111 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0112 REQUIRE(sut->GetPerf().thread_count == 1);
0113 sut->RunTopology();
0114 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0115
0116 JExecutionEngine::Task task;
0117
0118 sut->ExchangeTask(task, worker.worker_id);
0119 REQUIRE(task.arrow != nullptr);
0120 REQUIRE(task.arrow->get_name() == "PhysicsEventSource");
0121 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0122 REQUIRE(sut->GetPerf().event_count == 0);
0123
0124 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0125 REQUIRE(task.output_count == 1);
0126 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0127
0128 sut->ExchangeTask(task, worker.worker_id);
0129 REQUIRE(task.arrow != nullptr);
0130 REQUIRE(task.arrow->get_name() == "PhysicsEventSource");
0131 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0132 REQUIRE(sut->GetPerf().event_count == 0);
0133
0134 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0135 REQUIRE(task.output_count == 1);
0136 REQUIRE(task.outputs[0].second == 0);
0137 REQUIRE(task.status == JArrow::FireResult::Finished);
0138
0139 sut->ExchangeTask(task, worker.worker_id);
0140 REQUIRE(task.arrow != nullptr);
0141 REQUIRE(task.arrow->get_name() == "PhysicsEventMap2");
0142 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0143 REQUIRE(sut->GetPerf().event_count == 0);
0144
0145 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0146 REQUIRE(task.output_count == 1);
0147 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0148
0149 sut->ExchangeTask(task, worker.worker_id);
0150 REQUIRE(task.arrow != nullptr);
0151 REQUIRE(task.arrow->get_name() == "PhysicsEventTap");
0152 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0153 REQUIRE(sut->GetPerf().event_count == 0);
0154
0155 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0156 REQUIRE(task.output_count == 1);
0157 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0158
0159 sut->ExchangeTask(task, worker.worker_id, true);
0160 REQUIRE(task.arrow == nullptr);
0161 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0162 REQUIRE(sut->GetPerf().event_count == 1);
0163
0164 sut->RunSupervisor();
0165 sut->FinishTopology();
0166 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0167 }
0168 }
0169
0170
0171 TEST_CASE("JExecutionEngine_ScaleWorkers") {
0172 JApplication app;
0173 app.SetParameterValue("jana:nevents", 10);
0174 app.SetParameterValue("jana:loglevel", "debug");
0175 app.Add(new TestSource());
0176 app.Add(new TestProc());
0177 app.Initialize();
0178 auto sut = app.GetService<JExecutionEngine>();
0179
0180 SECTION("SingleWorker") {
0181 REQUIRE(sut->GetPerf().thread_count == 0);
0182 sut->ScaleWorkers(1);
0183 REQUIRE(sut->GetPerf().thread_count == 1);
0184 sut->ScaleWorkers(0);
0185 REQUIRE(sut->GetPerf().thread_count == 0);
0186 }
0187 }
0188
0189
0190 TEST_CASE("JExecutionEngine_RunSingleEvent") {
0191 JApplication app;
0192 app.SetParameterValue("jana:nevents", 3);
0193 app.SetParameterValue("jana:loglevel", "debug");
0194 app.Add(new TestSource());
0195 app.Add(new TestProc());
0196 app.Initialize();
0197 auto sut = app.GetService<JExecutionEngine>();
0198
0199 SECTION("SingleWorker") {
0200 REQUIRE(sut->GetPerf().thread_count == 0);
0201 sut->ScaleWorkers(1);
0202 sut->RunTopology();
0203
0204 sut->RunSupervisor();
0205 REQUIRE(sut->GetPerf().thread_count == 1);
0206 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0207
0208 sut->FinishTopology();
0209 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0210 REQUIRE(sut->GetPerf().event_count == 3);
0211
0212 REQUIRE(sut->GetPerf().thread_count == 1);
0213 sut->ScaleWorkers(0);
0214 REQUIRE(sut->GetPerf().thread_count == 0);
0215 }
0216
0217 SECTION("MultipleWorker") {
0218 REQUIRE(sut->GetPerf().thread_count == 0);
0219 sut->ScaleWorkers(4);
0220 REQUIRE(sut->GetPerf().thread_count == 4);
0221
0222 sut->RunTopology();
0223
0224 sut->RunSupervisor();
0225 REQUIRE(sut->GetPerf().thread_count == 4);
0226 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0227
0228 sut->FinishTopology();
0229 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0230 REQUIRE(sut->GetPerf().event_count == 3);
0231
0232 REQUIRE(sut->GetPerf().thread_count == 4);
0233 sut->ScaleWorkers(0);
0234 REQUIRE(sut->GetPerf().thread_count == 0);
0235 }
0236 }
0237
0238 TEST_CASE("JExecutionEngine_ExternalPause") {
0239 JApplication app;
0240 app.SetParameterValue("jana:loglevel", "info");
0241 app.SetParameterValue("jana:max_inflight_events", 4);
0242 app.Add(new TestSource());
0243 app.Add(new TestProc());
0244 app.Initialize();
0245 auto sut = app.GetService<JExecutionEngine>();
0246
0247 REQUIRE(sut->GetPerf().thread_count == 0);
0248 sut->ScaleWorkers(4);
0249 REQUIRE(sut->GetPerf().thread_count == 4);
0250
0251 SECTION("PauseImmediately") {
0252 sut->RunTopology();
0253 sut->PauseTopology();
0254 sut->RunSupervisor();
0255
0256 auto perf = sut->GetPerf();
0257 REQUIRE(perf.thread_count == 4);
0258 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0259
0260 sut->FinishTopology();
0261
0262 perf = sut->GetPerf();
0263 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Finished);
0264 REQUIRE(perf.event_count == 0);
0265 REQUIRE(perf.thread_count == 4);
0266
0267 sut->ScaleWorkers(0);
0268
0269 perf = sut->GetPerf();
0270 REQUIRE(perf.thread_count == 0);
0271 }
0272
0273 SECTION("RunForABit") {
0274 sut->RunTopology();
0275 std::thread t([&](){
0276 std::this_thread::sleep_for(std::chrono::seconds(3));
0277 sut->PauseTopology();
0278 });
0279
0280 sut->RunSupervisor();
0281 t.join();
0282
0283 auto perf = sut->GetPerf();
0284 REQUIRE(perf.thread_count == 4);
0285 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0286 REQUIRE(perf.event_count > 5);
0287
0288 sut->ScaleWorkers(0);
0289 REQUIRE(sut->GetPerf().thread_count == 0);
0290 }
0291 }
0292
0293 }
0294
0295
0296