Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-15 09:15:50

0001 
0002 #include "JExecutionEngine.h"
0003 #include <JANA/Utils/JApplicationInspector.h>
0004 
0005 #include <chrono>
0006 #include <cstddef>
0007 #include <cstdio>
0008 #include <ctime>
0009 #include <exception>
0010 #include <mutex>
0011 #include <csignal>
0012 #include <ostream>
0013 #include <sstream>
0014 #include <sys/stat.h>
0015 #include <fcntl.h>
0016 #include <unistd.h>
0017 
0018 thread_local int jana2_worker_id = -1;
0019 thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020 
0021 void JExecutionEngine::Init() {
0022     auto params = GetApplication()->GetJParameterManager();
0023 
0024     params->SetDefaultParameter("jana:timeout", m_timeout_s, 
0025         "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026 
0027     params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s, 
0028         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029 
0030     params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms, 
0031         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032 
0033     params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034 
0035     params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036 
0037     auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038         "Filename of named pipe for retrieving instantaneous status info");
0039 
0040     size_t pid = getpid();
0041     mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042 
0043     LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044     LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045     LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046 
0047     if (p->IsDefault()) {
0048         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049     }
0050     else {
0051         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052     }
0053 
0054 
0055     // Not sure how I feel about putting this here yet, but I think it will at least work in both cases it needs to.
0056     // The reason this works is because JTopologyBuilder::create_topology() has already been called before 
0057     // JApplication::ProvideService<JExecutionEngine>().
0058     for (JArrow* arrow : m_topology->arrows) {
0059 
0060         arrow->initialize();
0061 
0062         m_arrow_states.emplace_back();
0063         auto& arrow_state = m_arrow_states.back();
0064         arrow_state.is_source = arrow->is_source();
0065         arrow_state.is_sink = arrow->is_sink();
0066         arrow_state.is_parallel = arrow->is_parallel();
0067         arrow_state.next_input = arrow->get_next_port_index();
0068     }
0069 }
0070 
0071 void JExecutionEngine::RequestInspector() {
0072     std::unique_lock<std::mutex> lock(m_mutex);
0073     m_interrupt_status = InterruptStatus::InspectRequested;
0074 }
0075 
0076 void JExecutionEngine::RunTopology() {
0077     std::unique_lock<std::mutex> lock(m_mutex);
0078 
0079     if (m_runstatus == RunStatus::Failed) {
0080         throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0081     }
0082     if (m_runstatus == RunStatus::Finished) {
0083         throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0084     }
0085     if (m_arrow_states.size() == 0) {
0086         throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0087     }
0088 
0089     // Set start time and event count
0090     m_time_at_start = clock_t::now();
0091     m_event_count_at_start = m_event_count_at_finish;
0092 
0093     // Reactivate topology
0094     for (auto& arrow: m_arrow_states) {
0095         if (arrow.status == ArrowState::Status::Paused) {
0096             arrow.status = ArrowState::Status::Running;
0097         }
0098     }
0099 
0100     m_runstatus = RunStatus::Running;
0101 
0102     lock.unlock();
0103     m_condvar.notify_one();
0104 }
0105 
0106 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0107     // We both create and destroy the pool of workers here. They all sleep until they 
0108     // receive work from the scheduler, which won't happen until the runstatus <- {Running, 
0109     // Pausing, Draining} and there is a task ready to execute. This way worker creation/destruction
0110     // is decoupled from topology execution.
0111 
0112     // If we scale to zero, no workers will run. This is useful for testing, and also for using
0113     // an external thread team, should the need arise.
0114 
0115     std::unique_lock<std::mutex> lock(m_mutex);
0116 
0117     if (nthreads != 0 && m_arrow_states.size() == 0) {
0118         // We check that (nthreads != 0) because this gets called at shutdown even if the topology wasn't run
0119         // Remember, we want JApplication::Initialize() to succeed and JMain to shut down cleanly even when the topology is empty
0120         throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0121     }
0122 
0123     auto prev_nthreads = m_worker_states.size();
0124 
0125     if (prev_nthreads < nthreads) {
0126         // We are launching additional worker threads
0127         LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0128         for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0129             auto worker = std::make_unique<WorkerState>();
0130             worker->worker_id = worker_id;
0131             worker->is_stop_requested = false;
0132             worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0133             worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0134             worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0135             LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0136             m_worker_states.push_back(std::move(worker));
0137 
0138             bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0139             if (pin_to_cpu) {
0140                 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0141             }
0142         }
0143     }
0144 
0145     else if (prev_nthreads > nthreads) {
0146         // We are destroying existing worker threads
0147         LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0148 
0149         // Signal to threads that they need to terminate.
0150         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0151             LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0152             m_worker_states[worker_id]->is_stop_requested = true;
0153         }
0154         m_condvar.notify_all(); // Wake up all threads so that they can exit the condvar wait loop
0155         lock.unlock();
0156 
0157         // We join all (eligible) threads _outside_ of the mutex
0158         for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0159             if (m_worker_states[worker_id]->thread != nullptr) {
0160                 if (m_worker_states[worker_id]->is_timed_out) {
0161                     // Thread has timed out. Rather than non-cooperatively killing it,
0162                     // we relinquish ownership of it but remember that it was ours once and
0163                     // is still out there, somewhere, biding its time
0164                     m_worker_states[worker_id]->thread->detach();
0165                     LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0166                 }
0167                 else {
0168                     LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0169                     m_worker_states[worker_id]->thread->join();
0170                     LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0171                 }
0172             }
0173             else {
0174                 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0175             }
0176         }
0177 
0178         lock.lock();
0179         // We retake the mutex so we can safely modify m_worker_states
0180         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0181             if (m_worker_states.back()->thread != nullptr) {
0182                 delete m_worker_states.back()->thread;
0183             }
0184             m_worker_states.pop_back();
0185         }
0186     }
0187 }
0188 
0189 void JExecutionEngine::PauseTopology() {
0190     std::unique_lock<std::mutex> lock(m_mutex);
0191     if (m_runstatus != RunStatus::Running) return;
0192     m_runstatus = RunStatus::Pausing;
0193     for (auto& arrow: m_arrow_states) {
0194         if (arrow.status == ArrowState::Status::Running) {
0195             arrow.status = ArrowState::Status::Paused;
0196         }
0197     }
0198     LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0199     lock.unlock();
0200     m_condvar.notify_all();
0201 }
0202 
0203 void JExecutionEngine::DrainTopology() {
0204     std::unique_lock<std::mutex> lock(m_mutex);
0205     if (m_runstatus != RunStatus::Running) return;
0206     m_runstatus = RunStatus::Draining;
0207     for (auto& arrow: m_arrow_states) {
0208         if (arrow.is_source) {
0209             if (arrow.status == ArrowState::Status::Running) {
0210                 arrow.status = ArrowState::Status::Paused;
0211             }
0212         }
0213     }
0214     LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0215     lock.unlock();
0216     m_condvar.notify_all();
0217 }
0218 
0219 void JExecutionEngine::RunSupervisor() {
0220 
0221     if (m_interrupt_status == InterruptStatus::NoInterruptsUnsupervised) {
0222         m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0223     }
0224     size_t last_event_count = 0;
0225     clock_t::time_point last_measurement_time = clock_t::now();
0226 
0227     Perf perf;
0228     while (true) {
0229 
0230         if (m_enable_timeout && m_timeout_s > 0) {
0231             CheckTimeout();
0232         }
0233 
0234         if (m_print_worker_report_requested) {
0235             PrintWorkerReport(false);
0236             m_print_worker_report_requested = false;
0237         }
0238 
0239         if (m_send_worker_report_requested) {
0240             PrintWorkerReport(true);
0241             m_send_worker_report_requested = false;
0242         }
0243 
0244         perf = GetPerf();
0245         if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) || 
0246             perf.runstatus == RunStatus::Finished || 
0247             perf.runstatus == RunStatus::Failed) {
0248             break;
0249         }
0250 
0251         if (m_interrupt_status == InterruptStatus::InspectRequested) {
0252             if (perf.runstatus == RunStatus::Paused) {
0253                 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0254                 m_enable_timeout = false;
0255                 m_interrupt_status = InterruptStatus::InspectInProgress;
0256                 InspectApplication(GetApplication());
0257                 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0258 
0259                 // Jump back to the top of the loop so that we have fresh event count data
0260                 last_measurement_time = clock_t::now();
0261                 last_event_count = 0;
0262                 continue; 
0263             }
0264             else if (perf.runstatus == RunStatus::Running) {
0265                 PauseTopology();
0266             }
0267         }
0268         else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0269             PauseTopology();
0270         }
0271 
0272         if (m_show_ticker) {
0273             auto next_measurement_time = clock_t::now();
0274             auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0275             float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0276             last_measurement_time = next_measurement_time;
0277             last_event_count = perf.event_count;
0278 
0279             // Print rates
0280             LOG_INFO(m_logger) << "Status: " << perf.event_count << " events processed at "
0281                             << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0282                             << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0283         }
0284 
0285         std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0286     }
0287     LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0288 
0289     if (perf.runstatus == RunStatus::Failed) {
0290         HandleFailures();
0291     }
0292 
0293     PrintFinalReport();
0294 }
0295 
0296 bool JExecutionEngine::CheckTimeout() {
0297     std::unique_lock<std::mutex> lock(m_mutex);
0298     auto now = clock_t::now();
0299     bool timeout_detected = false;
0300     for (auto& worker: m_worker_states) {
0301         auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0302         auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0303         if (duration_s > timeout_s) {
0304             worker->is_timed_out = true;
0305             timeout_detected = true;
0306             m_runstatus = RunStatus::Failed;
0307         }
0308     }
0309     return timeout_detected;
0310 }
0311 
0312 void JExecutionEngine::HandleFailures() {
0313 
0314     std::unique_lock<std::mutex> lock(m_mutex);
0315 
0316     // First, we log all of the failures we've found
0317     for (auto& worker: m_worker_states) {
0318         const auto& arrow_name = m_topology->arrows[worker->last_arrow_id]->get_name();
0319         if (worker->is_timed_out) {
0320             LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0321             pthread_kill(worker->thread->native_handle(), SIGUSR2);
0322             LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0323             worker->backtrace.WaitForCapture();
0324         }
0325         if (worker->stored_exception != nullptr) {
0326             LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0327         }
0328     }
0329 
0330     // Now we throw each of these exceptions in order, in case the caller is going to attempt to catch them.
0331     // In reality all callers are going to print everything they can about the exception and exit.
0332     for (auto& worker: m_worker_states) {
0333         if (worker->stored_exception != nullptr) {
0334             GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0335             std::rethrow_exception(worker->stored_exception);
0336         }
0337         if (worker->is_timed_out) {
0338             GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0339             auto ex = JException("Timeout in worker thread");
0340             ex.backtrace = worker->backtrace;
0341             throw ex;
0342         }
0343     }
0344 }
0345 
0346 void JExecutionEngine::FinishTopology() {
0347     std::unique_lock<std::mutex> lock(m_mutex);
0348     assert(m_runstatus == RunStatus::Paused);
0349 
0350     LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0351     for (auto* arrow : m_topology->arrows) {
0352         arrow->finalize();
0353     }
0354     for (auto* pool: m_topology->pools) {
0355         pool->Finalize();
0356     }
0357     m_runstatus = RunStatus::Finished;
0358     LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0359 }
0360 
0361 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0362     std::unique_lock<std::mutex> lock(m_mutex);
0363     return m_runstatus;
0364 }
0365 
0366 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0367     std::unique_lock<std::mutex> lock(m_mutex);
0368     Perf result;
0369     if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0370         result.event_count = m_event_count_at_finish - m_event_count_at_start;
0371         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0372     }
0373     else {
0374         // Obtain current event count
0375         size_t current_event_count = 0;
0376         for (auto& state : m_arrow_states) {
0377             if (state.is_sink) {
0378                 current_event_count += state.events_processed;
0379             }
0380         }
0381         result.event_count = current_event_count - m_event_count_at_start;
0382         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0383     }
0384     result.runstatus = m_runstatus;
0385     result.thread_count = m_worker_states.size();
0386     result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0387     result.event_level = JEventLevel::PhysicsEvent;
0388     return result;
0389 }
0390 
0391 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0392     std::unique_lock<std::mutex> lock(m_mutex);
0393     auto worker_id = m_worker_states.size();
0394     auto worker = std::make_unique<WorkerState>();
0395     worker->worker_id = worker_id;
0396     worker->is_stop_requested = false;
0397     worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0398     worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0399     worker->thread = nullptr;
0400     m_worker_states.push_back(std::move(worker));
0401 
0402     bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0403     if (pin_to_cpu) {
0404         JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0405     }
0406     return {worker_id, &worker->backtrace};
0407 
0408 }
0409 
0410 
0411 void JExecutionEngine::RunWorker(Worker worker) {
0412 
0413     LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0414     jana2_worker_id = worker.worker_id;
0415     jana2_worker_backtrace = worker.backtrace;
0416     try {
0417         Task task;
0418         while (true) {
0419             ExchangeTask(task, worker.worker_id);
0420             if (task.arrow == nullptr) break; // Exit as soon as ExchangeTask() stops blocking
0421             task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0422         }
0423         LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0424     }
0425     catch (...) {
0426         LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0427         std::unique_lock<std::mutex> lock(m_mutex);
0428         m_runstatus = RunStatus::Failed;
0429         m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0430     }
0431 }
0432 
0433 
0434 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0435 
0436     auto checkin_time = std::chrono::steady_clock::now();
0437     // It's important to start measuring this _before_ acquiring the lock because acquiring the lock
0438     // may be a big part of the scheduler overhead
0439 
0440     std::unique_lock<std::mutex> lock(m_mutex);
0441     
0442     auto& worker = *m_worker_states.at(worker_id);
0443 
0444     if (task.arrow != nullptr) {
0445         CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0446     }
0447 
0448     if (worker.is_stop_requested) {
0449         return;
0450     }
0451 
0452     FindNextReadyTask_Unsafe(task, worker);
0453 
0454     if (nonblocking) { return; }
0455     auto idle_time_start = clock_t::now();
0456     m_total_scheduler_duration += (idle_time_start - checkin_time);
0457 
0458     while (task.arrow == nullptr && !worker.is_stop_requested) {
0459         m_condvar.wait(lock);
0460         FindNextReadyTask_Unsafe(task, worker);
0461     }
0462     worker.last_checkout_time = clock_t::now();
0463 
0464     if (task.input_event != nullptr) {
0465         worker.last_event_nr = task.input_event->GetEventNumber();
0466     }
0467     else {
0468         worker.last_event_nr = 0;
0469     }
0470     m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0471 
0472     // Notify one worker, who will notify the next, etc, as long as FindNextReadyTaskUnsafe() succeeds.
0473     // After FindNextReadyTaskUnsafe fails, all threads block until the next returning worker reactivates the
0474     // notification chain.
0475     m_condvar.notify_one();
0476 }
0477 
0478 
0479 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0480 
0481     auto processing_duration = checkin_time - worker.last_checkout_time;
0482 
0483     ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0484 
0485     arrow_state.active_tasks -= 1;
0486     arrow_state.total_processing_duration += processing_duration;
0487 
0488     for (size_t output=0; output<task.output_count; ++output) {
0489         if (!task.arrow->get_port(task.outputs[output].second).is_input) {
0490             arrow_state.events_processed++;
0491         }
0492     }
0493 
0494     // Put each output in its correct queue or pool
0495     task.arrow->push(task.outputs, task.output_count, worker.location_id);
0496 
0497     if (task.status == JArrow::FireResult::Finished) {
0498         // If this is an eventsource self-terminating (the only thing that returns Status::Finished right now) it will
0499         // have already called DoClose(). I'm tempted to always call DoClose() as part of JExecutionEngine::Finish() instead, however.
0500 
0501         // Mark arrow as finished
0502         arrow_state.status = ArrowState::Status::Finished;
0503 
0504         // Check if this switches the topology to Draining()
0505         if (m_runstatus == RunStatus::Running) {
0506             bool draining = true;
0507             for (auto& arrow: m_arrow_states) {
0508                 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0509                     draining = false;
0510                 }
0511             }
0512             if (draining) {
0513                 m_runstatus = RunStatus::Draining;
0514             }
0515         }
0516     }
0517     worker.last_arrow_id = -1;
0518     worker.last_event_nr = 0;
0519 
0520     task.arrow = nullptr;
0521     task.input_event = nullptr;
0522     task.output_count = 0;
0523     task.status = JArrow::FireResult::NotRunYet;
0524 };
0525 
0526 
0527 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0528 
0529     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0530         // We only pick up a new task if the topology is running or draining.
0531 
0532         // Each call to FindNextReadyTask_Unsafe() starts with a different m_next_arrow_id to ensure balanced arrow assignments
0533         size_t arrow_count = m_arrow_states.size();
0534         m_next_arrow_id += 1;
0535         m_next_arrow_id %= arrow_count;
0536 
0537         for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0538             size_t arrow_id = i % arrow_count;
0539 
0540             auto& state = m_arrow_states[arrow_id];
0541             if (!state.is_parallel && (state.active_tasks != 0)) {
0542                 // We've found a sequential arrow that is already active. Nothing we can do here.
0543                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0544                 continue;
0545             }
0546 
0547             if (state.status != ArrowState::Status::Running) {
0548                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0549                 continue;
0550             }
0551             // TODO: Support next_visit_time so that we don't hammer blocked event sources
0552 
0553             // See if we can obtain an input event (this is silly)
0554             JArrow* arrow = m_topology->arrows[arrow_id];
0555             // TODO: consider setting state.next_input, retrieving via fire()
0556             auto port = arrow->get_next_port_index();
0557             JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
0558             if (event != nullptr || port == -1) {
0559                 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0560                 // We've found a task that is ready!
0561                 state.active_tasks += 1;
0562 
0563                 task.arrow = arrow;
0564                 task.input_port = port;
0565                 task.input_event = event;
0566                 task.output_count = 0;
0567                 task.status = JArrow::FireResult::NotRunYet;
0568 
0569                 worker.last_arrow_id = arrow_id;
0570                 if (event != nullptr) {
0571                     worker.is_event_warmed_up = event->IsWarmedUp();
0572                     worker.last_event_nr = event->GetEventNumber();
0573                 }
0574                 else {
0575                     worker.is_event_warmed_up = true; // Use shorter timeout
0576                     worker.last_event_nr = 0;
0577                 }
0578                 return;
0579             }
0580             else {
0581                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0582             }
0583         }
0584     }
0585 
0586     // Because we reached this point, we know that there aren't any tasks ready,
0587     // so we check whether more are potentially coming. If not, we can pause the topology.
0588     // Note that our worker threads will still wait at ExchangeTask() until they get
0589     // shut down separately during Scale().
0590     
0591     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0592         // We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
0593         // This also leaves a cleaner narrative in the logs. 
0594 
0595         bool any_active_source_found = false;
0596         bool any_active_task_found = false;
0597         
0598         LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0599 
0600         for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0601             auto& state = m_arrow_states[arrow_id];
0602             any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0603             any_active_task_found |= (state.active_tasks != 0);
0604             // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
0605         }
0606 
0607         if (!any_active_source_found && !any_active_task_found) {
0608             // Pause the topology
0609             m_time_at_finish = clock_t::now();
0610             m_event_count_at_finish = 0;
0611             for (auto& arrow_state : m_arrow_states) {
0612                 if (arrow_state.is_sink) {
0613                     m_event_count_at_finish += arrow_state.events_processed;
0614                 }
0615             }
0616             LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0617             m_runstatus = RunStatus::Paused;
0618             // I think this is the ONLY site where the topology gets paused. Verify this?
0619         }
0620     }
0621 
0622     worker.last_arrow_id = -1;
0623 
0624     task.arrow = nullptr;
0625     task.input_port = -1;
0626     task.input_event = nullptr;
0627     task.output_count = 0;
0628     task.status = JArrow::FireResult::NotRunYet;
0629 }
0630 
0631 
0632 void JExecutionEngine::PrintFinalReport() {
0633 
0634     std::unique_lock<std::mutex> lock(m_mutex);
0635     auto event_count = m_event_count_at_finish - m_event_count_at_start;
0636     auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0637     auto thread_count = m_worker_states.size();
0638     auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0639 
0640     LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0641     LOG_INFO(GetLogger()) << LOG_END;
0642     LOG_INFO(GetLogger()) << "  Avg throughput [Hz]:         " << std::setprecision(3) << throughput_hz << LOG_END;
0643     LOG_INFO(GetLogger()) << "  Completed events [count]:    " << event_count << LOG_END;
0644     LOG_INFO(GetLogger()) << "  Total uptime [s]:            " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0645     LOG_INFO(GetLogger()) << "  Thread team size [count]:    " << thread_count << LOG_END;
0646     LOG_INFO(GetLogger()) << LOG_END;
0647     LOG_INFO(GetLogger()) << "  Arrow-level metrics:" << LOG_END;
0648     LOG_INFO(GetLogger()) << LOG_END;
0649 
0650     size_t total_useful_ms = 0;
0651 
0652     for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0653         auto* arrow = m_topology->arrows[arrow_id];
0654         auto& arrow_state = m_arrow_states[arrow_id];
0655         auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0656         total_useful_ms += useful_ms;
0657         auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0658         auto throughput_bottleneck = 1000.0 / avg_latency;
0659         if (arrow->is_parallel()) {
0660             throughput_bottleneck *= thread_count;
0661         }
0662 
0663         LOG_INFO(GetLogger()) << "  - Arrow name:                 " << arrow->get_name() << LOG_END;
0664         LOG_INFO(GetLogger()) << "    Parallel:                   " << arrow->is_parallel() << LOG_END;
0665         LOG_INFO(GetLogger()) << "    Events completed:           " << arrow_state.events_processed << LOG_END;
0666         LOG_INFO(GetLogger()) << "    Avg latency [ms/event]:     " << avg_latency << LOG_END;
0667         LOG_INFO(GetLogger()) << "    Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0668         LOG_INFO(GetLogger()) << LOG_END;
0669     }
0670 
0671     auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0672     auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0673 
0674     LOG_INFO(GetLogger()) << "  Total useful time [s]:     " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0675     LOG_INFO(GetLogger()) << "  Total scheduler time [s]:  " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0676     LOG_INFO(GetLogger()) << "  Total idle time [s]:       " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0677 
0678     LOG_INFO(GetLogger()) << LOG_END;
0679 
0680     LOG_INFO(GetLogger()) << "Final report: " << event_count << " events processed at "
0681                           << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0682 
0683 }
0684 
0685 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0686     m_show_ticker = show_ticker;
0687 }
0688 
0689 bool JExecutionEngine::IsTickerEnabled() const {
0690     return m_show_ticker;
0691 }
0692 
0693 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0694     m_enable_timeout = timeout_enabled;
0695 }
0696 
0697 bool JExecutionEngine::IsTimeoutEnabled() const {
0698     return m_enable_timeout;
0699 }
0700 
0701 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0702 
0703     std::unique_lock<std::mutex> lock(m_mutex);
0704     if (arrow_id >= m_topology->arrows.size()) {
0705         LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0706         return JArrow::FireResult::NotRunYet;
0707     }
0708     JArrow* arrow = m_topology->arrows[arrow_id];
0709     LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() 
0710                           << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0711 
0712     ArrowState& arrow_state = m_arrow_states[arrow_id];
0713     if (arrow_state.status == ArrowState::Status::Finished) {
0714         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0715         return JArrow::FireResult::Finished;
0716     }
0717     if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0718         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0719         return JArrow::FireResult::NotRunYet;
0720     }
0721     arrow_state.active_tasks += 1;
0722 
0723     auto port = arrow->get_next_port_index();
0724     JEvent* event = nullptr;
0725     if (port != -1) {
0726         event = arrow->pull(port, location_id);
0727         if (event == nullptr) {
0728             LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0729             arrow_state.active_tasks -= 1;
0730             return JArrow::FireResult::NotRunYet;
0731         }
0732         else {
0733             LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0734         }
0735     }
0736     else {
0737         LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0738     }
0739     lock.unlock();
0740 
0741     size_t output_count;
0742     JArrow::OutputData outputs;
0743     JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0744 
0745     LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0746     arrow->fire(event, outputs, output_count, result);
0747     LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
0748     if (output_count == 0) {
0749         LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0750     }
0751     else {
0752         for (size_t i=0; i<output_count; ++i) {
0753             LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0754         }
0755     }
0756 
0757     lock.lock();
0758     arrow->push(outputs, output_count, location_id);
0759     arrow_state.active_tasks -= 1;
0760     lock.unlock();
0761     return result;
0762 }
0763 
0764 
0765 void JExecutionEngine::HandleSIGINT() {
0766     InterruptStatus status = m_interrupt_status;
0767     std::cout << std::endl;
0768     switch (status) {
0769         case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0770         case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0771         case InterruptStatus::NoInterruptsUnsupervised:
0772         case InterruptStatus::PauseAndQuit:
0773         case InterruptStatus::InspectInProgress: 
0774             _exit(-2);
0775     }
0776 }
0777 
0778 void JExecutionEngine::HandleSIGUSR1() {
0779     m_send_worker_report_requested = true;
0780 }
0781 
0782 void JExecutionEngine::HandleSIGUSR2() {
0783     if (jana2_worker_backtrace != nullptr) {
0784         jana2_worker_backtrace->Capture(3);
0785     }
0786 }
0787 
0788 void JExecutionEngine::HandleSIGTSTP() {
0789     std::cout << std::endl;
0790     m_print_worker_report_requested = true;
0791 }
0792 
0793 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0794 
0795     std::unique_lock<std::mutex> lock(m_mutex);
0796     LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0797     for (auto& worker: m_worker_states) {
0798         worker->backtrace.Reset();
0799         pthread_kill(worker->thread->native_handle(), SIGUSR2);
0800     }
0801     for (auto& worker: m_worker_states) {
0802         worker->backtrace.WaitForCapture();
0803     }
0804     std::ostringstream oss;
0805     oss << "Worker report" << std::endl;
0806     for (auto& worker: m_worker_states) {
0807         oss << "------------------------------" << std::endl 
0808             << "  Worker:        " << worker->worker_id << std::endl
0809             << "  Current arrow: " << worker->last_arrow_id << std::endl
0810             << "  Current event: " << worker->last_event_nr << std::endl
0811             << "  Backtrace:" << std::endl << std::endl
0812             << worker->backtrace.ToString();
0813     }
0814     auto s = oss.str();
0815     LOG_WARN(GetLogger()) << s << LOG_END;
0816 
0817     if (send_to_pipe) {
0818 
0819         int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0820         if (fd >= 0) {
0821             write(fd, s.c_str(), s.length()+1);
0822             close(fd);
0823         }
0824         else {
0825             LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0826             << "  You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0827             << "  The status report will still show up in the log." << LOG_END;
0828         }
0829     }
0830 }
0831 
0832 
0833 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0834     switch(runstatus) {
0835         case JExecutionEngine::RunStatus::Running: return "Running";
0836         case JExecutionEngine::RunStatus::Paused: return "Paused";
0837         case JExecutionEngine::RunStatus::Failed: return "Failed";
0838         case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0839         case JExecutionEngine::RunStatus::Draining: return "Draining";
0840         case JExecutionEngine::RunStatus::Finished: return "Finished";
0841         default: return "CorruptedRunStatus";
0842     }
0843 }
0844