File indexing completed on 2025-01-18 10:17:41
0001
0002
0003
0004
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JEventSource.h>
0007 #include <JANA/JEventProcessor.h>
0008 #include "JANA/Utils/JBenchUtils.h"
0009 #include "catch.hpp"
0010
0011 size_t global_resource = 0;
0012
0013
0014 struct BarrierSource : public JEventSource {
0015
0016 JBenchUtils bench;
0017
0018 BarrierSource() {
0019 SetCallbackStyle(CallbackStyle::ExpertMode);
0020 }
0021
0022 void Open() override {
0023 }
0024
0025 Result Emit(JEvent& event) override {
0026
0027 auto event_nr = GetEmittedEventCount() + 1;
0028 event.SetEventNumber(event_nr);
0029
0030 if (event_nr % 10 == 0) {
0031 LOG_INFO(GetLogger()) << "Emitting barrier event " << event_nr << LOG_END;
0032 event.SetSequential(true);
0033 }
0034 else {
0035 LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END;
0036 }
0037 bench.consume_cpu_ms(50, 0, false);
0038 return Result::Success;
0039 }
0040 };
0041
0042
0043 struct LegacyBarrierProcessor : public JEventProcessor {
0044
0045 JBenchUtils bench;
0046 std::mutex m_my_mutex;
0047
0048
0049 void Process(const std::shared_ptr<const JEvent>& event) override {
0050
0051 bench.consume_cpu_ms(200, 0, true);
0052
0053 std::lock_guard<std::mutex> lock(m_my_mutex);
0054
0055 if (event->GetSequential()) {
0056 LOG_INFO(GetLogger()) << "Processing barrier event = " << event->GetEventNumber() << ", writing global var = " << global_resource+1 << LOG_END;
0057 REQUIRE(global_resource == ((event->GetEventNumber() - 1) / 10));
0058 global_resource += 1;
0059 }
0060 else {
0061 LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event->GetEventNumber() << ", reading global var = " << global_resource << LOG_END;
0062 REQUIRE(global_resource == (event->GetEventNumber() / 10));
0063 }
0064 bench.consume_cpu_ms(100, 0, true);
0065 }
0066 };
0067
0068
0069 struct BarrierProcessor : public JEventProcessor {
0070
0071 JBenchUtils bench;
0072
0073 BarrierProcessor() {
0074 SetCallbackStyle(CallbackStyle::ExpertMode);
0075 }
0076 void ProcessParallel(const JEvent&) override {
0077 bench.consume_cpu_ms(200, 0, false);
0078 }
0079
0080 void Process(const JEvent& event) override {
0081
0082 if (event.GetSequential()) {
0083 LOG_INFO(GetLogger()) << "Processing barrier event = " << event.GetEventNumber() << ", writing global var = " << global_resource+1 << LOG_END;
0084 REQUIRE(global_resource == ((event.GetEventNumber() - 1) / 10));
0085 global_resource += 1;
0086 }
0087 else {
0088 LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END;
0089 REQUIRE(global_resource == (event.GetEventNumber() / 10));
0090 }
0091 bench.consume_cpu_ms(100, 0, false);
0092 }
0093 };
0094
0095
0096 TEST_CASE("BarrierEventTests_SingleThread") {
0097 global_resource = 0;
0098 JApplication app;
0099 app.Add(new BarrierProcessor);
0100 app.Add(new BarrierSource);
0101 app.SetParameterValue("nthreads", 1);
0102 app.SetParameterValue("jana:nevents", 40);
0103
0104
0105 app.Run(true);
0106 };
0107
0108
0109 TEST_CASE("BarrierEventTests_Legacy_SingleThread") {
0110 global_resource = 0;
0111 JApplication app;
0112 app.Add(new LegacyBarrierProcessor);
0113 app.Add(new BarrierSource);
0114 app.SetParameterValue("nthreads", 1);
0115 app.SetParameterValue("jana:nevents", 40);
0116
0117
0118 app.Run(true);
0119 };
0120
0121
0122 TEST_CASE("BarrierEventTests") {
0123 global_resource = 0;
0124 JApplication app;
0125 app.Add(new BarrierProcessor);
0126 app.Add(new BarrierSource);
0127 app.SetParameterValue("nthreads", 4);
0128 app.SetParameterValue("jana:nevents", 40);
0129
0130
0131 app.Run(true);
0132 };
0133
0134
0135 TEST_CASE("BarrierEventTests_Legacy") {
0136 global_resource = 0;
0137 JApplication app;
0138 app.Add(new LegacyBarrierProcessor);
0139 app.Add(new BarrierSource);
0140 app.SetParameterValue("nthreads", 4);
0141 app.SetParameterValue("jana:nevents", 40);
0142
0143
0144 app.Run(true);
0145 };
0146
0147