File indexing completed on 2025-01-18 10:17:42
0001
0002 #define JANA2_TESTCASE
0003
0004 #include "JANA/Engine/JExecutionEngine.h"
0005 #include "JANA/Utils/JBenchUtils.h"
0006 #include <catch.hpp>
0007
0008 #include <JANA/JApplication.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/JEventProcessor.h>
0011 #include <chrono>
0012 #include <thread>
0013
0014 namespace jana::engine::tests {
0015
0016 struct TestData { int x; };
0017 struct TestSource : public JEventSource {
0018 JBenchUtils bench;
0019 TestSource() {
0020 SetCallbackStyle(CallbackStyle::ExpertMode);
0021 }
0022 Result Emit(JEvent& event) override {
0023 event.Insert<TestData>(new TestData {.x=(int) GetEmittedEventCount() * 2}, "src");
0024 bench.consume_cpu_ms(100);
0025 return Result::Success;
0026 }
0027 };
0028 struct TestProc : public JEventProcessor {
0029 JBenchUtils bench;
0030 TestProc() {
0031 SetCallbackStyle(CallbackStyle::ExpertMode);
0032 }
0033 void ProcessParallel(const JEvent& event) override {
0034 JBenchUtils local_bench;
0035 local_bench.consume_cpu_ms(300);
0036 auto src_x = event.Get<TestData>("src").at(0)->x;
0037 event.Insert<TestData>(new TestData {.x=src_x + 1}, "map");
0038 }
0039 void Process(const JEvent& event) override {
0040 bench.consume_cpu_ms(200);
0041 auto map_x = event.Get<TestData>("map").at(0)->x;
0042 REQUIRE(map_x == (int)event.GetEventNumber()*2 + 1);
0043 }
0044 };
0045
0046
0047 TEST_CASE("JExecutionEngine_StateMachine") {
0048
0049 JApplication app;
0050 app.Add(new TestSource());
0051 app.Add(new TestProc());
0052 app.Initialize();
0053 auto sut = app.GetService<JExecutionEngine>();
0054
0055 SECTION("ManualFinish") {
0056 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0057 sut->ScaleWorkers(0);
0058 sut->RunTopology();
0059 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0060 sut->PauseTopology();
0061 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0062
0063
0064 auto worker = sut->RegisterWorker();
0065 JExecutionEngine::Task task;
0066 sut->ExchangeTask(task, worker.worker_id, true);
0067
0068 sut->RunSupervisor();
0069 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0070 sut->FinishTopology();
0071 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0072 }
0073
0074 SECTION("AutoFinish") {
0075 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0076 sut->ScaleWorkers(0);
0077 sut->RunTopology();
0078 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0079 sut->PauseTopology();
0080 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0081
0082
0083 auto worker = sut->RegisterWorker();
0084 JExecutionEngine::Task task;
0085 sut->ExchangeTask(task, worker.worker_id, true);
0086
0087 sut->RunSupervisor();
0088 sut->FinishTopology();
0089
0090 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0091 }
0092 }
0093
0094
0095 TEST_CASE("JExecutionEngine_ExternalWorkers") {
0096 JApplication app;
0097 app.SetParameterValue("jana:nevents", 1);
0098 app.Add(new TestSource());
0099 app.Add(new TestProc());
0100 app.Initialize();
0101 auto sut = app.GetService<JExecutionEngine>();
0102
0103
0104 SECTION("SelfTermination") {
0105
0106 auto worker = sut->RegisterWorker();
0107 REQUIRE(worker.worker_id == 0);
0108
0109 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0110 REQUIRE(sut->GetPerf().thread_count == 1);
0111 sut->RunTopology();
0112 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0113
0114 JExecutionEngine::Task task;
0115
0116 sut->ExchangeTask(task, worker.worker_id);
0117 REQUIRE(task.arrow != nullptr);
0118 REQUIRE(task.arrow->get_name() == "PhysicsEventSource");
0119 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0120 REQUIRE(sut->GetPerf().event_count == 0);
0121
0122 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0123 REQUIRE(task.output_count == 1);
0124 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0125
0126 sut->ExchangeTask(task, worker.worker_id);
0127 REQUIRE(task.arrow != nullptr);
0128 REQUIRE(task.arrow->get_name() == "PhysicsEventSource");
0129 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0130 REQUIRE(sut->GetPerf().event_count == 0);
0131
0132 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0133 REQUIRE(task.output_count == 1);
0134 REQUIRE(task.outputs[0].second == 0);
0135 REQUIRE(task.status == JArrow::FireResult::Finished);
0136
0137 sut->ExchangeTask(task, worker.worker_id);
0138 REQUIRE(task.arrow != nullptr);
0139 REQUIRE(task.arrow->get_name() == "PhysicsEventMap2");
0140 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0141 REQUIRE(sut->GetPerf().event_count == 0);
0142
0143 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0144 REQUIRE(task.output_count == 1);
0145 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0146
0147 sut->ExchangeTask(task, worker.worker_id);
0148 REQUIRE(task.arrow != nullptr);
0149 REQUIRE(task.arrow->get_name() == "PhysicsEventTap");
0150 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0151 REQUIRE(sut->GetPerf().event_count == 0);
0152
0153 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0154 REQUIRE(task.output_count == 1);
0155 REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0156
0157 sut->ExchangeTask(task, worker.worker_id, true);
0158 REQUIRE(task.arrow == nullptr);
0159 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0160 REQUIRE(sut->GetPerf().event_count == 1);
0161
0162 sut->RunSupervisor();
0163 sut->FinishTopology();
0164 REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0165 }
0166 }
0167
0168
0169 TEST_CASE("JExecutionEngine_ScaleWorkers") {
0170 JApplication app;
0171 app.SetParameterValue("jana:nevents", 10);
0172 app.SetParameterValue("jana:loglevel", "debug");
0173 app.Add(new TestSource());
0174 app.Add(new TestProc());
0175 app.Initialize();
0176 auto sut = app.GetService<JExecutionEngine>();
0177
0178 SECTION("SingleWorker") {
0179 REQUIRE(sut->GetPerf().thread_count == 0);
0180 sut->ScaleWorkers(1);
0181 REQUIRE(sut->GetPerf().thread_count == 1);
0182 sut->ScaleWorkers(0);
0183 REQUIRE(sut->GetPerf().thread_count == 0);
0184 }
0185 }
0186
0187
0188 TEST_CASE("JExecutionEngine_RunSingleEvent") {
0189 JApplication app;
0190 app.SetParameterValue("jana:nevents", 3);
0191 app.SetParameterValue("jana:loglevel", "debug");
0192 app.Add(new TestSource());
0193 app.Add(new TestProc());
0194 app.Initialize();
0195 auto sut = app.GetService<JExecutionEngine>();
0196
0197 SECTION("SingleWorker") {
0198 REQUIRE(sut->GetPerf().thread_count == 0);
0199 sut->ScaleWorkers(1);
0200 sut->RunTopology();
0201
0202 sut->RunSupervisor();
0203 REQUIRE(sut->GetPerf().thread_count == 1);
0204 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0205
0206 sut->FinishTopology();
0207 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0208 REQUIRE(sut->GetPerf().event_count == 3);
0209
0210 REQUIRE(sut->GetPerf().thread_count == 1);
0211 sut->ScaleWorkers(0);
0212 REQUIRE(sut->GetPerf().thread_count == 0);
0213 }
0214
0215 SECTION("MultipleWorker") {
0216 REQUIRE(sut->GetPerf().thread_count == 0);
0217 sut->ScaleWorkers(4);
0218 REQUIRE(sut->GetPerf().thread_count == 4);
0219
0220 sut->RunTopology();
0221
0222 sut->RunSupervisor();
0223 REQUIRE(sut->GetPerf().thread_count == 4);
0224 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0225
0226 sut->FinishTopology();
0227 REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0228 REQUIRE(sut->GetPerf().event_count == 3);
0229
0230 REQUIRE(sut->GetPerf().thread_count == 4);
0231 sut->ScaleWorkers(0);
0232 REQUIRE(sut->GetPerf().thread_count == 0);
0233 }
0234 }
0235
0236 TEST_CASE("JExecutionEngine_ExternalPause") {
0237 JApplication app;
0238 app.SetParameterValue("jana:loglevel", "info");
0239 app.Add(new TestSource());
0240 app.Add(new TestProc());
0241 app.Initialize();
0242 auto sut = app.GetService<JExecutionEngine>();
0243
0244 REQUIRE(sut->GetPerf().thread_count == 0);
0245 sut->ScaleWorkers(4);
0246 REQUIRE(sut->GetPerf().thread_count == 4);
0247
0248 SECTION("PauseImmediately") {
0249 sut->RunTopology();
0250 sut->PauseTopology();
0251 sut->RunSupervisor();
0252
0253 auto perf = sut->GetPerf();
0254 REQUIRE(perf.thread_count == 4);
0255 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0256
0257 sut->FinishTopology();
0258
0259 perf = sut->GetPerf();
0260 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Finished);
0261 REQUIRE(perf.event_count == 0);
0262 REQUIRE(perf.thread_count == 4);
0263
0264 sut->ScaleWorkers(0);
0265
0266 perf = sut->GetPerf();
0267 REQUIRE(perf.thread_count == 0);
0268 }
0269
0270 SECTION("RunForABit") {
0271 sut->RunTopology();
0272 std::thread t([&](){
0273 std::this_thread::sleep_for(std::chrono::seconds(3));
0274 sut->PauseTopology();
0275 });
0276
0277 sut->RunSupervisor();
0278 t.join();
0279
0280 auto perf = sut->GetPerf();
0281 REQUIRE(perf.thread_count == 4);
0282 REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0283 REQUIRE(perf.event_count > 5);
0284
0285 sut->ScaleWorkers(0);
0286 REQUIRE(sut->GetPerf().thread_count == 0);
0287 }
0288 }
0289
0290 }
0291
0292
0293