File indexing completed on 2025-01-18 10:01:40
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/Topology/JArrow.h>
0008 #include <JANA/Topology/JMailbox.h>
0009 #include <JANA/Topology/JPool.h>
0010
0011 template <typename DerivedT, typename MessageT>
0012 class JPipelineArrow : public JArrow {
0013 private:
0014 PlaceRef<MessageT> m_input {this, true, 1, 1};
0015 PlaceRef<MessageT> m_output {this, false, 1, 1};
0016
0017 public:
0018 JPipelineArrow(std::string name,
0019 bool is_parallel,
0020 bool is_source,
0021 bool is_sink,
0022 JMailbox<MessageT*>* input_queue,
0023 JMailbox<MessageT*>* output_queue,
0024 JPool<MessageT>* pool
0025 )
0026 : JArrow(std::move(name), is_parallel, is_source, is_sink) {
0027
0028 if (input_queue == nullptr) {
0029 assert(pool != nullptr);
0030 m_input.set_pool(pool);
0031 }
0032 else {
0033 m_input.set_queue(input_queue);
0034 }
0035 if (output_queue == nullptr) {
0036 assert(pool != nullptr);
0037 m_output.set_pool(pool);
0038 }
0039 else {
0040 m_output.set_queue(output_queue);
0041 }
0042 }
0043
0044 void execute(JArrowMetrics& result, size_t location_id) final {
0045
0046 auto start_total_time = std::chrono::steady_clock::now();
0047
0048 Data<MessageT> in_data {location_id};
0049 Data<MessageT> out_data {location_id};
0050
0051 bool success = m_input.pull(in_data) && m_output.pull(out_data);
0052 if (!success) {
0053 m_input.revert(in_data);
0054 m_output.revert(out_data);
0055
0056
0057 auto end_total_time = std::chrono::steady_clock::now();
0058 result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
0059 return;
0060 }
0061
0062 bool process_succeeded = true;
0063 JArrowMetrics::Status process_status = JArrowMetrics::Status::KeepGoing;
0064 assert(in_data.item_count == 1);
0065 MessageT* event = in_data.items[0];
0066
0067 auto start_processing_time = std::chrono::steady_clock::now();
0068 static_cast<DerivedT*>(this)->process(event, process_succeeded, process_status);
0069 auto end_processing_time = std::chrono::steady_clock::now();
0070
0071 if (process_succeeded) {
0072 in_data.item_count = 0;
0073 out_data.item_count = 1;
0074 out_data.items[0] = event;
0075 }
0076 m_input.push(in_data);
0077 m_output.push(out_data);
0078
0079
0080 auto end_total_time = std::chrono::steady_clock::now();
0081 auto latency = (end_processing_time - start_processing_time);
0082 auto overhead = (end_total_time - start_total_time) - latency;
0083 result.update(process_status, process_succeeded, 1, latency, overhead);
0084 }
0085 };