Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:40

0001 
0002 // Copyright 2023, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
0007 #include <JANA/Topology/JArrow.h>
0008 #include <JANA/Topology/JMailbox.h>
0009 #include <JANA/Topology/JPool.h>
0010 
0011 template <typename DerivedT, typename MessageT>
0012 class JPipelineArrow : public JArrow {
0013 private:
0014     PlaceRef<MessageT> m_input {this, true, 1, 1};
0015     PlaceRef<MessageT> m_output {this, false, 1, 1};
0016 
0017 public:
0018     JPipelineArrow(std::string name,
0019                    bool is_parallel,
0020                    bool is_source,
0021                    bool is_sink,
0022                    JMailbox<MessageT*>* input_queue,
0023                    JMailbox<MessageT*>* output_queue,
0024                    JPool<MessageT>* pool
0025                   )
0026         : JArrow(std::move(name), is_parallel, is_source, is_sink) {
0027 
0028         if (input_queue == nullptr) {
0029             assert(pool != nullptr);
0030             m_input.set_pool(pool);
0031         }
0032         else {
0033             m_input.set_queue(input_queue);
0034         }
0035         if (output_queue == nullptr) {
0036             assert(pool != nullptr);
0037             m_output.set_pool(pool);
0038         }
0039         else {
0040             m_output.set_queue(output_queue);
0041         }
0042     }
0043 
0044     void execute(JArrowMetrics& result, size_t location_id) final {
0045 
0046         auto start_total_time = std::chrono::steady_clock::now();
0047 
0048         Data<MessageT> in_data {location_id};
0049         Data<MessageT> out_data {location_id};
0050 
0051         bool success = m_input.pull(in_data) && m_output.pull(out_data);
0052         if (!success) {
0053             m_input.revert(in_data);
0054             m_output.revert(out_data);
0055             // TODO: Test that revert works properly
0056             
0057             auto end_total_time = std::chrono::steady_clock::now();
0058             result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
0059             return;
0060         }
0061 
0062         bool process_succeeded = true;
0063         JArrowMetrics::Status process_status = JArrowMetrics::Status::KeepGoing;
0064         assert(in_data.item_count == 1);
0065         MessageT* event = in_data.items[0];
0066 
0067         auto start_processing_time = std::chrono::steady_clock::now();
0068         static_cast<DerivedT*>(this)->process(event, process_succeeded, process_status);
0069         auto end_processing_time = std::chrono::steady_clock::now();
0070 
0071         if (process_succeeded) {
0072             in_data.item_count = 0;
0073             out_data.item_count = 1;
0074             out_data.items[0] = event;
0075         }
0076         m_input.push(in_data);
0077         m_output.push(out_data);
0078 
0079         // Publish metrics
0080         auto end_total_time = std::chrono::steady_clock::now();
0081         auto latency = (end_processing_time - start_processing_time);
0082         auto overhead = (end_total_time - start_total_time) - latency;
0083         result.update(process_status, process_succeeded, 1, latency, overhead);
0084     }
0085 };