File indexing completed on 2025-01-18 10:01:40
0001
0002
0003
0004 #pragma once
0005
0006 #include <JANA/Topology/JArrow.h>
0007 #include <JANA/Utils/JEventPool.h>
0008 #include <JANA/Utils/JEventLevel.h>
0009
0010 class JFoldArrow : public JArrow {
0011 private:
0012 using EventT = std::shared_ptr<JEvent>;
0013
0014
0015
0016
0017 JEventLevel m_parent_level;
0018 JEventLevel m_child_level;
0019
0020 PlaceRef<EventT> m_child_in;
0021 PlaceRef<EventT> m_child_out;
0022 PlaceRef<EventT> m_parent_out;
0023
0024 public:
0025 JFoldArrow(
0026 std::string name,
0027
0028 JEventLevel parent_level,
0029 JEventLevel child_level,
0030 JMailbox<EventT*>* child_in,
0031 JEventPool* child_out,
0032 JMailbox<EventT*>* parent_out)
0033
0034 : JArrow(std::move(name), false, false, false),
0035
0036 m_parent_level(parent_level),
0037 m_child_level(child_level),
0038 m_child_in(this, child_in, true, 1, 1),
0039 m_child_out(this, child_out, false, 1, 1),
0040 m_parent_out(this, parent_out, false, 1, 1)
0041 {
0042 }
0043
0044 JFoldArrow(
0045 std::string name,
0046
0047 JEventLevel parent_level,
0048 JEventLevel child_level,
0049 JMailbox<EventT*>* child_in,
0050 JMailbox<EventT*>* child_out,
0051 JMailbox<EventT*>* parent_out)
0052
0053 : JArrow(std::move(name), false, false, false),
0054
0055 m_parent_level(parent_level),
0056 m_child_level(child_level),
0057 m_child_in(this, child_in, true, 1, 1),
0058 m_child_out(this, child_out, false, 1, 1),
0059 m_parent_out(this, parent_out, false, 1, 1)
0060 {
0061 }
0062
0063 JFoldArrow(
0064 std::string name,
0065
0066 JEventLevel parent_level,
0067 JEventLevel child_level,
0068 JMailbox<EventT*>* child_in,
0069 JEventPool* child_out,
0070 JEventPool* parent_out)
0071
0072 : JArrow(std::move(name), false, false, false),
0073
0074 m_parent_level(parent_level),
0075 m_child_level(child_level),
0076 m_child_in(this, child_in, true, 1, 1),
0077 m_child_out(this, child_out, false, 1, 1),
0078 m_parent_out(this, parent_out, false, 1, 1)
0079 {
0080 }
0081
0082 void attach_child_in(JMailbox<EventT*>* child_in) {
0083 m_child_in.place_ref = child_in;
0084 m_child_in.is_queue = true;
0085 }
0086
0087 void attach_child_out(JMailbox<EventT*>* child_out) {
0088 m_child_out.place_ref = child_out;
0089 m_child_out.is_queue = true;
0090 }
0091
0092 void attach_child_out(JEventPool* child_out) {
0093 m_child_out.place_ref = child_out;
0094 m_child_out.is_queue = false;
0095 }
0096
0097 void attach_parent_out(JEventPool* parent_out) {
0098 m_parent_out.place_ref = parent_out;
0099 m_parent_out.is_queue = false;
0100 }
0101
0102
0103 void attach_parent_out(JMailbox<EventT*>* parent_out) {
0104 m_parent_out.place_ref = parent_out;
0105 m_parent_out.is_queue = true;
0106 }
0107
0108
0109 void initialize() final {
0110
0111
0112
0113
0114
0115
0116
0117
0118 LOG_INFO(m_logger) << "Initialized JEventFolder (trivial)" << LOG_END;
0119 }
0120
0121 void finalize() final {
0122
0123
0124
0125
0126
0127
0128 LOG_INFO(m_logger) << "Finalized JEventFolder (trivial)" << LOG_END;
0129 }
0130
0131 bool try_pull_all(Data<EventT>& ci, Data<EventT>& co, Data<EventT>& po) {
0132 bool success;
0133 success = m_child_in.pull(ci);
0134 if (! success) {
0135 return false;
0136 }
0137 success = m_child_out.pull(co);
0138 if (! success) {
0139 return false;
0140 }
0141 success = m_parent_out.pull(po);
0142 if (! success) {
0143 return false;
0144 }
0145 return true;
0146 }
0147
0148 size_t push_all(Data<EventT>& ci, Data<EventT>& co, Data<EventT>& po) {
0149 size_t message_count = co.item_count;
0150 m_child_in.push(ci);
0151 m_child_out.push(co);
0152 m_parent_out.push(po);
0153 return message_count;
0154 }
0155
0156 void execute(JArrowMetrics& metrics, size_t location_id) final {
0157
0158 auto start_total_time = std::chrono::steady_clock::now();
0159
0160 Data<EventT> child_in_data {location_id};
0161 Data<EventT> child_out_data {location_id};
0162 Data<EventT> parent_out_data {location_id};
0163
0164 bool success = try_pull_all(child_in_data, child_out_data, parent_out_data);
0165 if (success) {
0166
0167 auto start_processing_time = std::chrono::steady_clock::now();
0168 auto child = child_in_data.items[0];
0169 child_in_data.items[0] = nullptr;
0170 child_in_data.item_count = 0;
0171 if (child->get()->GetLevel() != m_child_level) {
0172 throw JException("JFoldArrow received a child with the wrong event level");
0173 }
0174
0175
0176 auto* parent = child->get()->ReleaseParent(m_parent_level);
0177
0178
0179 child_out_data.items[0] = child;
0180 child_out_data.item_count = 1;
0181
0182
0183 if (parent != nullptr) {
0184 parent_out_data.items[0] = parent;
0185 parent_out_data.item_count = 1;
0186 }
0187 else {
0188 parent_out_data.items[0] = nullptr;
0189 parent_out_data.item_count = 0;
0190 }
0191
0192 auto end_processing_time = std::chrono::steady_clock::now();
0193 size_t events_processed = push_all(child_in_data, child_out_data, parent_out_data);
0194
0195 auto end_total_time = std::chrono::steady_clock::now();
0196 auto latency = (end_processing_time - start_processing_time);
0197 auto overhead = (end_total_time - start_total_time) - latency;
0198
0199 metrics.update(JArrowMetrics::Status::KeepGoing, events_processed, 1, latency, overhead);
0200 return;
0201 }
0202 else {
0203 auto end_total_time = std::chrono::steady_clock::now();
0204 metrics.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
0205 return;
0206 }
0207 }
0208
0209 };
0210
0211