File indexing completed on 2025-01-30 10:12:06
0001
0002
0003
0004
0005 #pragma once
0006 #include "JMailbox.h"
0007 #include "JArrow.h"
0008 #include <JANA/JEvent.h>
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
/// Interface for user code that transforms a single subevent.
///
/// Implementations are held and invoked through a pointer to this base type,
/// so the destructor is virtual: deleting a derived processor through a
/// `JSubeventProcessor*` must run the derived destructor.
///
/// @tparam InputT   Type of the object handed to ProcessSubevent().
/// @tparam OutputT  Type of the object ProcessSubevent() hands back.
template <typename InputT, typename OutputT>
struct JSubeventProcessor {

    /// Tag used by JSplitArrow to fetch the InputT objects from the parent event.
    std::string inputTag;

    /// Not read anywhere in this header.
    /// NOTE(review): presumably the tag the OutputT objects should be stored under -- confirm.
    std::string outputTag;

    JSubeventProcessor() = default;
    virtual ~JSubeventProcessor() = default;

    // Declaring the destructor would otherwise suppress the implicit move
    // operations; default all of them so copy/move behave as they did before.
    JSubeventProcessor(const JSubeventProcessor&) = default;
    JSubeventProcessor(JSubeventProcessor&&) = default;
    JSubeventProcessor& operator=(const JSubeventProcessor&) = default;
    JSubeventProcessor& operator=(JSubeventProcessor&&) = default;

    /// Transform one subevent.
    ///
    /// @param input  The subevent to process.
    /// @return       Pointer to the resulting object. Ownership is not expressed
    ///               by this interface; in this header the result is inserted
    ///               into the parent event by JMergeArrow.
    virtual OutputT* ProcessSubevent(InputT* input) = 0;

};
0035
0036 template <typename SubeventT>
0037 struct SubeventWrapper {
0038
0039 std::shared_ptr<JEvent>* parent;
0040 SubeventT* data;
0041 size_t id;
0042 size_t total;
0043
0044 SubeventWrapper(std::shared_ptr<JEvent>* parent, SubeventT* data, size_t id, size_t total)
0045 : parent(std::move(parent))
0046 , data(data)
0047 , id(id)
0048 , total(total) {}
0049 };
0050
0051
0052 template <typename InputT, typename OutputT>
0053 class JSubeventArrow : public JArrow {
0054 JSubeventProcessor<InputT, OutputT>* m_processor;
0055 JMailbox<SubeventWrapper<InputT>>* m_inbox;
0056 JMailbox<SubeventWrapper<OutputT>>* m_outbox;
0057 public:
0058 JSubeventArrow(std::string name,
0059 JSubeventProcessor<InputT,OutputT>* processor,
0060 JMailbox<SubeventWrapper<InputT>>* inbox,
0061 JMailbox<SubeventWrapper<OutputT>>* outbox)
0062 : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0063 }
0064
0065 size_t get_pending() final { return m_inbox->size(); };
0066 size_t get_threshold() final { return m_inbox->get_threshold(); };
0067 void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0068
0069 void execute(JArrowMetrics&, size_t location_id) override;
0070 };
0071
0072 template <typename InputT, typename OutputT>
0073 class JSplitArrow : public JArrow {
0074 JSubeventProcessor<InputT, OutputT>* m_processor;
0075 JMailbox<std::shared_ptr<JEvent>*>* m_inbox;
0076 JMailbox<SubeventWrapper<InputT>>* m_outbox;
0077 public:
0078 JSplitArrow(std::string name,
0079 JSubeventProcessor<InputT,OutputT>* processor,
0080 JMailbox<std::shared_ptr<JEvent>*>* inbox,
0081 JMailbox<SubeventWrapper<InputT>>* outbox)
0082 : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0083 }
0084
0085 size_t get_pending() final { return m_inbox->size(); };
0086 size_t get_threshold() final { return m_inbox->get_threshold(); };
0087 void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0088
0089 void execute(JArrowMetrics&, size_t location_id) override;
0090 };
0091
0092 template <typename InputT, typename OutputT>
0093 class JMergeArrow : public JArrow {
0094 JSubeventProcessor<InputT,OutputT>* m_processor;
0095 JMailbox<SubeventWrapper<OutputT>>* m_inbox;
0096 JMailbox<std::shared_ptr<JEvent>*>* m_outbox;
0097 std::map<std::shared_ptr<JEvent>*, size_t> m_in_progress;
0098 public:
0099 JMergeArrow(std::string name,
0100 JSubeventProcessor<InputT,OutputT>* processor,
0101 JMailbox<SubeventWrapper<OutputT>>* inbox,
0102 JMailbox<std::shared_ptr<JEvent>*>* outbox)
0103 : JArrow(name, false, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0104 }
0105
0106 size_t get_pending() final { return m_inbox->size(); };
0107 size_t get_threshold() final { return m_inbox->get_threshold(); };
0108 void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0109 void execute(JArrowMetrics&, size_t location_id) override;
0110 };
0111
0112
0113
0114 template <typename InputT, typename OutputT>
0115 void JSplitArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0116 using InQueue = JMailbox<std::shared_ptr<JEvent>*>;
0117 using OutQueue = JMailbox<SubeventWrapper<InputT>>;
0118 auto start_total_time = std::chrono::steady_clock::now();
0119
0120 std::shared_ptr<JEvent>* event = nullptr;
0121 bool success;
0122 size_t reserved_size = m_outbox->reserve(get_chunksize());
0123 size_t actual_size = reserved_size;
0124
0125
0126 auto in_status = m_inbox->pop(event, success, location_id);
0127 auto start_latency_time = std::chrono::steady_clock::now();
0128
0129 std::vector<SubeventWrapper<InputT>> wrapped;
0130 if (success) {
0131
0132
0133 std::vector<const InputT*> originals = (*event)->Get<InputT>(m_processor->inputTag);
0134 size_t i = 1;
0135 actual_size = originals.size();
0136
0137 for (const InputT* original : originals) {
0138 InputT* unconsted = const_cast<InputT*>(original);
0139 wrapped.push_back(SubeventWrapper<InputT>(event, unconsted, i++, actual_size));
0140 }
0141 }
0142 auto end_latency_time = std::chrono::steady_clock::now();
0143 auto out_status = OutQueue::Status::Ready;
0144
0145 size_t output_size = wrapped.size();
0146 if (success) {
0147 assert(m_outbox != nullptr);
0148 out_status = m_outbox->push(wrapped, reserved_size, location_id);
0149 }
0150 auto end_queue_time = std::chrono::steady_clock::now();
0151
0152 JArrowMetrics::Status status;
0153 if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready) {
0154 status = JArrowMetrics::Status::KeepGoing;
0155 }
0156 else {
0157 status = JArrowMetrics::Status::ComeBackLater;
0158 }
0159 auto latency = (end_latency_time - start_latency_time);
0160 auto overhead = (end_queue_time - start_total_time) - latency;
0161 result.update(status, output_size, 1, latency, overhead);
0162
0163 }
0164 template <typename InputT, typename OutputT>
0165 void JSubeventArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0166 using SubeventQueue = JMailbox<SubeventWrapper<InputT>>;
0167 auto start_total_time = std::chrono::steady_clock::now();
0168
0169
0170 std::vector<SubeventWrapper<InputT>> inputs;
0171 size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id);
0172 auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id);
0173 auto start_latency_time = std::chrono::steady_clock::now();
0174
0175 std::vector<SubeventWrapper<OutputT>> outputs;
0176 for (const auto& input : inputs) {
0177 auto output = m_processor->ProcessSubevent(input.data);
0178 auto wrapped = SubeventWrapper<OutputT>(input.parent, output, input.id, input.total);
0179 outputs.push_back(wrapped);
0180 }
0181 size_t outputs_size = outputs.size();
0182 auto end_latency_time = std::chrono::steady_clock::now();
0183 auto out_status = JMailbox<SubeventWrapper<OutputT>>::Status::Ready;
0184
0185 if (outputs_size > 0) {
0186 assert(m_outbox != nullptr);
0187 out_status = m_outbox->push(outputs, downstream_accepts, location_id);
0188 }
0189 auto end_queue_time = std::chrono::steady_clock::now();
0190
0191 JArrowMetrics::Status status;
0192 if (in_status == SubeventQueue::Status::Ready && out_status == JMailbox<SubeventWrapper<OutputT>>::Status::Ready) {
0193 status = JArrowMetrics::Status::KeepGoing;
0194 }
0195 else {
0196 status = JArrowMetrics::Status::ComeBackLater;
0197 }
0198 auto latency = (end_latency_time - start_latency_time);
0199 auto overhead = (end_queue_time - start_total_time) - latency;
0200 result.update(status, outputs_size, 1, latency, overhead);
0201
0202 }
0203 template <typename InputT, typename OutputT>
0204 void JMergeArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0205 using InQueue = JMailbox<SubeventWrapper<OutputT>>;
0206 using OutQueue = JMailbox<std::shared_ptr<JEvent>*>;
0207
0208 auto start_total_time = std::chrono::steady_clock::now();
0209
0210
0211 std::vector<SubeventWrapper<OutputT>> inputs;
0212 size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id);
0213 auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id);
0214 auto start_latency_time = std::chrono::steady_clock::now();
0215
0216 std::vector<std::shared_ptr<JEvent>*> outputs;
0217 for (const auto& input : inputs) {
0218 LOG_TRACE(m_logger) << "JMergeArrow: Processing input with parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << ", sub=" << input.id << " and total=" << input.total << LOG_END;
0219
0220
0221 (*(input.parent))->template Insert<OutputT>(input.data);
0222 if (input.total == 1) {
0223
0224 outputs.push_back(input.parent);
0225 LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
0226 }
0227 else {
0228 auto pair = m_in_progress.find(input.parent);
0229 if (pair == m_in_progress.end()) {
0230 m_in_progress[input.parent] = input.total-1;
0231 }
0232 else {
0233 if (pair->second == 0) {
0234 pair->second = input.total-1;
0235 }
0236 else if (pair->second == 1) {
0237 pair->second -= 1;
0238 outputs.push_back(input.parent);
0239 LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
0240 }
0241 else {
0242 pair->second -= 1;
0243 }
0244 }
0245 }
0246 }
0247 LOG_DEBUG(m_logger) << "MergeArrow consumed " << inputs.size() << " subevents, produced " << outputs.size() << " events" << LOG_END;
0248 auto end_latency_time = std::chrono::steady_clock::now();
0249
0250 auto outputs_size = outputs.size();
0251 auto out_status = m_outbox->push(outputs, downstream_accepts, location_id);
0252
0253 auto end_queue_time = std::chrono::steady_clock::now();
0254
0255 JArrowMetrics::Status status;
0256 if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready && inputs.size() > 0) {
0257 status = JArrowMetrics::Status::KeepGoing;
0258 }
0259 else {
0260 status = JArrowMetrics::Status::ComeBackLater;
0261 }
0262 auto latency = (end_latency_time - start_latency_time);
0263 auto overhead = (end_queue_time - start_total_time) - latency;
0264 result.update(status, outputs_size, 1, latency, overhead);
0265 }
0266
0267
0268