File indexing completed on 2024-11-15 09:43:37
0001
0002
0003
0004
0005 #pragma once
0006 #include <thread>
0007 #include <JANA/Engine/JScheduler.h>
0008 #include <JANA/Engine/JWorkerMetrics.h>
0009 #include <JANA/Engine/JPerfSummary.h>
0010 #include <atomic>
0011
0012
0013 class JArrowProcessingController;
0014
0015 class JWorker {
0016
0017
0018
0019
0020
0021 public:
0022 enum class RunState { Running, Stopping, Stopped, TimedOut, Excepted };
0023
0024 enum class BackoffStrategy { Constant, Linear, Exponential };
0025
0026 using duration_t = std::chrono::steady_clock::duration;
0027
0028
0029 JLogger logger;
0030
0031 private:
0032
0033
0034 JScheduler* m_scheduler;
0035 JArrowProcessingController* m_japc;
0036 unsigned m_worker_id;
0037 unsigned m_cpu_id;
0038 unsigned m_location_id;
0039 bool m_pin_to_cpu;
0040 std::atomic<RunState> m_run_state;
0041 JArrow* m_assignment;
0042 std::thread* m_thread;
0043 JWorkerMetrics m_worker_metrics;
0044 JArrowMetrics m_arrow_metrics;
0045 std::mutex m_assignment_mutex;
0046 JException m_exception;
0047
0048 BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential;
0049 duration_t m_initial_backoff_time = std::chrono::microseconds(1);
0050 duration_t m_checkin_time = std::chrono::milliseconds(500);
0051 unsigned m_backoff_tries = 4;
0052
0053 public:
0054 JWorker(JArrowProcessingController* japc, JScheduler* scheduler, unsigned worker_id, unsigned cpu_id, unsigned domain_id, bool pin_to_cpu);
0055 ~JWorker();
0056
0057
0058
0059
0060 JWorker(const JWorker &other) = delete;
0061 JWorker(JWorker &&other) = delete;
0062 JWorker &operator=(const JWorker &other) = delete;
0063
0064 RunState get_runstate() { return m_run_state; };
0065
0066 void start();
0067 void request_stop();
0068 void wait_for_stop();
0069 void declare_timeout();
0070 const JException& get_exception() const;
0071
0072
0073 void loop();
0074
0075
0076
0077 void measure_perf(WorkerSummary& result);
0078
0079 inline void set_backoff_tries(unsigned backoff_tries) { m_backoff_tries = backoff_tries; }
0080
0081 inline unsigned get_backoff_tries() const { return m_backoff_tries; }
0082
0083 inline BackoffStrategy get_backoff_strategy() const { return m_backoff_strategy; }
0084
0085 inline void set_backoff_strategy(BackoffStrategy backoff_strategy) { m_backoff_strategy = backoff_strategy; }
0086
0087 inline duration_t get_initial_backoff_time() const { return m_initial_backoff_time; }
0088
0089 inline void set_initial_backoff_time(duration_t initial_backoff_time) { m_initial_backoff_time = initial_backoff_time; }
0090
0091 inline duration_t get_checkin_time() const { return m_checkin_time; }
0092
0093 inline void set_checkin_time(duration_t checkin_time) { m_checkin_time = checkin_time; }
0094
0095 };
0096