Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:40

0001 // Copyright 2023, Jefferson Science Associates, LLC.
0002 // Subject to the terms in the LICENSE file found in the top-level directory.
0003 
0004 #pragma once
0005 
0006 #include <JANA/Topology/JArrow.h>
0007 #include <JANA/Topology/JMailbox.h>
0008 #include <JANA/Topology/JPool.h>
0009 
0010 
0011 
0012 template <typename DerivedT, typename FirstT, typename SecondT>
0013 class JJunctionArrow : public JArrow {
0014 
0015 protected:    
0016     PlaceRef<FirstT> first_input {this};
0017     PlaceRef<FirstT> first_output {this};
0018     PlaceRef<SecondT> second_input {this};
0019     PlaceRef<SecondT> second_output {this};
0020 
0021 public:
0022     using Status = JArrowMetrics::Status;
0023 
0024     JJunctionArrow(std::string name,
0025                    bool is_parallel,
0026                    bool is_source,
0027                    bool is_sink
0028                   )
0029         : JArrow(std::move(name), is_parallel, is_source, is_sink)
0030     {
0031     }
0032 
0033     bool try_pull_all(Data<FirstT>& fi, Data<FirstT>& fo, Data<SecondT>& si, Data<SecondT>& so) {
0034 
0035         bool success;
0036         success = first_input.pull(fi);
0037         if (! success) {
0038             return false;
0039         }
0040         success = first_output.pull(fo);
0041         if (! success) {
0042             first_input.revert(fi);
0043             return false;
0044         }
0045         success = second_input.pull(si);
0046         if (! success) {
0047             first_input.revert(fi);
0048             first_output.revert(fo);
0049             return false;
0050         }
0051         success = second_output.pull(so);
0052         if (! success) {
0053             first_input.revert(fi);
0054             first_output.revert(fo);
0055             second_input.revert(si);
0056             return false;
0057         }
0058         return true;
0059     }
0060 
0061     size_t push_all(Data<FirstT>& fi, Data<FirstT>& fo, Data<SecondT>& si, Data<SecondT>& so) {
0062         size_t message_count = 0;
0063         message_count += first_input.push(fi);
0064         message_count += first_output.push(fo);
0065         message_count += second_input.push(si);
0066         message_count += second_output.push(so);
0067         return message_count;
0068     }
0069 
0070     void execute(JArrowMetrics& result, size_t location_id) final {
0071 
0072         auto start_total_time = std::chrono::steady_clock::now();
0073 
0074         Data<FirstT> first_input_data {location_id};
0075         Data<FirstT> first_output_data {location_id};
0076         Data<SecondT> second_input_data {location_id};
0077         Data<SecondT> second_output_data {location_id};
0078 
0079         bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data);
0080         if (success) {
0081 
0082             auto start_processing_time = std::chrono::steady_clock::now();
0083             auto process_status = static_cast<DerivedT*>(this)->process(first_input_data, first_output_data, second_input_data, second_output_data);
0084             auto end_processing_time = std::chrono::steady_clock::now();
0085             size_t events_processed = push_all(first_input_data, first_output_data, second_input_data, second_output_data);
0086 
0087             auto end_total_time = std::chrono::steady_clock::now();
0088             auto latency = (end_processing_time - start_processing_time);
0089             auto overhead = (end_total_time - start_total_time) - latency;
0090             result.update(process_status, events_processed, 1, latency, overhead);
0091             return;
0092         }
0093         else {
0094             auto end_total_time = std::chrono::steady_clock::now();
0095             result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
0096             return;
0097         }
0098     }
0099 };
0100 
0101