Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-11-15 09:43:37

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include <thread>
0007 #include <JANA/Engine/JScheduler.h>
0008 #include <JANA/Engine/JWorkerMetrics.h>
0009 #include <JANA/Engine/JPerfSummary.h>
0010 #include <atomic>
0011 
0012 
0013 class JArrowProcessingController;
0014 
0015 class JWorker {
0016     /// Designed so that the Worker checks in with the Scheduler on his own terms;
0017     /// i.e. nobody will update the worker's assignment externally. This eliminates
0018     /// a whole lot of synchronization since we can assume
0019     /// that the Worker's internal state won't be updated by another thread.
0020 
0021 public:
0022     enum class RunState { Running, Stopping, Stopped, TimedOut, Excepted };
0023 
0024     enum class BackoffStrategy { Constant, Linear, Exponential };
0025 
0026     using duration_t = std::chrono::steady_clock::duration;
0027 
0028     /// The logger is made public so that somebody else may set it
0029     JLogger logger;
0030 
0031 private:
0032     /// Machinery that nobody else should modify. These should be protected eventually.
0033     /// Probably simply make them private and expose via get_status() -> Worker::Status
0034     JScheduler* m_scheduler;
0035     JArrowProcessingController* m_japc; // This is used to turn off the other workers if an exception happens
0036     unsigned m_worker_id;
0037     unsigned m_cpu_id;
0038     unsigned m_location_id;
0039     bool m_pin_to_cpu;
0040     std::atomic<RunState> m_run_state;
0041     JArrow* m_assignment;
0042     std::thread* m_thread;    // JWorker encapsulates a thread of some kind. Nothing else should care how.
0043     JWorkerMetrics m_worker_metrics;
0044     JArrowMetrics m_arrow_metrics;
0045     std::mutex m_assignment_mutex;
0046     JException m_exception;
0047 
0048     BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential;
0049     duration_t m_initial_backoff_time = std::chrono::microseconds(1);
0050     duration_t m_checkin_time = std::chrono::milliseconds(500);
0051     unsigned m_backoff_tries = 4;
0052 
0053 public:
0054     JWorker(JArrowProcessingController* japc, JScheduler* scheduler, unsigned worker_id, unsigned cpu_id, unsigned domain_id, bool pin_to_cpu);
0055     ~JWorker();
0056 
0057     /// If we copy or move the Worker, the underlying std::thread will be left with a
0058     /// dangling pointer back to `this`. So we forbid copying, assigning, and moving.
0059 
0060     JWorker(const JWorker &other) = delete;
0061     JWorker(JWorker &&other) = delete;
0062     JWorker &operator=(const JWorker &other) = delete;
0063 
0064     RunState get_runstate() { return m_run_state; };
0065 
0066     void start();
0067     void request_stop();
0068     void wait_for_stop();
0069     void declare_timeout();
0070     const JException& get_exception() const;
0071 
0072     /// This is what the encapsulated thread is supposed to be doing
0073     void loop();
0074 
0075     /// Summarize what/how this Worker is doing. This is meant to be called from
0076     /// JProcessingController::measure_perf()
0077     void measure_perf(WorkerSummary& result);
0078 
0079     inline void set_backoff_tries(unsigned backoff_tries) { m_backoff_tries = backoff_tries; }
0080 
0081     inline unsigned get_backoff_tries() const { return m_backoff_tries; }
0082 
0083     inline BackoffStrategy get_backoff_strategy() const { return m_backoff_strategy; }
0084 
0085     inline void set_backoff_strategy(BackoffStrategy backoff_strategy) { m_backoff_strategy = backoff_strategy; }
0086 
0087     inline duration_t get_initial_backoff_time() const { return m_initial_backoff_time; }
0088 
0089     inline void set_initial_backoff_time(duration_t initial_backoff_time) { m_initial_backoff_time = initial_backoff_time; }
0090 
0091     inline duration_t get_checkin_time() const { return m_checkin_time; }
0092 
0093     inline void set_checkin_time(duration_t checkin_time) { m_checkin_time = checkin_time; }
0094 
0095 };
0096