File indexing completed on 2025-10-28 09:03:51
0001
0002 #include "JExecutionEngine.h"
0003 #include <JANA/Utils/JApplicationInspector.h>
0004
0005 #include <chrono>
0006 #include <cstddef>
0007 #include <cstdio>
0008 #include <ctime>
0009 #include <exception>
0010 #include <mutex>
0011 #include <csignal>
0012 #include <ostream>
0013 #include <sstream>
0014 #include <sys/stat.h>
0015 #include <fcntl.h>
0016 #include <unistd.h>
0017
0018 thread_local int jana2_worker_id = -1;
0019 thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020
0021 void JExecutionEngine::Init() {
0022 auto params = GetApplication()->GetJParameterManager();
0023
0024 params->SetDefaultParameter("jana:timeout", m_timeout_s,
0025 "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026
0027 params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s,
0028 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029
0030 params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms,
0031 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032
0033 params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034
0035 params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036
0037 auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038 "Filename of named pipe for retrieving instantaneous status info");
0039
0040 size_t pid = getpid();
0041 mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042
0043 LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044 LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045 LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046
0047 if (p->IsDefault()) {
0048 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049 }
0050 else {
0051 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052 }
0053
0054
0055
0056
0057
0058 for (JArrow* arrow : m_topology->arrows) {
0059
0060 arrow->initialize();
0061
0062 m_arrow_states.emplace_back();
0063 auto& arrow_state = m_arrow_states.back();
0064 arrow_state.is_source = arrow->is_source();
0065 arrow_state.is_sink = arrow->is_sink();
0066 arrow_state.is_parallel = arrow->is_parallel();
0067 arrow_state.next_input = arrow->get_next_port_index();
0068 }
0069 }
0070
0071 void JExecutionEngine::RequestInspector() {
0072 std::unique_lock<std::mutex> lock(m_mutex);
0073 m_interrupt_status = InterruptStatus::InspectRequested;
0074 }
0075
0076 void JExecutionEngine::RunTopology() {
0077 std::unique_lock<std::mutex> lock(m_mutex);
0078
0079 if (m_runstatus == RunStatus::Failed) {
0080 throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0081 }
0082 if (m_runstatus == RunStatus::Finished) {
0083 throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0084 }
0085 if (m_arrow_states.size() == 0) {
0086 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0087 }
0088
0089
0090 m_time_at_start = clock_t::now();
0091 m_event_count_at_start = m_event_count_at_finish;
0092
0093
0094 for (auto& arrow: m_arrow_states) {
0095 if (arrow.status == ArrowState::Status::Paused) {
0096 arrow.status = ArrowState::Status::Running;
0097 }
0098 }
0099
0100 m_runstatus = RunStatus::Running;
0101
0102 lock.unlock();
0103 m_condvar.notify_one();
0104 }
0105
0106 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0107
0108
0109
0110
0111
0112
0113
0114
0115 std::unique_lock<std::mutex> lock(m_mutex);
0116
0117 if (nthreads != 0 && m_arrow_states.size() == 0) {
0118
0119
0120 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0121 }
0122
0123 auto prev_nthreads = m_worker_states.size();
0124
0125 if (prev_nthreads < nthreads) {
0126
0127 LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0128 for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0129 auto worker = std::make_unique<WorkerState>();
0130 worker->worker_id = worker_id;
0131 worker->is_stop_requested = false;
0132 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0133 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0134 worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0135 LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0136 m_worker_states.push_back(std::move(worker));
0137
0138 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0139 if (pin_to_cpu) {
0140 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0141 }
0142 }
0143 }
0144
0145 else if (prev_nthreads > nthreads) {
0146
0147 LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0148
0149
0150 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0151 LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0152 m_worker_states[worker_id]->is_stop_requested = true;
0153 }
0154 m_condvar.notify_all();
0155 lock.unlock();
0156
0157
0158 for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0159 if (m_worker_states[worker_id]->thread != nullptr) {
0160 if (m_worker_states[worker_id]->is_timed_out) {
0161
0162
0163
0164 m_worker_states[worker_id]->thread->detach();
0165 LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0166 }
0167 else {
0168 LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0169 m_worker_states[worker_id]->thread->join();
0170 LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0171 }
0172 }
0173 else {
0174 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0175 }
0176 }
0177
0178 lock.lock();
0179
0180 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0181 if (m_worker_states.back()->thread != nullptr) {
0182 delete m_worker_states.back()->thread;
0183 }
0184 m_worker_states.pop_back();
0185 }
0186 }
0187 }
0188
0189 void JExecutionEngine::PauseTopology() {
0190 std::unique_lock<std::mutex> lock(m_mutex);
0191 if (m_runstatus != RunStatus::Running) return;
0192 m_runstatus = RunStatus::Pausing;
0193 for (auto& arrow: m_arrow_states) {
0194 if (arrow.status == ArrowState::Status::Running) {
0195 arrow.status = ArrowState::Status::Paused;
0196 }
0197 }
0198 LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0199 lock.unlock();
0200 m_condvar.notify_all();
0201 }
0202
0203 void JExecutionEngine::DrainTopology() {
0204 std::unique_lock<std::mutex> lock(m_mutex);
0205 if (m_runstatus != RunStatus::Running) return;
0206 m_runstatus = RunStatus::Draining;
0207 for (auto& arrow: m_arrow_states) {
0208 if (arrow.is_source) {
0209 if (arrow.status == ArrowState::Status::Running) {
0210 arrow.status = ArrowState::Status::Paused;
0211 }
0212 }
0213 }
0214 LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0215 lock.unlock();
0216 m_condvar.notify_all();
0217 }
0218
/// Supervisor loop. Each m_ticker_ms it performs, in order:
///   - timeout checks;
///   - servicing of worker-report requests (set by the SIGTSTP / SIGUSR1 handlers);
///   - Ctrl-C handling (inspector entry or pause-and-quit);
///   - printing of the status ticker.
/// The loop exits once processing is Paused (and no inspection is pending), Finished, or
/// Failed. On Failed, HandleFailures runs and may throw, in which case PrintFinalReport is
/// not reached. Otherwise the final report is printed.
void JExecutionEngine::RunSupervisor() {

    // From now on a single Ctrl-C is handled gracefully instead of hard-exiting (see HandleSIGINT).
    if (m_interrupt_status == InterruptStatus::NoInterruptsUnsupervised) {
        m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
    }
    size_t last_event_count = 0;
    clock_t::time_point last_measurement_time = clock_t::now();

    Perf perf;
    while (true) {

        // CheckTimeout may flip m_runstatus to Failed; that is picked up by GetPerf below.
        if (m_enable_timeout && m_timeout_s > 0) {
            CheckTimeout();
        }

        if (m_print_worker_report_requested) {
            PrintWorkerReport(false);
            m_print_worker_report_requested = false;
        }

        if (m_send_worker_report_requested) {
            PrintWorkerReport(true);
            m_send_worker_report_requested = false;
        }

        perf = GetPerf();
        // A Paused engine with an inspection pending must not exit: the inspector is entered below.
        if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) ||
            perf.runstatus == RunStatus::Finished ||
            perf.runstatus == RunStatus::Failed) {
            break;
        }

        if (m_interrupt_status == InterruptStatus::InspectRequested) {
            if (perf.runstatus == RunStatus::Paused) {
                LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
                // Time spent in the interactive inspector must not count towards worker timeouts.
                m_enable_timeout = false;
                m_interrupt_status = InterruptStatus::InspectInProgress;
                InspectApplication(GetApplication());
                m_interrupt_status = InterruptStatus::NoInterruptsSupervised;

                // Restart the ticker's instantaneous-rate window after the inspector returns.
                // NOTE(review): presumably the inspector may resume processing; if it does not,
                // the next iteration sees Paused and exits the loop.
                last_measurement_time = clock_t::now();
                last_event_count = 0;
                continue;
            }
            else if (perf.runstatus == RunStatus::Running) {
                // First step of inspection: pause, then wait for the scheduler to report Paused.
                PauseTopology();
            }
        }
        else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
            PauseTopology();
        }

        if (m_show_ticker) {
            // Instantaneous rate since the previous tick, next to the run-average from GetPerf.
            auto next_measurement_time = clock_t::now();
            auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
            float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
            last_measurement_time = next_measurement_time;
            last_event_count = perf.event_count;


            LOG_INFO(m_logger) << "Status: " << perf.event_count << " events processed at "
                               << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
                               << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
    }
    // Note: this message is logged for Finished and Failed exits as well as Paused.
    LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;

    if (perf.runstatus == RunStatus::Failed) {
        HandleFailures();
    }

    PrintFinalReport();
}
0295
0296 bool JExecutionEngine::CheckTimeout() {
0297 std::unique_lock<std::mutex> lock(m_mutex);
0298 auto now = clock_t::now();
0299 bool timeout_detected = false;
0300 for (auto& worker: m_worker_states) {
0301 auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0302 auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0303 if (duration_s > timeout_s && worker->last_arrow_id != static_cast<uint64_t>(-1)) {
0304 worker->is_timed_out = true;
0305 timeout_detected = true;
0306 m_runstatus = RunStatus::Failed;
0307 }
0308 }
0309 return timeout_detected;
0310 }
0311
0312 void JExecutionEngine::HandleFailures() {
0313
0314 std::unique_lock<std::mutex> lock(m_mutex);
0315
0316
0317 for (auto& worker: m_worker_states) {
0318 if (worker->is_timed_out) {
0319 std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name();
0320 LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0321 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0322 LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0323 worker->backtrace.WaitForCapture();
0324 }
0325 if (worker->stored_exception != nullptr) {
0326 std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->arrows[worker->last_arrow_id]->get_name();
0327 LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0328 }
0329 }
0330
0331
0332
0333 for (auto& worker: m_worker_states) {
0334 if (worker->stored_exception != nullptr) {
0335 GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0336 std::rethrow_exception(worker->stored_exception);
0337 }
0338 if (worker->is_timed_out) {
0339 GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0340 auto ex = JException("Timeout in worker thread");
0341 ex.backtrace = worker->backtrace;
0342 throw ex;
0343 }
0344 }
0345 }
0346
0347 void JExecutionEngine::FinishTopology() {
0348 std::unique_lock<std::mutex> lock(m_mutex);
0349 assert(m_runstatus == RunStatus::Paused);
0350
0351 LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0352 for (auto* arrow : m_topology->arrows) {
0353 arrow->finalize();
0354 }
0355 for (auto* pool: m_topology->pools) {
0356 pool->Finalize();
0357 }
0358 m_runstatus = RunStatus::Finished;
0359 LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0360 }
0361
0362 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0363 std::unique_lock<std::mutex> lock(m_mutex);
0364 return m_runstatus;
0365 }
0366
0367 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0368 std::unique_lock<std::mutex> lock(m_mutex);
0369 Perf result;
0370 if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0371 result.event_count = m_event_count_at_finish - m_event_count_at_start;
0372 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0373 }
0374 else {
0375
0376 size_t current_event_count = 0;
0377 for (auto& state : m_arrow_states) {
0378 if (state.is_sink) {
0379 current_event_count += state.events_processed;
0380 }
0381 }
0382 result.event_count = current_event_count - m_event_count_at_start;
0383 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0384 }
0385 result.runstatus = m_runstatus;
0386 result.thread_count = m_worker_states.size();
0387 result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0388 result.event_level = JEventLevel::PhysicsEvent;
0389 return result;
0390 }
0391
0392 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0393 std::unique_lock<std::mutex> lock(m_mutex);
0394 auto worker_id = m_worker_states.size();
0395 auto worker = std::make_unique<WorkerState>();
0396 worker->worker_id = worker_id;
0397 worker->is_stop_requested = false;
0398 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0399 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0400 worker->thread = nullptr;
0401 m_worker_states.push_back(std::move(worker));
0402
0403 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0404 if (pin_to_cpu) {
0405 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0406 }
0407 return {worker_id, &worker->backtrace};
0408
0409 }
0410
0411
0412 void JExecutionEngine::RunWorker(Worker worker) {
0413
0414 LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0415 jana2_worker_id = worker.worker_id;
0416 jana2_worker_backtrace = worker.backtrace;
0417 try {
0418 Task task;
0419 while (true) {
0420 ExchangeTask(task, worker.worker_id);
0421 if (task.arrow == nullptr) break;
0422 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0423 }
0424 LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0425 }
0426 catch(JException& ex) {
0427 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << ": " << ex.GetMessage();
0428 std::unique_lock<std::mutex> lock(m_mutex);
0429 m_runstatus = RunStatus::Failed;
0430 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0431 }
0432 catch (...) {
0433 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0434 std::unique_lock<std::mutex> lock(m_mutex);
0435 m_runstatus = RunStatus::Failed;
0436 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0437 }
0438 }
0439
0440
0441 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0442
0443 auto checkin_time = std::chrono::steady_clock::now();
0444
0445
0446
0447 std::unique_lock<std::mutex> lock(m_mutex);
0448
0449 auto& worker = *m_worker_states.at(worker_id);
0450
0451 if (task.arrow != nullptr) {
0452 CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0453 }
0454
0455 if (worker.is_stop_requested) {
0456 return;
0457 }
0458
0459 FindNextReadyTask_Unsafe(task, worker);
0460
0461 if (nonblocking) { return; }
0462 auto idle_time_start = clock_t::now();
0463 m_total_scheduler_duration += (idle_time_start - checkin_time);
0464
0465 while (task.arrow == nullptr && !worker.is_stop_requested) {
0466 m_condvar.wait(lock);
0467 FindNextReadyTask_Unsafe(task, worker);
0468 }
0469 worker.last_checkout_time = clock_t::now();
0470
0471 if (task.input_event != nullptr) {
0472 worker.last_event_nr = task.input_event->GetEventNumber();
0473 }
0474 else {
0475 worker.last_event_nr = 0;
0476 }
0477 m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0478
0479
0480
0481
0482 m_condvar.notify_one();
0483 }
0484
0485
/// Records the completion of `task` by `worker`. Caller must hold m_mutex (hence "_Unsafe").
///
/// Steps, in order:
///   - Releases the arrow's active-task slot and adds the elapsed time since checkout to the
///     arrow's processing total.
///   - Counts outputs headed to non-input ports as processed events.
///   - Pushes the outputs downstream.
///   - If the arrow reported Finished, marks it Finished. A Running engine switches to
///     Draining once no source arrow is still Running.
///   - Clears both the worker's current-task info and `task`.
/// NOTE(review): assumes worker.last_arrow_id identifies task.arrow, as set by FindNextReadyTask_Unsafe.
void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {

    auto processing_duration = checkin_time - worker.last_checkout_time;

    ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);

    arrow_state.active_tasks -= 1;
    arrow_state.total_processing_duration += processing_duration;

    // Only outputs leaving through a non-input port count as processed events; GetPerf and
    // PrintFinalReport sum these counters over sink arrows.
    for (size_t output=0; output<task.output_count; ++output) {
        if (!task.arrow->get_port(task.outputs[output].second).is_input) {
            arrow_state.events_processed++;
        }
    }


    task.arrow->push(task.outputs, task.output_count, worker.location_id);

    if (task.status == JArrow::FireResult::Finished) {

        // The arrow will not be scheduled again (FindNextReadyTask_Unsafe skips non-Running arrows).
        arrow_state.status = ArrowState::Status::Finished;

        // Once the last running source finishes, the engine starts draining the remaining events.
        if (m_runstatus == RunStatus::Running) {
            bool draining = true;
            for (auto& arrow: m_arrow_states) {
                if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
                    draining = false;
                }
            }
            if (draining) {
                m_runstatus = RunStatus::Draining;
            }
        }
    }
    // -1 marks the worker as idle (CheckTimeout ignores workers in this state).
    worker.last_arrow_id = -1;
    worker.last_event_nr = 0;

    task.arrow = nullptr;
    task.input_event = nullptr;
    task.output_count = 0;
    task.status = JArrow::FireResult::NotRunYet;
};
0532
0533
/// Tries to assign a ready task to `worker`, writing it into `task`. Caller must hold m_mutex.
///
/// While Running or Draining, arrows are scanned starting one past the previous call's
/// starting index (wrapping around). An arrow is selected if it meets all of:
///   - it is Running;
///   - it is not a sequential arrow already busy;
///   - it either needs no input (port == -1) or an input event could be pulled.
///
/// If no task is found while Running, Pausing or Draining, and no source is Running and no
/// task is active, processing is complete. In that case the finish time and sink event count
/// are frozen and the engine switches to Paused. On the no-task path, `task` and the
/// worker's last_arrow_id are reset to "none".
/// NOTE(review): arrow_count == 0 would divide by zero; RunTopology refuses empty topologies.
void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {

    if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {

        // Advance the scan's starting arrow on every call so that the search does not always
        // favor the lowest-numbered arrows.
        size_t arrow_count = m_arrow_states.size();
        m_next_arrow_id += 1;
        m_next_arrow_id %= arrow_count;

        for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
            size_t arrow_id = i % arrow_count;

            auto& state = m_arrow_states[arrow_id];
            if (!state.is_parallel && (state.active_tasks != 0)) {

                LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
                continue;
            }

            if (state.status != ArrowState::Status::Running) {
                LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
                continue;
            }


            // port == -1 means the arrow needs no input event; otherwise try to pull one.
            JArrow* arrow = m_topology->arrows[arrow_id];

            auto port = arrow->get_next_port_index();
            JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
            if (event != nullptr || port == -1) {
                LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;

                // Claim the slot; CheckinCompletedTask_Unsafe releases it.
                state.active_tasks += 1;

                task.arrow = arrow;
                task.input_port = port;
                task.input_event = event;
                task.output_count = 0;
                task.status = JArrow::FireResult::NotRunYet;

                // Bookkeeping read by CheckTimeout and the worker report.
                worker.last_arrow_id = arrow_id;
                if (event != nullptr) {
                    worker.is_event_warmed_up = event->IsWarmedUp();
                    worker.last_event_nr = event->GetEventNumber();
                }
                else {
                    worker.is_event_warmed_up = true;
                    worker.last_event_nr = 0;
                }
                return;
            }
            else {
                LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
            }
        }
    }

    // No task was handed out. Decide whether processing has now come to rest.
    if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {

        // Processing is at rest only when nothing can produce new events (no running source)
        // and nothing is still in flight (no active task).
        bool any_active_source_found = false;
        bool any_active_task_found = false;

        LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;

        for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
            auto& state = m_arrow_states[arrow_id];
            any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
            any_active_task_found |= (state.active_tasks != 0);

        }

        if (!any_active_source_found && !any_active_task_found) {

            // Freeze the end-of-run statistics used by GetPerf and PrintFinalReport.
            m_time_at_finish = clock_t::now();
            m_event_count_at_finish = 0;
            for (auto& arrow_state : m_arrow_states) {
                if (arrow_state.is_sink) {
                    m_event_count_at_finish += arrow_state.events_processed;
                }
            }
            LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
            m_runstatus = RunStatus::Paused;

        }
    }

    worker.last_arrow_id = -1;

    task.arrow = nullptr;
    task.input_port = -1;
    task.input_event = nullptr;
    task.output_count = 0;
    task.status = JArrow::FireResult::NotRunYet;
}
0637
0638
0639 void JExecutionEngine::PrintFinalReport() {
0640
0641 std::unique_lock<std::mutex> lock(m_mutex);
0642 auto event_count = m_event_count_at_finish - m_event_count_at_start;
0643 auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0644 auto thread_count = m_worker_states.size();
0645 auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0646
0647 LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0648 LOG_INFO(GetLogger()) << LOG_END;
0649 LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << throughput_hz << LOG_END;
0650 LOG_INFO(GetLogger()) << " Completed events [count]: " << event_count << LOG_END;
0651 LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0652 LOG_INFO(GetLogger()) << " Thread team size [count]: " << thread_count << LOG_END;
0653 LOG_INFO(GetLogger()) << LOG_END;
0654 LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END;
0655 LOG_INFO(GetLogger()) << LOG_END;
0656
0657 size_t total_useful_ms = 0;
0658
0659 for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0660 auto* arrow = m_topology->arrows[arrow_id];
0661 auto& arrow_state = m_arrow_states[arrow_id];
0662 auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0663 total_useful_ms += useful_ms;
0664 auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0665 auto throughput_bottleneck = 1000.0 / avg_latency;
0666 if (arrow->is_parallel()) {
0667 throughput_bottleneck *= thread_count;
0668 }
0669
0670 LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->get_name() << LOG_END;
0671 LOG_INFO(GetLogger()) << " Parallel: " << arrow->is_parallel() << LOG_END;
0672 LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END;
0673 LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END;
0674 LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0675 LOG_INFO(GetLogger()) << LOG_END;
0676 }
0677
0678 auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0679 auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0680
0681 LOG_INFO(GetLogger()) << " Total useful time [s]: " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0682 LOG_INFO(GetLogger()) << " Total scheduler time [s]: " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0683 LOG_INFO(GetLogger()) << " Total idle time [s]: " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0684
0685 LOG_INFO(GetLogger()) << LOG_END;
0686
0687 LOG_INFO(GetLogger()) << "Final report: " << event_count << " events processed at "
0688 << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0689
0690 }
0691
0692 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0693 m_show_ticker = show_ticker;
0694 }
0695
0696 bool JExecutionEngine::IsTickerEnabled() const {
0697 return m_show_ticker;
0698 }
0699
0700 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0701 m_enable_timeout = timeout_enabled;
0702 }
0703
0704 bool JExecutionEngine::IsTimeoutEnabled() const {
0705 return m_enable_timeout;
0706 }
0707
0708 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0709
0710 std::unique_lock<std::mutex> lock(m_mutex);
0711 if (arrow_id >= m_topology->arrows.size()) {
0712 LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0713 return JArrow::FireResult::NotRunYet;
0714 }
0715 JArrow* arrow = m_topology->arrows[arrow_id];
0716 LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
0717 << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0718
0719 ArrowState& arrow_state = m_arrow_states[arrow_id];
0720 if (arrow_state.status == ArrowState::Status::Finished) {
0721 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0722 return JArrow::FireResult::Finished;
0723 }
0724 if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0725 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0726 return JArrow::FireResult::NotRunYet;
0727 }
0728 arrow_state.active_tasks += 1;
0729
0730 auto port = arrow->get_next_port_index();
0731 JEvent* event = nullptr;
0732 if (port != -1) {
0733 event = arrow->pull(port, location_id);
0734 if (event == nullptr) {
0735 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0736 arrow_state.active_tasks -= 1;
0737 return JArrow::FireResult::NotRunYet;
0738 }
0739 else {
0740 LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0741 }
0742 }
0743 else {
0744 LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0745 }
0746 lock.unlock();
0747
0748 size_t output_count;
0749 JArrow::OutputData outputs;
0750 JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0751
0752 LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0753 arrow->fire(event, outputs, output_count, result);
0754 LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
0755 if (output_count == 0) {
0756 LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0757 }
0758 else {
0759 for (size_t i=0; i<output_count; ++i) {
0760 LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0761 }
0762 }
0763
0764 lock.lock();
0765 arrow->push(outputs, output_count, location_id);
0766 arrow_state.active_tasks -= 1;
0767 lock.unlock();
0768 return result;
0769 }
0770
0771
0772 void JExecutionEngine::HandleSIGINT() {
0773 InterruptStatus status = m_interrupt_status;
0774 std::cout << std::endl;
0775 switch (status) {
0776 case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0777 case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0778 case InterruptStatus::NoInterruptsUnsupervised:
0779 case InterruptStatus::PauseAndQuit:
0780 case InterruptStatus::InspectInProgress:
0781 _exit(-2);
0782 }
0783 }
0784
0785 void JExecutionEngine::HandleSIGUSR1() {
0786 m_send_worker_report_requested = true;
0787 }
0788
0789 void JExecutionEngine::HandleSIGUSR2() {
0790 if (jana2_worker_backtrace != nullptr) {
0791 jana2_worker_backtrace->Capture(3);
0792 }
0793 }
0794
0795 void JExecutionEngine::HandleSIGTSTP() {
0796 std::cout << std::endl;
0797 m_print_worker_report_requested = true;
0798 }
0799
0800 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0801
0802 std::unique_lock<std::mutex> lock(m_mutex);
0803 LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0804 for (auto& worker: m_worker_states) {
0805 worker->backtrace.Reset();
0806 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0807 }
0808 for (auto& worker: m_worker_states) {
0809 worker->backtrace.WaitForCapture();
0810 }
0811 std::ostringstream oss;
0812 oss << "Worker report" << std::endl;
0813 for (auto& worker: m_worker_states) {
0814 oss << "------------------------------" << std::endl
0815 << " Worker: " << worker->worker_id << std::endl
0816 << " Current arrow: " << worker->last_arrow_id << std::endl
0817 << " Current event: " << worker->last_event_nr << std::endl
0818 << " Backtrace:" << std::endl << std::endl
0819 << worker->backtrace.ToString();
0820 }
0821 auto s = oss.str();
0822 LOG_WARN(GetLogger()) << s << LOG_END;
0823
0824 if (send_to_pipe) {
0825
0826 int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0827 if (fd >= 0) {
0828 write(fd, s.c_str(), s.length()+1);
0829 close(fd);
0830 }
0831 else {
0832 LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0833 << " You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0834 << " The status report will still show up in the log." << LOG_END;
0835 }
0836 }
0837 }
0838
0839
0840 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0841 switch(runstatus) {
0842 case JExecutionEngine::RunStatus::Running: return "Running";
0843 case JExecutionEngine::RunStatus::Paused: return "Paused";
0844 case JExecutionEngine::RunStatus::Failed: return "Failed";
0845 case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0846 case JExecutionEngine::RunStatus::Draining: return "Draining";
0847 case JExecutionEngine::RunStatus::Finished: return "Finished";
0848 default: return "CorruptedRunStatus";
0849 }
0850 }
0851