Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:12:06

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include "JMailbox.h"
0007 #include "JArrow.h"
0008 #include <JANA/JEvent.h>
0009 
/// SubeventProcessor offers sub-event-level parallelism. The idea is to split a parent
/// event S into independent subtasks T and automatically bundle them with
/// bookkeeping information X onto a Queue<pair<T,X>>. process :: T -> U handles the stateless,
/// parallel parts; its Arrow pushes messages onto a Queue<pair<U,X>>, so that merge() :: S -> [U] -> V
/// "joins" all completed "subtasks" of type U corresponding to one parent of type S (i.e.
/// a specific JEvent) back into a single entity of type V (most likely the same JEvent as S,
/// only now containing more data), which is pushed onto a Queue<V> with the bookkeeping information gone.
/// Note that there is no blocking and that our streaming paradigm is not compromised.
0018 
/// Abstract class which is meant to be extended by the user to contain all
/// subtask-related functions. (Data lives in a JObject instead.)
/// Future versions might be templated for two reasons:
///  1. To make the functions non-virtual,
///  2. To replace the generic JObject pointer with something typesafe.
/// Future versions could also recycle JObjects by using another Queue.
0025 
/// User-extended interface for the per-subevent work performed by JSubeventArrow.
///
/// @tparam InputT   Type of the objects retrieved from the parent event and processed one at a time.
/// @tparam OutputT  Type of the object produced for each input.
template <typename InputT, typename OutputT>
struct JSubeventProcessor {

    /// Tag passed to JEvent::Get<InputT> by JSplitArrow when collecting the inputs of a parent event.
    std::string inputTag;

    /// Tag intended for the produced OutputT objects.
    /// NOTE(review): JMergeArrow in this header inserts outputs without using this tag — confirm intent.
    std::string outputTag;

    JSubeventProcessor() = default;
    JSubeventProcessor(const JSubeventProcessor&) = default;
    JSubeventProcessor(JSubeventProcessor&&) = default;
    JSubeventProcessor& operator=(const JSubeventProcessor&) = default;
    JSubeventProcessor& operator=(JSubeventProcessor&&) = default;

    /// Virtual so that user implementations can be safely destroyed through a base-class pointer.
    virtual ~JSubeventProcessor() = default;

    /// Processes a single subevent.
    /// @param input  One InputT taken from the parent event (JSplitArrow const_casts it to get here).
    /// @return       The OutputT for this subevent; JMergeArrow hands it to the parent event's Insert<OutputT>.
    ///               NOTE(review): ownership presumably passes to the event via Insert — confirm.
    virtual OutputT* ProcessSubevent(InputT* input) = 0;

};
0035 
0036 template <typename SubeventT>
0037 struct SubeventWrapper {
0038 
0039     std::shared_ptr<JEvent>* parent;
0040     SubeventT* data;
0041     size_t id;
0042     size_t total;
0043 
0044     SubeventWrapper(std::shared_ptr<JEvent>* parent, SubeventT* data, size_t id, size_t total)
0045     : parent(std::move(parent))
0046     , data(data)
0047     , id(id)
0048     , total(total) {}
0049 };
0050 
0051 
0052 template <typename InputT, typename OutputT>
0053 class JSubeventArrow : public JArrow {
0054     JSubeventProcessor<InputT, OutputT>* m_processor;
0055     JMailbox<SubeventWrapper<InputT>>* m_inbox;
0056     JMailbox<SubeventWrapper<OutputT>>* m_outbox;
0057 public:
0058     JSubeventArrow(std::string name,
0059                    JSubeventProcessor<InputT,OutputT>* processor,
0060                    JMailbox<SubeventWrapper<InputT>>* inbox,
0061                    JMailbox<SubeventWrapper<OutputT>>* outbox)
0062         : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0063     }
0064 
0065     size_t get_pending() final { return m_inbox->size(); };
0066     size_t get_threshold() final { return m_inbox->get_threshold(); };
0067     void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0068 
0069     void execute(JArrowMetrics&, size_t location_id) override;
0070 };
0071 
0072 template <typename InputT, typename OutputT>
0073 class JSplitArrow : public JArrow {
0074     JSubeventProcessor<InputT, OutputT>* m_processor;
0075     JMailbox<std::shared_ptr<JEvent>*>* m_inbox;
0076     JMailbox<SubeventWrapper<InputT>>* m_outbox;
0077 public:
0078     JSplitArrow(std::string name,
0079                 JSubeventProcessor<InputT,OutputT>* processor,
0080                 JMailbox<std::shared_ptr<JEvent>*>* inbox,
0081                 JMailbox<SubeventWrapper<InputT>>* outbox)
0082         : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0083     }
0084 
0085     size_t get_pending() final { return m_inbox->size(); };
0086     size_t get_threshold() final { return m_inbox->get_threshold(); };
0087     void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0088 
0089     void execute(JArrowMetrics&, size_t location_id) override;
0090 };
0091 
0092 template <typename InputT, typename OutputT>
0093 class JMergeArrow : public JArrow {
0094     JSubeventProcessor<InputT,OutputT>* m_processor;
0095     JMailbox<SubeventWrapper<OutputT>>* m_inbox;
0096     JMailbox<std::shared_ptr<JEvent>*>* m_outbox;
0097     std::map<std::shared_ptr<JEvent>*, size_t> m_in_progress;
0098 public:
0099     JMergeArrow(std::string name,
0100                 JSubeventProcessor<InputT,OutputT>* processor,
0101                 JMailbox<SubeventWrapper<OutputT>>* inbox,
0102                 JMailbox<std::shared_ptr<JEvent>*>* outbox)
0103         : JArrow(name, false, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) {
0104     }
0105 
0106     size_t get_pending() final { return m_inbox->size(); };
0107     size_t get_threshold() final { return m_inbox->get_threshold(); };
0108     void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);};
0109     void execute(JArrowMetrics&, size_t location_id) override;
0110 };
0111 
0112 
0113 
0114 template <typename InputT, typename OutputT>
0115 void JSplitArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0116     using InQueue = JMailbox<std::shared_ptr<JEvent>*>;
0117     using OutQueue = JMailbox<SubeventWrapper<InputT>>;
0118     auto start_total_time = std::chrono::steady_clock::now();
0119 
0120     std::shared_ptr<JEvent>* event = nullptr;
0121     bool success;
0122     size_t reserved_size = m_outbox->reserve(get_chunksize());
0123     size_t actual_size = reserved_size;
0124     // TODO: Exit early if we don't have enough space on output queue
0125 
0126     auto in_status = m_inbox->pop(event, success, location_id);
0127     auto start_latency_time = std::chrono::steady_clock::now();
0128 
0129     std::vector<SubeventWrapper<InputT>> wrapped;
0130     if (success) {
0131 
0132         // Construct prereqs
0133         std::vector<const InputT*> originals = (*event)->Get<InputT>(m_processor->inputTag);
0134         size_t i = 1;
0135         actual_size = originals.size();
0136 
0137         for (const InputT* original : originals) {
0138             InputT* unconsted = const_cast<InputT*>(original); // TODO: Get constness right
0139             wrapped.push_back(SubeventWrapper<InputT>(event, unconsted, i++, actual_size));
0140         }
0141     }
0142     auto end_latency_time = std::chrono::steady_clock::now();
0143     auto out_status = OutQueue::Status::Ready;
0144 
0145     size_t output_size = wrapped.size();
0146     if (success) {
0147         assert(m_outbox != nullptr);
0148         out_status = m_outbox->push(wrapped, reserved_size, location_id);
0149     }
0150     auto end_queue_time = std::chrono::steady_clock::now();
0151 
0152     JArrowMetrics::Status status;
0153     if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready) {
0154         status = JArrowMetrics::Status::KeepGoing;
0155     }
0156     else {
0157         status = JArrowMetrics::Status::ComeBackLater;
0158     }
0159     auto latency = (end_latency_time - start_latency_time);
0160     auto overhead = (end_queue_time - start_total_time) - latency;
0161     result.update(status, output_size, 1, latency, overhead);
0162 
0163 }
0164 template <typename InputT, typename OutputT>
0165 void JSubeventArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0166     using SubeventQueue = JMailbox<SubeventWrapper<InputT>>;
0167     auto start_total_time = std::chrono::steady_clock::now();
0168 
0169     // TODO: Think more carefully about subevent bucket size
0170     std::vector<SubeventWrapper<InputT>> inputs;
0171     size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id);
0172     auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id);
0173     auto start_latency_time = std::chrono::steady_clock::now();
0174 
0175     std::vector<SubeventWrapper<OutputT>> outputs;
0176     for (const auto& input : inputs) {
0177         auto output = m_processor->ProcessSubevent(input.data);
0178         auto wrapped = SubeventWrapper<OutputT>(input.parent, output, input.id, input.total);
0179         outputs.push_back(wrapped);
0180     }
0181     size_t outputs_size = outputs.size();
0182     auto end_latency_time = std::chrono::steady_clock::now();
0183     auto out_status = JMailbox<SubeventWrapper<OutputT>>::Status::Ready;
0184 
0185     if (outputs_size > 0) {
0186         assert(m_outbox != nullptr);
0187         out_status = m_outbox->push(outputs, downstream_accepts, location_id);
0188     }
0189     auto end_queue_time = std::chrono::steady_clock::now();
0190 
0191     JArrowMetrics::Status status;
0192     if (in_status == SubeventQueue::Status::Ready && out_status == JMailbox<SubeventWrapper<OutputT>>::Status::Ready) {
0193         status = JArrowMetrics::Status::KeepGoing;
0194     }
0195     else {
0196         status = JArrowMetrics::Status::ComeBackLater;
0197     }
0198     auto latency = (end_latency_time - start_latency_time);
0199     auto overhead = (end_queue_time - start_total_time) - latency;
0200     result.update(status, outputs_size, 1, latency, overhead);
0201 
0202 }
0203 template <typename InputT, typename OutputT>
0204 void JMergeArrow<InputT, OutputT>::execute(JArrowMetrics& result, size_t location_id) {
0205     using InQueue = JMailbox<SubeventWrapper<OutputT>>;
0206     using OutQueue = JMailbox<std::shared_ptr<JEvent>*>;
0207 
0208     auto start_total_time = std::chrono::steady_clock::now();
0209 
0210     // TODO: Think more carefully about subevent bucket size
0211     std::vector<SubeventWrapper<OutputT>> inputs;
0212     size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id);
0213     auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id);
0214     auto start_latency_time = std::chrono::steady_clock::now();
0215 
0216     std::vector<std::shared_ptr<JEvent>*> outputs;
0217     for (const auto& input : inputs) {
0218         LOG_TRACE(m_logger) << "JMergeArrow: Processing input with parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << ", sub=" << input.id << " and total=" << input.total << LOG_END;
0219         // Problem: Are we sure we are updating the event in a way which is effectively thread-safe?
0220         // Should we be doing this insert, or should the caller?
0221         (*(input.parent))->template Insert<OutputT>(input.data);
0222         if (input.total == 1) {
0223             // Goes straight into "ready"
0224             outputs.push_back(input.parent);
0225             LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
0226         }
0227         else {
0228             auto pair = m_in_progress.find(input.parent);
0229             if (pair == m_in_progress.end()) {
0230                 m_in_progress[input.parent] = input.total-1;
0231             }
0232             else {
0233                 if (pair->second == 0) {   // Event was already in the map. TODO: What happens when we turn off event recycling?
0234                     pair->second = input.total-1;
0235                 }
0236                 else if (pair->second == 1) {
0237                     pair->second -= 1;
0238                     outputs.push_back(input.parent);
0239                     LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END;
0240                 }
0241                 else {
0242                     pair->second -= 1;
0243                 }
0244             }
0245         }
0246     }
0247     LOG_DEBUG(m_logger) << "MergeArrow consumed " << inputs.size() << " subevents, produced " << outputs.size() << " events" << LOG_END;
0248     auto end_latency_time = std::chrono::steady_clock::now();
0249 
0250     auto outputs_size = outputs.size();
0251     auto out_status = m_outbox->push(outputs, downstream_accepts, location_id);
0252 
0253     auto end_queue_time = std::chrono::steady_clock::now();
0254 
0255     JArrowMetrics::Status status;
0256     if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready && inputs.size() > 0) {
0257         status = JArrowMetrics::Status::KeepGoing;
0258     }
0259     else {
0260         status = JArrowMetrics::Status::ComeBackLater;
0261     }
0262     auto latency = (end_latency_time - start_latency_time);
0263     auto overhead = (end_queue_time - start_total_time) - latency;
0264     result.update(status, outputs_size, 1, latency, overhead);
0265 }
0266 
0267 
0268