File indexing completed on 2025-02-22 10:41:08
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <JANA/JService.h>
0008 #include <JANA/Topology/JArrow.h>
0009 #include <JANA/Topology/JTopologyBuilder.h>
0010 #include <JANA/Utils/JBacktrace.h>
0011
0012 #include <chrono>
0013 #include <condition_variable>
0014 #include <ctime>
0015 #include <exception>
0016
0017 extern thread_local int jana2_worker_id; // Declaration only (defined in another translation unit); thread_local, so every thread sees its own copy. Presumably the id of the worker running on the calling thread -- confirm at the definition.
0018
0019 class JExecutionEngine : public JService {
0020
0021 public:
0022 using clock_t = std::chrono::steady_clock;
0023
0024 enum class RunStatus { Paused, Running, Pausing, Draining, Failed, Finished };
0025 enum class InterruptStatus { NoInterruptsSupervised, NoInterruptsUnsupervised, InspectRequested, InspectInProgress, PauseAndQuit };
0026
0027 struct Perf {
0028 RunStatus runstatus;
0029 size_t thread_count;
0030 size_t event_count;
0031 size_t uptime_ms;
0032 double throughput_hz;
0033 JEventLevel event_level;
0034 };
0035
0036 struct Worker {
0037 size_t worker_id;
0038 JBacktrace* backtrace;
0039 };
0040
0041 #ifndef JANA2_TESTCASE
0042 private:
0043 #endif
0044
0045 struct Task {
0046 JArrow* arrow = nullptr;
0047 JEvent* input_event = nullptr;
0048 int input_port = -1;
0049 JArrow::OutputData outputs;
0050 size_t output_count = 0;
0051 JArrow::FireResult status = JArrow::FireResult::NotRunYet;
0052 };
0053
0054 struct ArrowState {
0055 enum class Status { Paused, Running, Finished };
0056 Status status = Status::Paused;
0057 bool is_parallel = false;
0058 bool is_source = false;
0059 bool is_sink = false;
0060 size_t next_input = 0;
0061 size_t active_tasks = 0;
0062 size_t events_processed;
0063 clock_t::duration total_processing_duration;
0064 };
0065
0066 struct WorkerState {
0067 std::thread* thread = nullptr;
0068 size_t worker_id = 0;
0069 size_t cpu_id = 0;
0070 size_t location_id = 0;
0071 clock_t::time_point last_checkout_time = clock_t::now();
0072 std::exception_ptr stored_exception = nullptr;
0073 bool is_stop_requested = false;
0074 bool is_event_warmed_up = false;
0075 bool is_timed_out = false;
0076 uint64_t last_event_nr = 0;
0077 size_t last_arrow_id = 0;
0078 JBacktrace backtrace;
0079 };
0080
0081
0082 #ifndef JANA2_TESTCASE
0083 private:
0084 #endif
0085
0086 Service<JTopologyBuilder> m_topology {this};
0087
0088
0089 bool m_show_ticker = true;
0090 bool m_enable_timeout = true;
0091 int m_backoff_ms = 10;
0092 int m_ticker_ms = 500;
0093 int m_timeout_s = 8;
0094 int m_warmup_timeout_s = 30;
0095 std::string m_path_to_named_pipe = "/tmp/jana_status";
0096
0097
0098 std::mutex m_mutex;
0099 std::condition_variable m_condvar;
0100 std::vector<std::unique_ptr<WorkerState>> m_worker_states;
0101 std::vector<ArrowState> m_arrow_states;
0102 RunStatus m_runstatus = RunStatus::Paused;
0103 std::atomic<InterruptStatus> m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised };
0104 std::atomic_bool m_print_worker_report_requested {false};
0105 std::atomic_bool m_send_worker_report_requested {false};
0106 size_t m_next_arrow_id=0;
0107
0108
0109 size_t m_event_count_at_start = 0;
0110 size_t m_event_count_at_finish = 0;
0111 clock_t::time_point m_time_at_start;
0112 clock_t::time_point m_time_at_finish;
0113 clock_t::duration m_total_idle_duration = clock_t::duration::zero();
0114 clock_t::duration m_total_scheduler_duration = clock_t::duration::zero();
0115
0116
0117 public:
0118
0119 JExecutionEngine() {
0120 SetLoggerName("jana");
0121 }
0122
0123 ~JExecutionEngine() {
0124 ScaleWorkers(0);
0125
0126 }
0127
0128 void Init() override;
0129
0130 void RunTopology();
0131 void PauseTopology();
0132 void DrainTopology();
0133 void FinishTopology();
0134
0135 void ScaleWorkers(size_t nthreads);
0136 Worker RegisterWorker();
0137 void RunWorker(Worker);
0138 void RunSupervisor();
0139
0140 JArrow::FireResult Fire(size_t arrow_id, size_t location_id=0);
0141
0142 Perf GetPerf();
0143 RunStatus GetRunStatus();
0144 void SetTickerEnabled(bool ticker_on);
0145 bool IsTickerEnabled() const;
0146 void SetTimeoutEnabled(bool timeout_on);
0147 bool IsTimeoutEnabled() const;
0148
0149 void HandleSIGINT();
0150 void HandleSIGUSR1();
0151 void HandleSIGUSR2();
0152 void HandleSIGTSTP();
0153
0154 #ifndef JANA2_TESTCASE
0155 private:
0156 #endif
0157
0158 void PrintWorkerReport(bool);
0159 void PrintFinalReport();
0160 bool CheckTimeout();
0161 void HandleFailures();
0162 void ExchangeTask(Task& task, size_t worker_id, bool nonblocking=false);
0163 void CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time);
0164 void FindNextReadyTask_Unsafe(Task& task, WorkerState& worker);
0165
0166 };
0167
0168
0169 std::string ToString(JExecutionEngine::RunStatus status); // Declaration only; returns a std::string for the given RunStatus (presumably the enumerator's name -- confirm at the definition).
0170
0171