File indexing completed on 2025-07-15 09:15:50
0001
#include "JExecutionEngine.h"
#include <JANA/Utils/JApplicationInspector.h>

#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdio>
#include <ctime>
#include <exception>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
0017
// Per-thread id of the worker running on this thread. RunWorker() assigns it; it is -1 until then.
thread_local int jana2_worker_id = -1;
// Per-thread pointer to the worker's backtrace slot. RunWorker() assigns it, and
// HandleSIGUSR2() captures into it when it is non-null.
thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020
0021 void JExecutionEngine::Init() {
0022 auto params = GetApplication()->GetJParameterManager();
0023
0024 params->SetDefaultParameter("jana:timeout", m_timeout_s,
0025 "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026
0027 params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s,
0028 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029
0030 params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms,
0031 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032
0033 params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034
0035 params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036
0037 auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038 "Filename of named pipe for retrieving instantaneous status info");
0039
0040 size_t pid = getpid();
0041 mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042
0043 LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044 LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045 LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046
0047 if (p->IsDefault()) {
0048 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049 }
0050 else {
0051 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052 }
0053
0054
0055
0056
0057
0058 for (JArrow* arrow : m_topology->arrows) {
0059
0060 arrow->initialize();
0061
0062 m_arrow_states.emplace_back();
0063 auto& arrow_state = m_arrow_states.back();
0064 arrow_state.is_source = arrow->is_source();
0065 arrow_state.is_sink = arrow->is_sink();
0066 arrow_state.is_parallel = arrow->is_parallel();
0067 arrow_state.next_input = arrow->get_next_port_index();
0068 }
0069 }
0070
0071 void JExecutionEngine::RequestInspector() {
0072 std::unique_lock<std::mutex> lock(m_mutex);
0073 m_interrupt_status = InterruptStatus::InspectRequested;
0074 }
0075
0076 void JExecutionEngine::RunTopology() {
0077 std::unique_lock<std::mutex> lock(m_mutex);
0078
0079 if (m_runstatus == RunStatus::Failed) {
0080 throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0081 }
0082 if (m_runstatus == RunStatus::Finished) {
0083 throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0084 }
0085 if (m_arrow_states.size() == 0) {
0086 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0087 }
0088
0089
0090 m_time_at_start = clock_t::now();
0091 m_event_count_at_start = m_event_count_at_finish;
0092
0093
0094 for (auto& arrow: m_arrow_states) {
0095 if (arrow.status == ArrowState::Status::Paused) {
0096 arrow.status = ArrowState::Status::Running;
0097 }
0098 }
0099
0100 m_runstatus = RunStatus::Running;
0101
0102 lock.unlock();
0103 m_condvar.notify_one();
0104 }
0105
0106 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0107
0108
0109
0110
0111
0112
0113
0114
0115 std::unique_lock<std::mutex> lock(m_mutex);
0116
0117 if (nthreads != 0 && m_arrow_states.size() == 0) {
0118
0119
0120 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0121 }
0122
0123 auto prev_nthreads = m_worker_states.size();
0124
0125 if (prev_nthreads < nthreads) {
0126
0127 LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0128 for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0129 auto worker = std::make_unique<WorkerState>();
0130 worker->worker_id = worker_id;
0131 worker->is_stop_requested = false;
0132 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0133 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0134 worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0135 LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0136 m_worker_states.push_back(std::move(worker));
0137
0138 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0139 if (pin_to_cpu) {
0140 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0141 }
0142 }
0143 }
0144
0145 else if (prev_nthreads > nthreads) {
0146
0147 LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0148
0149
0150 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0151 LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0152 m_worker_states[worker_id]->is_stop_requested = true;
0153 }
0154 m_condvar.notify_all();
0155 lock.unlock();
0156
0157
0158 for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0159 if (m_worker_states[worker_id]->thread != nullptr) {
0160 if (m_worker_states[worker_id]->is_timed_out) {
0161
0162
0163
0164 m_worker_states[worker_id]->thread->detach();
0165 LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0166 }
0167 else {
0168 LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0169 m_worker_states[worker_id]->thread->join();
0170 LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0171 }
0172 }
0173 else {
0174 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0175 }
0176 }
0177
0178 lock.lock();
0179
0180 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0181 if (m_worker_states.back()->thread != nullptr) {
0182 delete m_worker_states.back()->thread;
0183 }
0184 m_worker_states.pop_back();
0185 }
0186 }
0187 }
0188
0189 void JExecutionEngine::PauseTopology() {
0190 std::unique_lock<std::mutex> lock(m_mutex);
0191 if (m_runstatus != RunStatus::Running) return;
0192 m_runstatus = RunStatus::Pausing;
0193 for (auto& arrow: m_arrow_states) {
0194 if (arrow.status == ArrowState::Status::Running) {
0195 arrow.status = ArrowState::Status::Paused;
0196 }
0197 }
0198 LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0199 lock.unlock();
0200 m_condvar.notify_all();
0201 }
0202
0203 void JExecutionEngine::DrainTopology() {
0204 std::unique_lock<std::mutex> lock(m_mutex);
0205 if (m_runstatus != RunStatus::Running) return;
0206 m_runstatus = RunStatus::Draining;
0207 for (auto& arrow: m_arrow_states) {
0208 if (arrow.is_source) {
0209 if (arrow.status == ArrowState::Status::Running) {
0210 arrow.status = ArrowState::Status::Paused;
0211 }
0212 }
0213 }
0214 LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0215 lock.unlock();
0216 m_condvar.notify_all();
0217 }
0218
0219 void JExecutionEngine::RunSupervisor() {
0220
0221 if (m_interrupt_status == InterruptStatus::NoInterruptsUnsupervised) {
0222 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0223 }
0224 size_t last_event_count = 0;
0225 clock_t::time_point last_measurement_time = clock_t::now();
0226
0227 Perf perf;
0228 while (true) {
0229
0230 if (m_enable_timeout && m_timeout_s > 0) {
0231 CheckTimeout();
0232 }
0233
0234 if (m_print_worker_report_requested) {
0235 PrintWorkerReport(false);
0236 m_print_worker_report_requested = false;
0237 }
0238
0239 if (m_send_worker_report_requested) {
0240 PrintWorkerReport(true);
0241 m_send_worker_report_requested = false;
0242 }
0243
0244 perf = GetPerf();
0245 if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) ||
0246 perf.runstatus == RunStatus::Finished ||
0247 perf.runstatus == RunStatus::Failed) {
0248 break;
0249 }
0250
0251 if (m_interrupt_status == InterruptStatus::InspectRequested) {
0252 if (perf.runstatus == RunStatus::Paused) {
0253 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0254 m_enable_timeout = false;
0255 m_interrupt_status = InterruptStatus::InspectInProgress;
0256 InspectApplication(GetApplication());
0257 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0258
0259
0260 last_measurement_time = clock_t::now();
0261 last_event_count = 0;
0262 continue;
0263 }
0264 else if (perf.runstatus == RunStatus::Running) {
0265 PauseTopology();
0266 }
0267 }
0268 else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0269 PauseTopology();
0270 }
0271
0272 if (m_show_ticker) {
0273 auto next_measurement_time = clock_t::now();
0274 auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0275 float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0276 last_measurement_time = next_measurement_time;
0277 last_event_count = perf.event_count;
0278
0279
0280 LOG_INFO(m_logger) << "Status: " << perf.event_count << " events processed at "
0281 << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0282 << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0283 }
0284
0285 std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0286 }
0287 LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0288
0289 if (perf.runstatus == RunStatus::Failed) {
0290 HandleFailures();
0291 }
0292
0293 PrintFinalReport();
0294 }
0295
0296 bool JExecutionEngine::CheckTimeout() {
0297 std::unique_lock<std::mutex> lock(m_mutex);
0298 auto now = clock_t::now();
0299 bool timeout_detected = false;
0300 for (auto& worker: m_worker_states) {
0301 auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0302 auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0303 if (duration_s > timeout_s) {
0304 worker->is_timed_out = true;
0305 timeout_detected = true;
0306 m_runstatus = RunStatus::Failed;
0307 }
0308 }
0309 return timeout_detected;
0310 }
0311
0312 void JExecutionEngine::HandleFailures() {
0313
0314 std::unique_lock<std::mutex> lock(m_mutex);
0315
0316
0317 for (auto& worker: m_worker_states) {
0318 const auto& arrow_name = m_topology->arrows[worker->last_arrow_id]->get_name();
0319 if (worker->is_timed_out) {
0320 LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0321 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0322 LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0323 worker->backtrace.WaitForCapture();
0324 }
0325 if (worker->stored_exception != nullptr) {
0326 LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0327 }
0328 }
0329
0330
0331
0332 for (auto& worker: m_worker_states) {
0333 if (worker->stored_exception != nullptr) {
0334 GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0335 std::rethrow_exception(worker->stored_exception);
0336 }
0337 if (worker->is_timed_out) {
0338 GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0339 auto ex = JException("Timeout in worker thread");
0340 ex.backtrace = worker->backtrace;
0341 throw ex;
0342 }
0343 }
0344 }
0345
0346 void JExecutionEngine::FinishTopology() {
0347 std::unique_lock<std::mutex> lock(m_mutex);
0348 assert(m_runstatus == RunStatus::Paused);
0349
0350 LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0351 for (auto* arrow : m_topology->arrows) {
0352 arrow->finalize();
0353 }
0354 for (auto* pool: m_topology->pools) {
0355 pool->Finalize();
0356 }
0357 m_runstatus = RunStatus::Finished;
0358 LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0359 }
0360
0361 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0362 std::unique_lock<std::mutex> lock(m_mutex);
0363 return m_runstatus;
0364 }
0365
0366 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0367 std::unique_lock<std::mutex> lock(m_mutex);
0368 Perf result;
0369 if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0370 result.event_count = m_event_count_at_finish - m_event_count_at_start;
0371 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0372 }
0373 else {
0374
0375 size_t current_event_count = 0;
0376 for (auto& state : m_arrow_states) {
0377 if (state.is_sink) {
0378 current_event_count += state.events_processed;
0379 }
0380 }
0381 result.event_count = current_event_count - m_event_count_at_start;
0382 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0383 }
0384 result.runstatus = m_runstatus;
0385 result.thread_count = m_worker_states.size();
0386 result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0387 result.event_level = JEventLevel::PhysicsEvent;
0388 return result;
0389 }
0390
0391 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0392 std::unique_lock<std::mutex> lock(m_mutex);
0393 auto worker_id = m_worker_states.size();
0394 auto worker = std::make_unique<WorkerState>();
0395 worker->worker_id = worker_id;
0396 worker->is_stop_requested = false;
0397 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0398 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0399 worker->thread = nullptr;
0400 m_worker_states.push_back(std::move(worker));
0401
0402 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0403 if (pin_to_cpu) {
0404 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0405 }
0406 return {worker_id, &worker->backtrace};
0407
0408 }
0409
0410
0411 void JExecutionEngine::RunWorker(Worker worker) {
0412
0413 LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0414 jana2_worker_id = worker.worker_id;
0415 jana2_worker_backtrace = worker.backtrace;
0416 try {
0417 Task task;
0418 while (true) {
0419 ExchangeTask(task, worker.worker_id);
0420 if (task.arrow == nullptr) break;
0421 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0422 }
0423 LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0424 }
0425 catch (...) {
0426 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0427 std::unique_lock<std::mutex> lock(m_mutex);
0428 m_runstatus = RunStatus::Failed;
0429 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0430 }
0431 }
0432
0433
0434 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0435
0436 auto checkin_time = std::chrono::steady_clock::now();
0437
0438
0439
0440 std::unique_lock<std::mutex> lock(m_mutex);
0441
0442 auto& worker = *m_worker_states.at(worker_id);
0443
0444 if (task.arrow != nullptr) {
0445 CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0446 }
0447
0448 if (worker.is_stop_requested) {
0449 return;
0450 }
0451
0452 FindNextReadyTask_Unsafe(task, worker);
0453
0454 if (nonblocking) { return; }
0455 auto idle_time_start = clock_t::now();
0456 m_total_scheduler_duration += (idle_time_start - checkin_time);
0457
0458 while (task.arrow == nullptr && !worker.is_stop_requested) {
0459 m_condvar.wait(lock);
0460 FindNextReadyTask_Unsafe(task, worker);
0461 }
0462 worker.last_checkout_time = clock_t::now();
0463
0464 if (task.input_event != nullptr) {
0465 worker.last_event_nr = task.input_event->GetEventNumber();
0466 }
0467 else {
0468 worker.last_event_nr = 0;
0469 }
0470 m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0471
0472
0473
0474
0475 m_condvar.notify_one();
0476 }
0477
0478
0479 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0480
0481 auto processing_duration = checkin_time - worker.last_checkout_time;
0482
0483 ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0484
0485 arrow_state.active_tasks -= 1;
0486 arrow_state.total_processing_duration += processing_duration;
0487
0488 for (size_t output=0; output<task.output_count; ++output) {
0489 if (!task.arrow->get_port(task.outputs[output].second).is_input) {
0490 arrow_state.events_processed++;
0491 }
0492 }
0493
0494
0495 task.arrow->push(task.outputs, task.output_count, worker.location_id);
0496
0497 if (task.status == JArrow::FireResult::Finished) {
0498
0499
0500
0501
0502 arrow_state.status = ArrowState::Status::Finished;
0503
0504
0505 if (m_runstatus == RunStatus::Running) {
0506 bool draining = true;
0507 for (auto& arrow: m_arrow_states) {
0508 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0509 draining = false;
0510 }
0511 }
0512 if (draining) {
0513 m_runstatus = RunStatus::Draining;
0514 }
0515 }
0516 }
0517 worker.last_arrow_id = -1;
0518 worker.last_event_nr = 0;
0519
0520 task.arrow = nullptr;
0521 task.input_event = nullptr;
0522 task.output_count = 0;
0523 task.status = JArrow::FireResult::NotRunYet;
0524 };
0525
0526
0527 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0528
0529 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0530
0531
0532
0533 size_t arrow_count = m_arrow_states.size();
0534 m_next_arrow_id += 1;
0535 m_next_arrow_id %= arrow_count;
0536
0537 for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0538 size_t arrow_id = i % arrow_count;
0539
0540 auto& state = m_arrow_states[arrow_id];
0541 if (!state.is_parallel && (state.active_tasks != 0)) {
0542
0543 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0544 continue;
0545 }
0546
0547 if (state.status != ArrowState::Status::Running) {
0548 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0549 continue;
0550 }
0551
0552
0553
0554 JArrow* arrow = m_topology->arrows[arrow_id];
0555
0556 auto port = arrow->get_next_port_index();
0557 JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
0558 if (event != nullptr || port == -1) {
0559 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0560
0561 state.active_tasks += 1;
0562
0563 task.arrow = arrow;
0564 task.input_port = port;
0565 task.input_event = event;
0566 task.output_count = 0;
0567 task.status = JArrow::FireResult::NotRunYet;
0568
0569 worker.last_arrow_id = arrow_id;
0570 if (event != nullptr) {
0571 worker.is_event_warmed_up = event->IsWarmedUp();
0572 worker.last_event_nr = event->GetEventNumber();
0573 }
0574 else {
0575 worker.is_event_warmed_up = true;
0576 worker.last_event_nr = 0;
0577 }
0578 return;
0579 }
0580 else {
0581 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0582 }
0583 }
0584 }
0585
0586
0587
0588
0589
0590
0591 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0592
0593
0594
0595 bool any_active_source_found = false;
0596 bool any_active_task_found = false;
0597
0598 LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0599
0600 for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0601 auto& state = m_arrow_states[arrow_id];
0602 any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0603 any_active_task_found |= (state.active_tasks != 0);
0604
0605 }
0606
0607 if (!any_active_source_found && !any_active_task_found) {
0608
0609 m_time_at_finish = clock_t::now();
0610 m_event_count_at_finish = 0;
0611 for (auto& arrow_state : m_arrow_states) {
0612 if (arrow_state.is_sink) {
0613 m_event_count_at_finish += arrow_state.events_processed;
0614 }
0615 }
0616 LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0617 m_runstatus = RunStatus::Paused;
0618
0619 }
0620 }
0621
0622 worker.last_arrow_id = -1;
0623
0624 task.arrow = nullptr;
0625 task.input_port = -1;
0626 task.input_event = nullptr;
0627 task.output_count = 0;
0628 task.status = JArrow::FireResult::NotRunYet;
0629 }
0630
0631
0632 void JExecutionEngine::PrintFinalReport() {
0633
0634 std::unique_lock<std::mutex> lock(m_mutex);
0635 auto event_count = m_event_count_at_finish - m_event_count_at_start;
0636 auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0637 auto thread_count = m_worker_states.size();
0638 auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0639
0640 LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0641 LOG_INFO(GetLogger()) << LOG_END;
0642 LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << throughput_hz << LOG_END;
0643 LOG_INFO(GetLogger()) << " Completed events [count]: " << event_count << LOG_END;
0644 LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0645 LOG_INFO(GetLogger()) << " Thread team size [count]: " << thread_count << LOG_END;
0646 LOG_INFO(GetLogger()) << LOG_END;
0647 LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END;
0648 LOG_INFO(GetLogger()) << LOG_END;
0649
0650 size_t total_useful_ms = 0;
0651
0652 for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0653 auto* arrow = m_topology->arrows[arrow_id];
0654 auto& arrow_state = m_arrow_states[arrow_id];
0655 auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0656 total_useful_ms += useful_ms;
0657 auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0658 auto throughput_bottleneck = 1000.0 / avg_latency;
0659 if (arrow->is_parallel()) {
0660 throughput_bottleneck *= thread_count;
0661 }
0662
0663 LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->get_name() << LOG_END;
0664 LOG_INFO(GetLogger()) << " Parallel: " << arrow->is_parallel() << LOG_END;
0665 LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END;
0666 LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END;
0667 LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0668 LOG_INFO(GetLogger()) << LOG_END;
0669 }
0670
0671 auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0672 auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0673
0674 LOG_INFO(GetLogger()) << " Total useful time [s]: " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0675 LOG_INFO(GetLogger()) << " Total scheduler time [s]: " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0676 LOG_INFO(GetLogger()) << " Total idle time [s]: " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0677
0678 LOG_INFO(GetLogger()) << LOG_END;
0679
0680 LOG_INFO(GetLogger()) << "Final report: " << event_count << " events processed at "
0681 << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0682
0683 }
0684
0685 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0686 m_show_ticker = show_ticker;
0687 }
0688
0689 bool JExecutionEngine::IsTickerEnabled() const {
0690 return m_show_ticker;
0691 }
0692
0693 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0694 m_enable_timeout = timeout_enabled;
0695 }
0696
0697 bool JExecutionEngine::IsTimeoutEnabled() const {
0698 return m_enable_timeout;
0699 }
0700
0701 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0702
0703 std::unique_lock<std::mutex> lock(m_mutex);
0704 if (arrow_id >= m_topology->arrows.size()) {
0705 LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0706 return JArrow::FireResult::NotRunYet;
0707 }
0708 JArrow* arrow = m_topology->arrows[arrow_id];
0709 LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
0710 << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0711
0712 ArrowState& arrow_state = m_arrow_states[arrow_id];
0713 if (arrow_state.status == ArrowState::Status::Finished) {
0714 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0715 return JArrow::FireResult::Finished;
0716 }
0717 if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0718 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0719 return JArrow::FireResult::NotRunYet;
0720 }
0721 arrow_state.active_tasks += 1;
0722
0723 auto port = arrow->get_next_port_index();
0724 JEvent* event = nullptr;
0725 if (port != -1) {
0726 event = arrow->pull(port, location_id);
0727 if (event == nullptr) {
0728 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0729 arrow_state.active_tasks -= 1;
0730 return JArrow::FireResult::NotRunYet;
0731 }
0732 else {
0733 LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0734 }
0735 }
0736 else {
0737 LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0738 }
0739 lock.unlock();
0740
0741 size_t output_count;
0742 JArrow::OutputData outputs;
0743 JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0744
0745 LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0746 arrow->fire(event, outputs, output_count, result);
0747 LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
0748 if (output_count == 0) {
0749 LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0750 }
0751 else {
0752 for (size_t i=0; i<output_count; ++i) {
0753 LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0754 }
0755 }
0756
0757 lock.lock();
0758 arrow->push(outputs, output_count, location_id);
0759 arrow_state.active_tasks -= 1;
0760 lock.unlock();
0761 return result;
0762 }
0763
0764
0765 void JExecutionEngine::HandleSIGINT() {
0766 InterruptStatus status = m_interrupt_status;
0767 std::cout << std::endl;
0768 switch (status) {
0769 case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0770 case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0771 case InterruptStatus::NoInterruptsUnsupervised:
0772 case InterruptStatus::PauseAndQuit:
0773 case InterruptStatus::InspectInProgress:
0774 _exit(-2);
0775 }
0776 }
0777
0778 void JExecutionEngine::HandleSIGUSR1() {
0779 m_send_worker_report_requested = true;
0780 }
0781
0782 void JExecutionEngine::HandleSIGUSR2() {
0783 if (jana2_worker_backtrace != nullptr) {
0784 jana2_worker_backtrace->Capture(3);
0785 }
0786 }
0787
0788 void JExecutionEngine::HandleSIGTSTP() {
0789 std::cout << std::endl;
0790 m_print_worker_report_requested = true;
0791 }
0792
0793 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0794
0795 std::unique_lock<std::mutex> lock(m_mutex);
0796 LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0797 for (auto& worker: m_worker_states) {
0798 worker->backtrace.Reset();
0799 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0800 }
0801 for (auto& worker: m_worker_states) {
0802 worker->backtrace.WaitForCapture();
0803 }
0804 std::ostringstream oss;
0805 oss << "Worker report" << std::endl;
0806 for (auto& worker: m_worker_states) {
0807 oss << "------------------------------" << std::endl
0808 << " Worker: " << worker->worker_id << std::endl
0809 << " Current arrow: " << worker->last_arrow_id << std::endl
0810 << " Current event: " << worker->last_event_nr << std::endl
0811 << " Backtrace:" << std::endl << std::endl
0812 << worker->backtrace.ToString();
0813 }
0814 auto s = oss.str();
0815 LOG_WARN(GetLogger()) << s << LOG_END;
0816
0817 if (send_to_pipe) {
0818
0819 int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0820 if (fd >= 0) {
0821 write(fd, s.c_str(), s.length()+1);
0822 close(fd);
0823 }
0824 else {
0825 LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0826 << " You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0827 << " The status report will still show up in the log." << LOG_END;
0828 }
0829 }
0830 }
0831
0832
0833 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0834 switch(runstatus) {
0835 case JExecutionEngine::RunStatus::Running: return "Running";
0836 case JExecutionEngine::RunStatus::Paused: return "Paused";
0837 case JExecutionEngine::RunStatus::Failed: return "Failed";
0838 case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0839 case JExecutionEngine::RunStatus::Draining: return "Draining";
0840 case JExecutionEngine::RunStatus::Finished: return "Finished";
0841 default: return "CorruptedRunStatus";
0842 }
0843 }
0844