Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-22 10:41:08

0001 
0002 // Copyright 2024, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 
#include <JANA/JService.h>
#include <JANA/Topology/JArrow.h>
#include <JANA/Topology/JTopologyBuilder.h>
#include <JANA/Utils/JBacktrace.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <exception>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
0016 
// Per-thread worker index. This is only a declaration (`extern`); the variable
// is defined in another translation unit.
// NOTE(review): presumably assigned when a thread starts acting as a
// JExecutionEngine worker — confirm against the implementation file.
thread_local extern int jana2_worker_id;
0018 
0019 class JExecutionEngine : public JService {
0020 
0021 public:
0022     using clock_t = std::chrono::steady_clock;
0023 
0024     enum class RunStatus { Paused, Running, Pausing, Draining, Failed, Finished };
0025     enum class InterruptStatus { NoInterruptsSupervised, NoInterruptsUnsupervised, InspectRequested, InspectInProgress, PauseAndQuit };
0026 
0027     struct Perf {
0028         RunStatus runstatus;
0029         size_t thread_count;
0030         size_t event_count;
0031         size_t uptime_ms;
0032         double throughput_hz;
0033         JEventLevel event_level;
0034     };
0035 
0036     struct Worker {
0037         size_t worker_id;
0038         JBacktrace* backtrace;
0039     };
0040 
0041 #ifndef JANA2_TESTCASE
0042 private:
0043 #endif
0044 
0045     struct Task {
0046         JArrow* arrow = nullptr;
0047         JEvent* input_event = nullptr;
0048         int input_port = -1;
0049         JArrow::OutputData outputs;
0050         size_t output_count = 0;
0051         JArrow::FireResult status = JArrow::FireResult::NotRunYet;
0052     };
0053 
0054     struct ArrowState {
0055         enum class Status { Paused, Running, Finished };
0056         Status status = Status::Paused;
0057         bool is_parallel = false;
0058         bool is_source = false;
0059         bool is_sink = false;
0060         size_t next_input = 0;
0061         size_t active_tasks = 0;
0062         size_t events_processed;
0063         clock_t::duration total_processing_duration;
0064     };
0065 
0066     struct WorkerState {
0067         std::thread* thread = nullptr;
0068         size_t worker_id = 0;
0069         size_t cpu_id = 0;
0070         size_t location_id = 0;
0071         clock_t::time_point last_checkout_time = clock_t::now();
0072         std::exception_ptr stored_exception = nullptr;
0073         bool is_stop_requested = false;
0074         bool is_event_warmed_up = false;
0075         bool is_timed_out = false;
0076         uint64_t last_event_nr = 0;
0077         size_t last_arrow_id = 0;
0078         JBacktrace backtrace;
0079     };
0080 
0081 
0082 #ifndef JANA2_TESTCASE
0083 private:
0084 #endif
0085     // Services
0086     Service<JTopologyBuilder> m_topology {this};
0087 
0088     // Parameters
0089     bool m_show_ticker = true;
0090     bool m_enable_timeout = true;
0091     int m_backoff_ms = 10;
0092     int m_ticker_ms = 500;
0093     int m_timeout_s = 8;
0094     int m_warmup_timeout_s = 30;
0095     std::string m_path_to_named_pipe = "/tmp/jana_status";
0096 
0097     // Concurrency
0098     std::mutex m_mutex;
0099     std::condition_variable m_condvar;
0100     std::vector<std::unique_ptr<WorkerState>> m_worker_states;
0101     std::vector<ArrowState> m_arrow_states;
0102     RunStatus m_runstatus = RunStatus::Paused;
0103     std::atomic<InterruptStatus> m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised };
0104     std::atomic_bool m_print_worker_report_requested {false};
0105     std::atomic_bool m_send_worker_report_requested {false};
0106     size_t m_next_arrow_id=0;
0107 
0108     // Metrics
0109     size_t m_event_count_at_start = 0;
0110     size_t m_event_count_at_finish = 0;
0111     clock_t::time_point m_time_at_start;
0112     clock_t::time_point m_time_at_finish;
0113     clock_t::duration m_total_idle_duration = clock_t::duration::zero();
0114     clock_t::duration m_total_scheduler_duration = clock_t::duration::zero();
0115 
0116 
0117 public:
0118 
0119     JExecutionEngine() {
0120         SetLoggerName("jana");
0121     }
0122 
0123     ~JExecutionEngine() {
0124         ScaleWorkers(0);
0125         // If we don't shut down the thread team, the condition variable will hang during destruction
0126     }
0127 
0128     void Init() override;
0129 
0130     void RunTopology();
0131     void PauseTopology();
0132     void DrainTopology();
0133     void FinishTopology();
0134 
0135     void ScaleWorkers(size_t nthreads);
0136     Worker RegisterWorker();
0137     void RunWorker(Worker);
0138     void RunSupervisor();
0139 
0140     JArrow::FireResult Fire(size_t arrow_id, size_t location_id=0);
0141 
0142     Perf GetPerf();
0143     RunStatus GetRunStatus();
0144     void SetTickerEnabled(bool ticker_on);
0145     bool IsTickerEnabled() const;
0146     void SetTimeoutEnabled(bool timeout_on);
0147     bool IsTimeoutEnabled() const;
0148 
0149     void HandleSIGINT();
0150     void HandleSIGUSR1();
0151     void HandleSIGUSR2();
0152     void HandleSIGTSTP();
0153 
0154 #ifndef JANA2_TESTCASE
0155 private:
0156 #endif
0157 
0158     void PrintWorkerReport(bool);
0159     void PrintFinalReport();
0160     bool CheckTimeout();
0161     void HandleFailures();
0162     void ExchangeTask(Task& task, size_t worker_id, bool nonblocking=false);
0163     void CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time);
0164     void FindNextReadyTask_Unsafe(Task& task, WorkerState& worker);
0165 
0166 };
0167 
0168 
0169 std::string ToString(JExecutionEngine::RunStatus status);
0170 
0171