File indexing completed on 2025-01-18 10:17:43
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Topology/JArrow.h>
0008 #include <JANA/JEvent.h>
0009
0010 struct EventData {
0011 int x = 0;
0012 double y = 0.0;
0013 double z = 0.0;
0014 };
0015
0016 struct RandIntArrow : public JArrow {
0017
0018 size_t emit_limit = 20;
0019 size_t emit_count = 0;
0020 int emit_sum = 0;
0021
0022 RandIntArrow(std::string name, JEventPool* pool, JEventQueue* output_queue) {
0023 set_name(name);
0024 set_is_source(true);
0025 create_ports(1, 1);
0026 attach(pool, 0);
0027 attach(output_queue, 1);
0028 }
0029
0030 void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0031
0032 if (emit_count >= emit_limit) {
0033 outputs[0] = {event, 0};
0034 output_count = 1;
0035 status = JArrow::FireResult::Finished;
0036 return;
0037 }
0038
0039 auto data = new EventData {7};
0040 event->Insert(data, "first");
0041
0042 emit_sum += data->x;
0043 emit_count += 1;
0044
0045 outputs[0] = {event, 1};
0046 output_count = 1;
0047 status = (emit_count == emit_limit) ? JArrow::FireResult::Finished : JArrow::FireResult::KeepGoing;
0048
0049
0050 LOG_DEBUG(JArrow::m_logger) << "RandIntSource emitted event " << emit_count << " with value " << data->x << LOG_END;
0051 }
0052 };
0053
0054
0055 struct MultByTwoArrow : public JArrow {
0056
0057 MultByTwoArrow(std::string name, JEventQueue* input_queue, JEventQueue* output_queue) {
0058 set_name(name);
0059 set_is_parallel(true);
0060 create_ports(1, 1);
0061 attach(input_queue, 0);
0062 attach(output_queue, 1);
0063 }
0064
0065 void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0066 auto prev = event->Get<EventData>("first");
0067 auto x = prev.at(0)->x;
0068 auto next = new EventData { .x=x, .y=x*2.0 };
0069 event->Insert(next, "second");
0070
0071 outputs[0] = {event, 1};
0072 output_count = 1;
0073 status = JArrow::FireResult::KeepGoing;
0074 }
0075 };
0076
0077 struct SubOneArrow : public JArrow {
0078
0079 SubOneArrow(std::string name, JEventQueue* input_queue, JEventQueue* output_queue) {
0080 set_name(name);
0081 set_is_parallel(true);
0082 create_ports(1, 1);
0083 attach(input_queue, 0);
0084 attach(output_queue, 1);
0085 }
0086
0087 void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0088 auto prev = event->Get<EventData>("second");
0089 auto x = prev.at(0)->x;
0090 auto y = prev.at(0)->y;
0091 auto z = y - 1;
0092 auto next = new EventData { .x=x, .y=y, .z=z };
0093 event->Insert(next, "third");
0094
0095 outputs[0] = {event, 1};
0096 output_count = 1;
0097 status = JArrow::FireResult::KeepGoing;
0098 }
0099 };
0100
0101 struct SumArrow : public JArrow {
0102
0103 double sum = 0;
0104
0105 SumArrow(std::string name, JEventQueue* input_queue, JEventPool* pool) {
0106 set_name(name);
0107 set_is_sink(true);
0108 create_ports(1, 1);
0109 attach(input_queue, 0);
0110 attach(pool, 1);
0111 }
0112
0113 void fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0114 auto prev = event->Get<EventData>("third");
0115 auto z = prev.at(0)->z;
0116 sum += z;
0117
0118 outputs[0] = {event, 1};
0119 output_count = 1;
0120 status = JArrow::FireResult::KeepGoing;
0121 }
0122 };
0123
0124