Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:34

0001 
0002 #include "JExecutionEngine.h"
0003 #include <JANA/Utils/JApplicationInspector.h>
0004 
0005 #include <chrono>
0006 #include <cstddef>
0007 #include <cstdio>
0008 #include <ctime>
0009 #include <exception>
0010 #include <mutex>
0011 #include <csignal>
0012 #include <ostream>
0013 #include <sstream>
0014 #include <sys/stat.h>
0015 #include <fcntl.h>
0016 #include <unistd.h>
0017 
// Per-thread worker id. RunWorker() overwrites it with the id of the worker it is
// running; on any thread that has not passed through RunWorker() it stays at -1.
thread_local int jana2_worker_id = -1;
// Per-thread pointer to the worker's backtrace slot. RunWorker() points it at the
// backtrace carried by its Worker argument; it is nullptr until then.
thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020 
0021 void JExecutionEngine::Init() {
0022     auto params = GetApplication()->GetJParameterManager();
0023 
0024     params->SetDefaultParameter("jana:timeout", m_timeout_s, 
0025         "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026 
0027     params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s, 
0028         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029 
0030     params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms, 
0031         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032 
0033     params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034 
0035     params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036 
0037     auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038         "Filename of named pipe for retrieving instantaneous status info");
0039 
0040     size_t pid = getpid();
0041     mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042 
0043     LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044     LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045     LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046 
0047     if (p->IsDefault()) {
0048         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049     }
0050     else {
0051         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052     }
0053 
0054 
0055     // Not sure how I feel about putting this here yet, but I think it will at least work in both cases it needs to.
0056     // The reason this works is because JTopologyBuilder::create_topology() has already been called before 
0057     // JApplication::ProvideService<JExecutionEngine>().
0058     for (JArrow* arrow : m_topology->arrows) {
0059 
0060         arrow->initialize();
0061 
0062         m_arrow_states.emplace_back();
0063         auto& arrow_state = m_arrow_states.back();
0064         arrow_state.is_source = arrow->is_source();
0065         arrow_state.is_sink = arrow->is_sink();
0066         arrow_state.is_parallel = arrow->is_parallel();
0067         arrow_state.next_input = arrow->get_next_port_index();
0068     }
0069 }
0070 
0071 
0072 void JExecutionEngine::RunTopology() {
0073     std::unique_lock<std::mutex> lock(m_mutex);
0074 
0075     if (m_runstatus == RunStatus::Failed) {
0076         throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0077     }
0078     if (m_runstatus == RunStatus::Finished) {
0079         throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0080     }
0081 
0082     // Set start time and event count
0083     m_time_at_start = clock_t::now();
0084     m_event_count_at_start = m_event_count_at_finish;
0085 
0086     // Reactivate topology
0087     for (auto& arrow: m_arrow_states) {
0088         if (arrow.status == ArrowState::Status::Paused) {
0089             arrow.status = ArrowState::Status::Running;
0090         }
0091     }
0092 
0093     m_runstatus = RunStatus::Running;
0094 
0095     lock.unlock();
0096     m_condvar.notify_one();
0097 }
0098 
0099 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0100     // We both create and destroy the pool of workers here. They all sleep until they 
0101     // receive work from the scheduler, which won't happen until the runstatus <- {Running, 
0102     // Pausing, Draining} and there is a task ready to execute. This way worker creation/destruction
0103     // is decoupled from topology execution.
0104 
0105     // If we scale to zero, no workers will run. This is useful for testing, and also for using
0106     // an external thread team, should the need arise.
0107 
0108     std::unique_lock<std::mutex> lock(m_mutex);
0109 
0110     auto prev_nthreads = m_worker_states.size();
0111 
0112     if (prev_nthreads < nthreads) {
0113         // We are launching additional worker threads
0114         LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0115         for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0116             auto worker = std::make_unique<WorkerState>();
0117             worker->worker_id = worker_id;
0118             worker->is_stop_requested = false;
0119             worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0120             worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0121             worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0122             LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0123             m_worker_states.push_back(std::move(worker));
0124 
0125             bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0126             if (pin_to_cpu) {
0127                 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0128             }
0129         }
0130     }
0131 
0132     else if (prev_nthreads > nthreads) {
0133         // We are destroying existing worker threads
0134         LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0135 
0136         // Signal to threads that they need to terminate.
0137         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0138             LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0139             m_worker_states[worker_id]->is_stop_requested = true;
0140         }
0141         m_condvar.notify_all(); // Wake up all threads so that they can exit the condvar wait loop
0142         lock.unlock();
0143 
0144         // We join all (eligible) threads _outside_ of the mutex
0145         for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0146             if (m_worker_states[worker_id]->thread != nullptr) {
0147                 if (m_worker_states[worker_id]->is_timed_out) {
0148                     // Thread has timed out. Rather than non-cooperatively killing it,
0149                     // we relinquish ownership of it but remember that it was ours once and
0150                     // is still out there, somewhere, biding its time
0151                     m_worker_states[worker_id]->thread->detach();
0152                     LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0153                 }
0154                 else {
0155                     LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0156                     m_worker_states[worker_id]->thread->join();
0157                     LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0158                 }
0159             }
0160             else {
0161                 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0162             }
0163         }
0164 
0165         lock.lock();
0166         // We retake the mutex so we can safely modify m_worker_states
0167         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0168             if (m_worker_states.back()->thread != nullptr) {
0169                 delete m_worker_states.back()->thread;
0170             }
0171             m_worker_states.pop_back();
0172         }
0173     }
0174 }
0175 
0176 void JExecutionEngine::PauseTopology() {
0177     std::unique_lock<std::mutex> lock(m_mutex);
0178     if (m_runstatus != RunStatus::Running) return;
0179     m_runstatus = RunStatus::Pausing;
0180     for (auto& arrow: m_arrow_states) {
0181         if (arrow.status == ArrowState::Status::Running) {
0182             arrow.status = ArrowState::Status::Paused;
0183         }
0184     }
0185     LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0186     lock.unlock();
0187     m_condvar.notify_all();
0188 }
0189 
0190 void JExecutionEngine::DrainTopology() {
0191     std::unique_lock<std::mutex> lock(m_mutex);
0192     if (m_runstatus != RunStatus::Running) return;
0193     m_runstatus = RunStatus::Draining;
0194     for (auto& arrow: m_arrow_states) {
0195         if (arrow.is_source) {
0196             if (arrow.status == ArrowState::Status::Running) {
0197                 arrow.status = ArrowState::Status::Paused;
0198             }
0199         }
0200     }
0201     LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0202     lock.unlock();
0203     m_condvar.notify_all();
0204 }
0205 
0206 void JExecutionEngine::RunSupervisor() {
0207 
0208     m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0209     size_t last_event_count = 0;
0210     clock_t::time_point last_measurement_time = clock_t::now();
0211 
0212     Perf perf;
0213     while (true) {
0214 
0215         if (m_enable_timeout && m_timeout_s > 0) {
0216             CheckTimeout();
0217         }
0218 
0219         if (m_print_worker_report_requested) {
0220             PrintWorkerReport(false);
0221             m_print_worker_report_requested = false;
0222         }
0223 
0224         if (m_send_worker_report_requested) {
0225             PrintWorkerReport(true);
0226             m_send_worker_report_requested = false;
0227         }
0228 
0229         perf = GetPerf();
0230         if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) || 
0231             perf.runstatus == RunStatus::Finished || 
0232             perf.runstatus == RunStatus::Failed) {
0233             break;
0234         }
0235 
0236         if (m_interrupt_status == InterruptStatus::InspectRequested) {
0237             if (perf.runstatus == RunStatus::Paused) {
0238                 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0239                 m_enable_timeout = false;
0240                 m_interrupt_status = InterruptStatus::InspectInProgress;
0241                 InspectApplication(GetApplication());
0242                 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0243 
0244                 // Jump back to the top of the loop so that we have fresh event count data
0245                 last_measurement_time = clock_t::now();
0246                 last_event_count = 0;
0247                 continue; 
0248             }
0249             else if (perf.runstatus == RunStatus::Running) {
0250                 PauseTopology();
0251             }
0252         }
0253         else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0254             PauseTopology();
0255         }
0256 
0257         if (m_show_ticker) {
0258             auto next_measurement_time = clock_t::now();
0259             auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0260             float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0261             last_measurement_time = next_measurement_time;
0262             last_event_count = perf.event_count;
0263 
0264             // Print rates
0265             LOG_WARN(m_logger) << "Status: " << perf.event_count << " events processed at "
0266                             << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0267                             << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0268         }
0269 
0270         std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0271     }
0272     LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0273 
0274     if (perf.runstatus == RunStatus::Failed) {
0275         HandleFailures();
0276     }
0277 
0278     PrintFinalReport();
0279 }
0280 
0281 bool JExecutionEngine::CheckTimeout() {
0282     std::unique_lock<std::mutex> lock(m_mutex);
0283     auto now = clock_t::now();
0284     bool timeout_detected = false;
0285     for (auto& worker: m_worker_states) {
0286         auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0287         auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0288         if (duration_s > timeout_s) {
0289             worker->is_timed_out = true;
0290             timeout_detected = true;
0291             m_runstatus = RunStatus::Failed;
0292         }
0293     }
0294     return timeout_detected;
0295 }
0296 
0297 void JExecutionEngine::HandleFailures() {
0298 
0299     std::unique_lock<std::mutex> lock(m_mutex);
0300 
0301     // First, we log all of the failures we've found
0302     for (auto& worker: m_worker_states) {
0303         const auto& arrow_name = m_topology->arrows[worker->last_arrow_id]->get_name();
0304         if (worker->is_timed_out) {
0305             LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0306             pthread_kill(worker->thread->native_handle(), SIGUSR2);
0307             LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0308             worker->backtrace.WaitForCapture();
0309         }
0310         if (worker->stored_exception != nullptr) {
0311             LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0312         }
0313     }
0314 
0315     // Now we throw each of these exceptions in order, in case the caller is going to attempt to catch them.
0316     // In reality all callers are going to print everything they can about the exception and exit.
0317     for (auto& worker: m_worker_states) {
0318         if (worker->stored_exception != nullptr) {
0319             GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0320             std::rethrow_exception(worker->stored_exception);
0321         }
0322         if (worker->is_timed_out) {
0323             GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0324             auto ex = JException("Timeout in worker thread");
0325             ex.backtrace = worker->backtrace;
0326             throw ex;
0327         }
0328     }
0329 }
0330 
0331 void JExecutionEngine::FinishTopology() {
0332     std::unique_lock<std::mutex> lock(m_mutex);
0333     assert(m_runstatus == RunStatus::Paused);
0334 
0335     LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0336     for (auto* arrow : m_topology->arrows) {
0337         arrow->finalize();
0338     }
0339     for (auto* pool: m_topology->pools) {
0340         pool->Finalize();
0341     }
0342     m_runstatus = RunStatus::Finished;
0343     LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0344 }
0345 
0346 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0347     std::unique_lock<std::mutex> lock(m_mutex);
0348     return m_runstatus;
0349 }
0350 
0351 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0352     std::unique_lock<std::mutex> lock(m_mutex);
0353     Perf result;
0354     if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0355         result.event_count = m_event_count_at_finish - m_event_count_at_start;
0356         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0357     }
0358     else {
0359         // Obtain current event count
0360         size_t current_event_count = 0;
0361         for (auto& state : m_arrow_states) {
0362             if (state.is_sink) {
0363                 current_event_count += state.events_processed;
0364             }
0365         }
0366         result.event_count = current_event_count - m_event_count_at_start;
0367         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0368     }
0369     result.runstatus = m_runstatus;
0370     result.thread_count = m_worker_states.size();
0371     result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0372     result.event_level = JEventLevel::PhysicsEvent;
0373     return result;
0374 }
0375 
0376 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0377     std::unique_lock<std::mutex> lock(m_mutex);
0378     auto worker_id = m_worker_states.size();
0379     auto worker = std::make_unique<WorkerState>();
0380     worker->worker_id = worker_id;
0381     worker->is_stop_requested = false;
0382     worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0383     worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0384     worker->thread = nullptr;
0385     m_worker_states.push_back(std::move(worker));
0386 
0387     bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0388     if (pin_to_cpu) {
0389         JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0390     }
0391     return {worker_id, &worker->backtrace};
0392 
0393 }
0394 
0395 
0396 void JExecutionEngine::RunWorker(Worker worker) {
0397 
0398     LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0399     jana2_worker_id = worker.worker_id;
0400     jana2_worker_backtrace = worker.backtrace;
0401     try {
0402         Task task;
0403         while (true) {
0404             ExchangeTask(task, worker.worker_id);
0405             if (task.arrow == nullptr) break; // Exit as soon as ExchangeTask() stops blocking
0406             task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0407         }
0408         LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0409     }
0410     catch (...) {
0411         LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0412         std::unique_lock<std::mutex> lock(m_mutex);
0413         m_runstatus = RunStatus::Failed;
0414         m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0415     }
0416 }
0417 
0418 
0419 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0420 
0421     auto checkin_time = std::chrono::steady_clock::now();
0422     // It's important to start measuring this _before_ acquiring the lock because acquiring the lock
0423     // may be a big part of the scheduler overhead
0424 
0425     std::unique_lock<std::mutex> lock(m_mutex);
0426     
0427     auto& worker = *m_worker_states.at(worker_id);
0428 
0429     if (task.arrow != nullptr) {
0430         CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0431     }
0432 
0433     if (worker.is_stop_requested) {
0434         return;
0435     }
0436 
0437     FindNextReadyTask_Unsafe(task, worker);
0438 
0439     if (nonblocking) { return; }
0440     auto idle_time_start = clock_t::now();
0441     m_total_scheduler_duration += (idle_time_start - checkin_time);
0442 
0443     while (task.arrow == nullptr && !worker.is_stop_requested) {
0444         m_condvar.wait(lock);
0445         FindNextReadyTask_Unsafe(task, worker);
0446     }
0447     worker.last_checkout_time = clock_t::now();
0448 
0449     if (task.input_event != nullptr) {
0450         worker.last_event_nr = task.input_event->GetEventNumber();
0451     }
0452     else {
0453         worker.last_event_nr = 0;
0454     }
0455     m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0456 
0457     // Notify one worker, who will notify the next, etc, as long as FindNextReadyTaskUnsafe() succeeds.
0458     // After FindNextReadyTaskUnsafe fails, all threads block until the next returning worker reactivates the
0459     // notification chain.
0460     m_condvar.notify_one();
0461 }
0462 
0463 
0464 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0465 
0466     auto processing_duration = checkin_time - worker.last_checkout_time;
0467 
0468     ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0469 
0470     arrow_state.active_tasks -= 1;
0471     arrow_state.total_processing_duration += processing_duration;
0472 
0473     for (size_t output=0; output<task.output_count; ++output) {
0474         if (!task.arrow->get_port(task.outputs[output].second).is_input) {
0475             arrow_state.events_processed++;
0476         }
0477     }
0478 
0479     // Put each output in its correct queue or pool
0480     task.arrow->push(task.outputs, task.output_count, worker.location_id);
0481 
0482     if (task.status == JArrow::FireResult::Finished) {
0483         // If this is an eventsource self-terminating (the only thing that returns Status::Finished right now) it will
0484         // have already called DoClose(). I'm tempted to always call DoClose() as part of JExecutionEngine::Finish() instead, however.
0485 
0486         // Mark arrow as finished
0487         arrow_state.status = ArrowState::Status::Finished;
0488 
0489         // Check if this switches the topology to Draining()
0490         if (m_runstatus == RunStatus::Running) {
0491             bool draining = true;
0492             for (auto& arrow: m_arrow_states) {
0493                 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0494                     draining = false;
0495                 }
0496             }
0497             if (draining) {
0498                 m_runstatus = RunStatus::Draining;
0499             }
0500         }
0501     }
0502     worker.last_arrow_id = -1;
0503     worker.last_event_nr = 0;
0504 
0505     task.arrow = nullptr;
0506     task.input_event = nullptr;
0507     task.output_count = 0;
0508     task.status = JArrow::FireResult::NotRunYet;
0509 };
0510 
0511 
0512 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0513 
0514     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0515         // We only pick up a new task if the topology is running or draining.
0516 
0517         // Each call to FindNextReadyTask_Unsafe() starts with a different m_next_arrow_id to ensure balanced arrow assignments
0518         size_t arrow_count = m_arrow_states.size();
0519         m_next_arrow_id += 1;
0520         m_next_arrow_id %= arrow_count;
0521 
0522         for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0523             size_t arrow_id = i % arrow_count;
0524 
0525             auto& state = m_arrow_states[arrow_id];
0526             if (!state.is_parallel && (state.active_tasks != 0)) {
0527                 // We've found a sequential arrow that is already active. Nothing we can do here.
0528                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0529                 continue;
0530             }
0531 
0532             if (state.status != ArrowState::Status::Running) {
0533                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0534                 continue;
0535             }
0536             // TODO: Support next_visit_time so that we don't hammer blocked event sources
0537 
0538             // See if we can obtain an input event (this is silly)
0539             JArrow* arrow = m_topology->arrows[arrow_id];
0540             // TODO: consider setting state.next_input, retrieving via fire()
0541             auto port = arrow->get_next_port_index();
0542             JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
0543             if (event != nullptr || port == -1) {
0544                 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0545                 // We've found a task that is ready!
0546                 state.active_tasks += 1;
0547 
0548                 task.arrow = arrow;
0549                 task.input_port = port;
0550                 task.input_event = event;
0551                 task.output_count = 0;
0552                 task.status = JArrow::FireResult::NotRunYet;
0553 
0554                 worker.last_arrow_id = arrow_id;
0555                 if (event != nullptr) {
0556                     worker.is_event_warmed_up = event->IsWarmedUp();
0557                     worker.last_event_nr = event->GetEventNumber();
0558                 }
0559                 else {
0560                     worker.is_event_warmed_up = true; // Use shorter timeout
0561                     worker.last_event_nr = 0;
0562                 }
0563                 return;
0564             }
0565             else {
0566                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0567             }
0568         }
0569     }
0570 
0571     // Because we reached this point, we know that there aren't any tasks ready,
0572     // so we check whether more are potentially coming. If not, we can pause the topology.
0573     // Note that our worker threads will still wait at ExchangeTask() until they get
0574     // shut down separately during Scale().
0575     
0576     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0577         // We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
0578         // This also leaves a cleaner narrative in the logs. 
0579 
0580         bool any_active_source_found = false;
0581         bool any_active_task_found = false;
0582         
0583         LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0584 
0585         for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0586             auto& state = m_arrow_states[arrow_id];
0587             any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0588             any_active_task_found |= (state.active_tasks != 0);
0589             // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
0590         }
0591 
0592         if (!any_active_source_found && !any_active_task_found) {
0593             // Pause the topology
0594             m_time_at_finish = clock_t::now();
0595             m_event_count_at_finish = 0;
0596             for (auto& arrow_state : m_arrow_states) {
0597                 if (arrow_state.is_sink) {
0598                     m_event_count_at_finish += arrow_state.events_processed;
0599                 }
0600             }
0601             LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0602             m_runstatus = RunStatus::Paused;
0603             // I think this is the ONLY site where the topology gets paused. Verify this?
0604         }
0605     }
0606 
0607     worker.last_arrow_id = -1;
0608 
0609     task.arrow = nullptr;
0610     task.input_port = -1;
0611     task.input_event = nullptr;
0612     task.output_count = 0;
0613     task.status = JArrow::FireResult::NotRunYet;
0614 }
0615 
0616 
0617 void JExecutionEngine::PrintFinalReport() {
0618 
0619     std::unique_lock<std::mutex> lock(m_mutex);
0620     auto event_count = m_event_count_at_finish - m_event_count_at_start;
0621     auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0622     auto thread_count = m_worker_states.size();
0623     auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0624 
0625     LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0626     LOG_INFO(GetLogger()) << LOG_END;
0627     LOG_INFO(GetLogger()) << "  Avg throughput [Hz]:         " << std::setprecision(3) << throughput_hz << LOG_END;
0628     LOG_INFO(GetLogger()) << "  Completed events [count]:    " << event_count << LOG_END;
0629     LOG_INFO(GetLogger()) << "  Total uptime [s]:            " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0630     LOG_INFO(GetLogger()) << "  Thread team size [count]:    " << thread_count << LOG_END;
0631     LOG_INFO(GetLogger()) << LOG_END;
0632     LOG_INFO(GetLogger()) << "  Arrow-level metrics:" << LOG_END;
0633     LOG_INFO(GetLogger()) << LOG_END;
0634 
0635     size_t total_useful_ms = 0;
0636 
0637     for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0638         auto* arrow = m_topology->arrows[arrow_id];
0639         auto& arrow_state = m_arrow_states[arrow_id];
0640         auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0641         total_useful_ms += useful_ms;
0642         auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0643         auto throughput_bottleneck = 1000.0 / avg_latency;
0644         if (arrow->is_parallel()) {
0645             throughput_bottleneck *= thread_count;
0646         }
0647 
0648         LOG_INFO(GetLogger()) << "  - Arrow name:                 " << arrow->get_name() << LOG_END;
0649         LOG_INFO(GetLogger()) << "    Parallel:                   " << arrow->is_parallel() << LOG_END;
0650         LOG_INFO(GetLogger()) << "    Events completed:           " << arrow_state.events_processed << LOG_END;
0651         LOG_INFO(GetLogger()) << "    Avg latency [ms/event]:     " << avg_latency << LOG_END;
0652         LOG_INFO(GetLogger()) << "    Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0653         LOG_INFO(GetLogger()) << LOG_END;
0654     }
0655 
0656     auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0657     auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0658 
0659     LOG_INFO(GetLogger()) << "  Total useful time [s]:     " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0660     LOG_INFO(GetLogger()) << "  Total scheduler time [s]:  " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0661     LOG_INFO(GetLogger()) << "  Total idle time [s]:       " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0662 
0663     LOG_INFO(GetLogger()) << LOG_END;
0664 
0665     LOG_WARN(GetLogger()) << "Final report: " << event_count << " events processed at "
0666                           << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0667 
0668 }
0669 
0670 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0671     m_show_ticker = show_ticker;
0672 }
0673 
0674 bool JExecutionEngine::IsTickerEnabled() const {
0675     return m_show_ticker;
0676 }
0677 
0678 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0679     m_enable_timeout = timeout_enabled;
0680 }
0681 
0682 bool JExecutionEngine::IsTimeoutEnabled() const {
0683     return m_enable_timeout;
0684 }
0685 
0686 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0687 
0688     std::unique_lock<std::mutex> lock(m_mutex);
0689     if (arrow_id >= m_topology->arrows.size()) {
0690         LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0691         return JArrow::FireResult::NotRunYet;
0692     }
0693     JArrow* arrow = m_topology->arrows[arrow_id];
0694     LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() 
0695                           << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0696 
0697     ArrowState& arrow_state = m_arrow_states[arrow_id];
0698     if (arrow_state.status == ArrowState::Status::Finished) {
0699         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0700         return JArrow::FireResult::Finished;
0701     }
0702     if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0703         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0704         return JArrow::FireResult::NotRunYet;
0705     }
0706     arrow_state.active_tasks += 1;
0707 
0708     auto port = arrow->get_next_port_index();
0709     JEvent* event = nullptr;
0710     if (port != -1) {
0711         event = arrow->pull(port, location_id);
0712         if (event == nullptr) {
0713             LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0714             arrow_state.active_tasks -= 1;
0715             return JArrow::FireResult::NotRunYet;
0716         }
0717         else {
0718             LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0719         }
0720     }
0721     else {
0722         LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0723     }
0724     lock.unlock();
0725 
0726     size_t output_count;
0727     JArrow::OutputData outputs;
0728     JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0729 
0730     LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0731     arrow->fire(event, outputs, output_count, result);
0732     LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
0733     if (output_count == 0) {
0734         LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0735     }
0736     else {
0737         for (size_t i=0; i<output_count; ++i) {
0738             LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0739         }
0740     }
0741 
0742     lock.lock();
0743     arrow->push(outputs, output_count, location_id);
0744     arrow_state.active_tasks -= 1;
0745     lock.unlock();
0746     return result;
0747 }
0748 
0749 
0750 void JExecutionEngine::HandleSIGINT() {
0751     InterruptStatus status = m_interrupt_status;
0752     std::cout << std::endl;
0753     switch (status) {
0754         case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0755         case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0756         case InterruptStatus::NoInterruptsUnsupervised:
0757         case InterruptStatus::PauseAndQuit:
0758         case InterruptStatus::InspectInProgress: 
0759             _exit(-2);
0760     }
0761 }
0762 
0763 void JExecutionEngine::HandleSIGUSR1() {
0764     m_send_worker_report_requested = true;
0765 }
0766 
0767 void JExecutionEngine::HandleSIGUSR2() {
0768     if (jana2_worker_backtrace != nullptr) {
0769         jana2_worker_backtrace->Capture(3);
0770     }
0771 }
0772 
0773 void JExecutionEngine::HandleSIGTSTP() {
0774     std::cout << std::endl;
0775     m_print_worker_report_requested = true;
0776 }
0777 
0778 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0779 
0780     std::unique_lock<std::mutex> lock(m_mutex);
0781     LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0782     for (auto& worker: m_worker_states) {
0783         worker->backtrace.Reset();
0784         pthread_kill(worker->thread->native_handle(), SIGUSR2);
0785     }
0786     for (auto& worker: m_worker_states) {
0787         worker->backtrace.WaitForCapture();
0788     }
0789     std::ostringstream oss;
0790     oss << "Worker report" << std::endl;
0791     for (auto& worker: m_worker_states) {
0792         oss << "------------------------------" << std::endl 
0793             << "  Worker:        " << worker->worker_id << std::endl
0794             << "  Current arrow: " << worker->last_arrow_id << std::endl
0795             << "  Current event: " << worker->last_event_nr << std::endl
0796             << "  Backtrace:" << std::endl << std::endl
0797             << worker->backtrace.ToString();
0798     }
0799     auto s = oss.str();
0800     LOG_WARN(GetLogger()) << s << LOG_END;
0801 
0802     if (send_to_pipe) {
0803 
0804         int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0805         if (fd >= 0) {
0806             write(fd, s.c_str(), s.length()+1);
0807             close(fd);
0808         }
0809         else {
0810             LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0811             << "  You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0812             << "  The status report will still show up in the log." << LOG_END;
0813         }
0814     }
0815 }
0816 
0817 
0818 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0819     switch(runstatus) {
0820         case JExecutionEngine::RunStatus::Running: return "Running";
0821         case JExecutionEngine::RunStatus::Paused: return "Paused";
0822         case JExecutionEngine::RunStatus::Failed: return "Failed";
0823         case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0824         case JExecutionEngine::RunStatus::Draining: return "Draining";
0825         case JExecutionEngine::RunStatus::Finished: return "Finished";
0826         default: return "CorruptedRunStatus";
0827     }
0828 }
0829