File indexing completed on 2025-01-18 10:17:20
0001
0002
0003
0004
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JObject.h>
0007 #include <JANA/JEventSource.h>
0008 #include <JANA/JEventProcessor.h>
0009
0010 #include <JANA/Topology/JEventSourceArrow.h>
0011 #include <JANA/Topology/JEventMapArrow.h>
0012 #include <JANA/Topology/JSubeventArrow.h>
0013 #include <JANA/Topology/JTopologyBuilder.h>
0014
0015
0016 struct MyInput : public JObject {
0017 int x;
0018 float y;
0019 int evt = 0;
0020 int sub = 0;
0021 MyInput(int x, float y, int evt, int sub) : x(x), y(y), evt(evt), sub(sub) {}
0022 };
0023
0024 struct MyOutput : public JObject {
0025 float z;
0026 int evt = 0;
0027 int sub = 0;
0028 explicit MyOutput(float z, int evt, int sub) : z(z), evt(evt), sub(sub) {}
0029 };
0030
0031 struct MyProcessor : public JSubeventProcessor<MyInput, MyOutput> {
0032 MyProcessor() {
0033 inputTag = "";
0034 outputTag = "subeventted";
0035 }
0036 MyOutput* ProcessSubevent(MyInput* input) override {
0037 LOG << "Processing subevent " << input->evt << ":" << input->sub << LOG_END;
0038 return new MyOutput(input->y + (float) input->x, input->evt, input->sub);
0039 }
0040 };
0041
0042
0043
0044 struct SimpleSource : public JEventSource {
0045 SimpleSource() : JEventSource() {
0046 SetCallbackStyle(CallbackStyle::ExpertMode);
0047 };
0048 Result Emit(JEvent& event) override {
0049 auto evt = event.GetEventNumber();
0050 std::vector<MyInput*> inputs;
0051 inputs.push_back(new MyInput(22,3.6,evt,0));
0052 inputs.push_back(new MyInput(23,3.5,evt,1));
0053 inputs.push_back(new MyInput(24,3.4,evt,2));
0054 inputs.push_back(new MyInput(25,3.3,evt,3));
0055 inputs.push_back(new MyInput(26,3.2,evt,4));
0056 event.Insert(inputs);
0057 LOG << "Emitting event " << event.GetEventNumber() << LOG_END;
0058 return Result::Success;
0059 }
0060 };
0061
0062 struct SimpleProcessor : public JEventProcessor {
0063 SimpleProcessor() {
0064 SetCallbackStyle(CallbackStyle::ExpertMode);
0065 }
0066 void Process(const JEvent& event) {
0067
0068 std::lock_guard<std::mutex> guard(m_mutex);
0069
0070 auto outputs = event.Get<MyOutput>();
0071
0072
0073
0074
0075
0076 LOG << " Contents of event " << event.GetEventNumber() << LOG_END;
0077 for (auto output : outputs) {
0078 LOG << " " << output->evt << ":" << output->sub << " " << output->z << LOG_END;
0079 }
0080 LOG << " DONE with contents of event " << event.GetEventNumber() << LOG_END;
0081 }
0082 };
0083
0084
0085
0086 int main() {
0087
0088 JApplication app;
0089 app.SetParameterValue("jana:loglevel", "info");
0090 app.SetTimeoutEnabled(false);
0091 app.SetTicker(false);
0092
0093 auto source = new SimpleSource();
0094 source->SetNEvents(10);
0095
0096 MyProcessor processor;
0097
0098 auto topology = app.GetService<JTopologyBuilder>();
0099 topology->set_configure_fn([&](JTopologyBuilder& builder) {
0100
0101 JMailbox<JEvent*> events_in;
0102 JMailbox<JEvent*> events_out;
0103 JMailbox<SubeventWrapper<MyInput>> subevents_in;
0104 JMailbox<SubeventWrapper<MyOutput>> subevents_out;
0105
0106 auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
0107 auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
0108 auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);
0109
0110 auto source_arrow = new JEventSourceArrow("simpleSource", {source});
0111 source_arrow->attach(topology->event_pool, 0);
0112 source_arrow->attach(&events_in, 1);
0113
0114 auto proc_arrow = new JEventMapArrow("simpleProcessor");
0115 proc_arrow->attach(&events_out, 0);
0116 proc_arrow->attach(topology->event_pool, 1);
0117 proc_arrow->add_processor(new SimpleProcessor);
0118
0119 builder.arrows.push_back(source_arrow);
0120 builder.arrows.push_back(split_arrow);
0121 builder.arrows.push_back(subprocess_arrow);
0122 builder.arrows.push_back(merge_arrow);
0123 builder.arrows.push_back(proc_arrow);
0124
0125 source_arrow->attach(split_arrow);
0126 split_arrow->attach(subprocess_arrow);
0127 subprocess_arrow->attach(merge_arrow);
0128 merge_arrow->attach(proc_arrow);
0129 });
0130
0131
0132 app.Run(true);
0133
0134 }
0135