File indexing completed on 2025-01-18 10:01:40
0001
0002
0003
0004 #pragma once
0005
0006 #include <JANA/Topology/JArrow.h>
0007 #include <JANA/Topology/JMailbox.h>
0008 #include <JANA/Topology/JPool.h>
0009
0010
0011
0012 template <typename DerivedT, typename FirstT, typename SecondT>
0013 class JJunctionArrow : public JArrow {
0014
0015 protected:
0016 PlaceRef<FirstT> first_input {this};
0017 PlaceRef<FirstT> first_output {this};
0018 PlaceRef<SecondT> second_input {this};
0019 PlaceRef<SecondT> second_output {this};
0020
0021 public:
0022 using Status = JArrowMetrics::Status;
0023
0024 JJunctionArrow(std::string name,
0025 bool is_parallel,
0026 bool is_source,
0027 bool is_sink
0028 )
0029 : JArrow(std::move(name), is_parallel, is_source, is_sink)
0030 {
0031 }
0032
0033 bool try_pull_all(Data<FirstT>& fi, Data<FirstT>& fo, Data<SecondT>& si, Data<SecondT>& so) {
0034
0035 bool success;
0036 success = first_input.pull(fi);
0037 if (! success) {
0038 return false;
0039 }
0040 success = first_output.pull(fo);
0041 if (! success) {
0042 first_input.revert(fi);
0043 return false;
0044 }
0045 success = second_input.pull(si);
0046 if (! success) {
0047 first_input.revert(fi);
0048 first_output.revert(fo);
0049 return false;
0050 }
0051 success = second_output.pull(so);
0052 if (! success) {
0053 first_input.revert(fi);
0054 first_output.revert(fo);
0055 second_input.revert(si);
0056 return false;
0057 }
0058 return true;
0059 }
0060
0061 size_t push_all(Data<FirstT>& fi, Data<FirstT>& fo, Data<SecondT>& si, Data<SecondT>& so) {
0062 size_t message_count = 0;
0063 message_count += first_input.push(fi);
0064 message_count += first_output.push(fo);
0065 message_count += second_input.push(si);
0066 message_count += second_output.push(so);
0067 return message_count;
0068 }
0069
0070 void execute(JArrowMetrics& result, size_t location_id) final {
0071
0072 auto start_total_time = std::chrono::steady_clock::now();
0073
0074 Data<FirstT> first_input_data {location_id};
0075 Data<FirstT> first_output_data {location_id};
0076 Data<SecondT> second_input_data {location_id};
0077 Data<SecondT> second_output_data {location_id};
0078
0079 bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data);
0080 if (success) {
0081
0082 auto start_processing_time = std::chrono::steady_clock::now();
0083 auto process_status = static_cast<DerivedT*>(this)->process(first_input_data, first_output_data, second_input_data, second_output_data);
0084 auto end_processing_time = std::chrono::steady_clock::now();
0085 size_t events_processed = push_all(first_input_data, first_output_data, second_input_data, second_output_data);
0086
0087 auto end_total_time = std::chrono::steady_clock::now();
0088 auto latency = (end_processing_time - start_processing_time);
0089 auto overhead = (end_total_time - start_total_time) - latency;
0090 result.update(process_status, events_processed, 1, latency, overhead);
0091 return;
0092 }
0093 else {
0094 auto end_total_time = std::chrono::steady_clock::now();
0095 result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
0096 return;
0097 }
0098 }
0099 };
0100
0101