Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:42

0001 
0002 #define JANA2_TESTCASE
0003 
0004 #include "JANA/Engine/JExecutionEngine.h"
0005 #include "JANA/Utils/JBenchUtils.h"
0006 #include <catch.hpp>
0007 
0008 #include <JANA/JApplication.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/JEventProcessor.h>
0011 #include <chrono>
0012 #include <thread>
0013 
0014 namespace jana::engine::tests {
0015 
/// Minimal payload inserted into events by the test source and processor.
/// Kept as a plain aggregate so it can be brace-initialized at the call sites.
struct TestData {
    int x;
};
0017 struct TestSource : public JEventSource {
0018     JBenchUtils bench;
0019     TestSource() {
0020         SetCallbackStyle(CallbackStyle::ExpertMode);
0021     }
0022     Result Emit(JEvent& event) override {
0023         event.Insert<TestData>(new TestData {.x=(int) GetEmittedEventCount() * 2}, "src");
0024         bench.consume_cpu_ms(100);
0025         return Result::Success;
0026     }
0027 };
0028 struct TestProc : public JEventProcessor {
0029     JBenchUtils bench;
0030     TestProc() {
0031         SetCallbackStyle(CallbackStyle::ExpertMode);
0032     }
0033     void ProcessParallel(const JEvent& event) override {
0034         JBenchUtils local_bench;
0035         local_bench.consume_cpu_ms(300);
0036         auto src_x = event.Get<TestData>("src").at(0)->x;
0037         event.Insert<TestData>(new TestData {.x=src_x + 1}, "map");
0038     }
0039     void Process(const JEvent& event) override {
0040         bench.consume_cpu_ms(200);
0041         auto map_x = event.Get<TestData>("map").at(0)->x;
0042         REQUIRE(map_x == (int)event.GetEventNumber()*2 + 1);
0043     }
0044 };
0045 
0046 
0047 TEST_CASE("JExecutionEngine_StateMachine") {
0048 
0049     JApplication app;
0050     app.Add(new TestSource());
0051     app.Add(new TestProc());
0052     app.Initialize();
0053     auto sut = app.GetService<JExecutionEngine>();
0054 
0055     SECTION("ManualFinish") {
0056         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0057         sut->ScaleWorkers(0);
0058         sut->RunTopology();
0059         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0060         sut->PauseTopology();
0061         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0062         
0063         // Need to trigger the Pause since there are no workers to do so
0064         auto worker = sut->RegisterWorker();
0065         JExecutionEngine::Task task;
0066         sut->ExchangeTask(task, worker.worker_id, true);
0067 
0068         sut->RunSupervisor();
0069         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0070         sut->FinishTopology();
0071         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0072     }
0073 
0074     SECTION("AutoFinish") {
0075         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0076         sut->ScaleWorkers(0);
0077         sut->RunTopology();
0078         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0079         sut->PauseTopology();
0080         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Pausing);
0081 
0082         // Need to trigger the Pause since there are no workers to do so
0083         auto worker = sut->RegisterWorker();
0084         JExecutionEngine::Task task;
0085         sut->ExchangeTask(task, worker.worker_id, true);
0086 
0087         sut->RunSupervisor();
0088         sut->FinishTopology();
0089 
0090         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0091     }
0092 }
0093 
0094 
0095 TEST_CASE("JExecutionEngine_ExternalWorkers") {
0096     JApplication app;
0097     app.SetParameterValue("jana:nevents", 1);
0098     app.Add(new TestSource());
0099     app.Add(new TestProc());
0100     app.Initialize();
0101     auto sut = app.GetService<JExecutionEngine>();
0102 
0103 
0104     SECTION("SelfTermination") {
0105 
0106         auto worker = sut->RegisterWorker();
0107         REQUIRE(worker.worker_id == 0); // Not threadsafe doing this
0108 
0109         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0110         REQUIRE(sut->GetPerf().thread_count == 1);
0111         sut->RunTopology();
0112         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0113 
0114         JExecutionEngine::Task task;
0115 
0116         sut->ExchangeTask(task, worker.worker_id);
0117         REQUIRE(task.arrow != nullptr);
0118         REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // Only task available at this point!
0119         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0120         REQUIRE(sut->GetPerf().event_count == 0);
0121 
0122         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0123         REQUIRE(task.output_count == 1);
0124         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0125 
0126         sut->ExchangeTask(task, worker.worker_id);
0127         REQUIRE(task.arrow != nullptr);
0128         REQUIRE(task.arrow->get_name() == "PhysicsEventSource"); // This will fail due to jana:nevents
0129         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Running);
0130         REQUIRE(sut->GetPerf().event_count == 0);
0131 
0132         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0133         REQUIRE(task.output_count == 1);
0134         REQUIRE(task.outputs[0].second == 0); // Failure => return to pool
0135         REQUIRE(task.status == JArrow::FireResult::Finished);
0136 
0137         sut->ExchangeTask(task, worker.worker_id);
0138         REQUIRE(task.arrow != nullptr);
0139         REQUIRE(task.arrow->get_name() == "PhysicsEventMap2");
0140         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0141         REQUIRE(sut->GetPerf().event_count == 0);
0142 
0143         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0144         REQUIRE(task.output_count == 1);
0145         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0146 
0147         sut->ExchangeTask(task, worker.worker_id);
0148         REQUIRE(task.arrow != nullptr);
0149         REQUIRE(task.arrow->get_name() == "PhysicsEventTap");
0150         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0151         REQUIRE(sut->GetPerf().event_count == 0);
0152 
0153         task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0154         REQUIRE(task.output_count == 1);
0155         REQUIRE(task.status == JArrow::FireResult::KeepGoing);
0156 
0157         sut->ExchangeTask(task, worker.worker_id, true);
0158         REQUIRE(task.arrow == nullptr);
0159         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Paused);
0160         REQUIRE(sut->GetPerf().event_count == 1);
0161 
0162         sut->RunSupervisor();
0163         sut->FinishTopology();
0164         REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Finished);
0165     }
0166 }
0167 
0168 
0169 TEST_CASE("JExecutionEngine_ScaleWorkers") {
0170     JApplication app;
0171     app.SetParameterValue("jana:nevents", 10);
0172     app.SetParameterValue("jana:loglevel", "debug");
0173     app.Add(new TestSource());
0174     app.Add(new TestProc());
0175     app.Initialize();
0176     auto sut = app.GetService<JExecutionEngine>();
0177 
0178     SECTION("SingleWorker") {
0179         REQUIRE(sut->GetPerf().thread_count == 0);
0180         sut->ScaleWorkers(1);
0181         REQUIRE(sut->GetPerf().thread_count == 1);
0182         sut->ScaleWorkers(0);
0183         REQUIRE(sut->GetPerf().thread_count == 0);
0184     }
0185 }
0186 
0187 
0188 TEST_CASE("JExecutionEngine_RunSingleEvent") {
0189     JApplication app;
0190     app.SetParameterValue("jana:nevents", 3);
0191     app.SetParameterValue("jana:loglevel", "debug");
0192     app.Add(new TestSource());
0193     app.Add(new TestProc());
0194     app.Initialize();
0195     auto sut = app.GetService<JExecutionEngine>();
0196 
0197     SECTION("SingleWorker") {
0198         REQUIRE(sut->GetPerf().thread_count == 0);
0199         sut->ScaleWorkers(1);
0200         sut->RunTopology();
0201 
0202         sut->RunSupervisor();
0203         REQUIRE(sut->GetPerf().thread_count == 1);
0204         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0205 
0206         sut->FinishTopology();
0207         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0208         REQUIRE(sut->GetPerf().event_count == 3);
0209 
0210         REQUIRE(sut->GetPerf().thread_count == 1);
0211         sut->ScaleWorkers(0);
0212         REQUIRE(sut->GetPerf().thread_count == 0);
0213     }
0214     
0215     SECTION("MultipleWorker") {
0216         REQUIRE(sut->GetPerf().thread_count == 0);
0217         sut->ScaleWorkers(4);
0218         REQUIRE(sut->GetPerf().thread_count == 4);
0219 
0220         sut->RunTopology();
0221 
0222         sut->RunSupervisor();
0223         REQUIRE(sut->GetPerf().thread_count == 4);
0224         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
0225 
0226         sut->FinishTopology();
0227         REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
0228         REQUIRE(sut->GetPerf().event_count == 3);
0229 
0230         REQUIRE(sut->GetPerf().thread_count == 4);
0231         sut->ScaleWorkers(0);
0232         REQUIRE(sut->GetPerf().thread_count == 0);
0233     }
0234 }
0235 
0236 TEST_CASE("JExecutionEngine_ExternalPause") {
0237     JApplication app;
0238     app.SetParameterValue("jana:loglevel", "info");
0239     app.Add(new TestSource());
0240     app.Add(new TestProc());
0241     app.Initialize();
0242     auto sut = app.GetService<JExecutionEngine>();
0243 
0244     REQUIRE(sut->GetPerf().thread_count == 0);
0245     sut->ScaleWorkers(4);
0246     REQUIRE(sut->GetPerf().thread_count == 4);
0247 
0248     SECTION("PauseImmediately") {
0249         sut->RunTopology();
0250         sut->PauseTopology();
0251         sut->RunSupervisor();
0252 
0253         auto perf = sut->GetPerf();
0254         REQUIRE(perf.thread_count == 4);
0255         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0256 
0257         sut->FinishTopology();
0258 
0259         perf = sut->GetPerf();
0260         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Finished);
0261         REQUIRE(perf.event_count == 0);
0262         REQUIRE(perf.thread_count == 4);
0263 
0264         sut->ScaleWorkers(0);
0265 
0266         perf = sut->GetPerf();
0267         REQUIRE(perf.thread_count == 0);
0268     }
0269 
0270     SECTION("RunForABit") {
0271         sut->RunTopology();
0272         std::thread t([&](){ 
0273             std::this_thread::sleep_for(std::chrono::seconds(3)); 
0274             sut->PauseTopology();
0275         });
0276 
0277         sut->RunSupervisor();
0278         t.join();
0279 
0280         auto perf = sut->GetPerf();
0281         REQUIRE(perf.thread_count == 4);
0282         REQUIRE(perf.runstatus == JExecutionEngine::RunStatus::Paused);
0283         REQUIRE(perf.event_count > 5);
0284 
0285         sut->ScaleWorkers(0);
0286         REQUIRE(sut->GetPerf().thread_count == 0);
0287     }
0288 }
0289 
0290 } // jana::engine::tests
0291 
0292 
0293