File indexing completed on 2026-05-27 07:36:29
0001
#include "JExecutionEngine.h"
#include <JANA/Utils/JApplicationInspector.h>

#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <exception>
#include <mutex>
#include <ostream>
#include <sstream>

#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
0017
0018 thread_local int jana2_worker_id = -1;
0019 thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020
0021 void JExecutionEngine::Init() {
0022 auto params = GetApplication()->GetJParameterManager();
0023
0024 params->SetDefaultParameter("jana:timeout", m_timeout_s,
0025 "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026
0027 params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s,
0028 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029
0030 params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms,
0031 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032
0033 params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034
0035 params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036
0037 auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038 "Filename of named pipe for retrieving instantaneous status info");
0039
0040 size_t pid = getpid();
0041 mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042
0043 LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044 LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045 LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046
0047 if (p->IsDefault()) {
0048 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049 }
0050 else {
0051 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052 }
0053
0054
0055
0056
0057
0058 for (JArrow* arrow : m_topology->GetArrows()) {
0059
0060 arrow->Initialize();
0061
0062 m_arrow_states.emplace_back();
0063 auto& arrow_state = m_arrow_states.back();
0064 arrow_state.is_source = arrow->IsSource();
0065 arrow_state.is_sink = arrow->IsSink();
0066 arrow_state.is_parallel = arrow->IsParallel();
0067 arrow_state.next_input = arrow->GetNextPortIndex();
0068 }
0069 }
0070
0071 void JExecutionEngine::RequestInspector() {
0072 std::unique_lock<std::mutex> lock(m_mutex);
0073 m_interrupt_status = InterruptStatus::InspectRequested;
0074 }
0075
0076 void JExecutionEngine::RunTopology() {
0077 std::unique_lock<std::mutex> lock(m_mutex);
0078
0079 if (m_runstatus == RunStatus::Failed) {
0080 throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0081 }
0082 if (m_runstatus == RunStatus::Finished) {
0083 throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0084 }
0085 if (m_arrow_states.size() == 0) {
0086 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0087 }
0088
0089
0090 m_time_at_start = clock_t::now();
0091 m_event_count_at_start = m_event_count_at_finish;
0092
0093
0094 for (auto& arrow: m_arrow_states) {
0095 if (arrow.status == ArrowState::Status::Paused) {
0096 arrow.status = ArrowState::Status::Running;
0097 }
0098 }
0099
0100 m_runstatus = RunStatus::Running;
0101
0102 lock.unlock();
0103 m_condvar.notify_one();
0104 }
0105
0106 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0107
0108
0109
0110
0111
0112
0113
0114
0115 std::unique_lock<std::mutex> lock(m_mutex);
0116
0117 if (nthreads != 0 && m_arrow_states.size() == 0) {
0118
0119
0120 throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0121 }
0122
0123 auto prev_nthreads = m_worker_states.size();
0124
0125 if (prev_nthreads < nthreads) {
0126
0127 LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0128 auto mapping = m_topology->GetProcessorMapping();
0129 for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0130 auto worker = std::make_unique<WorkerState>();
0131 worker->worker_id = worker_id;
0132 worker->is_stop_requested = false;
0133 worker->cpu_id = mapping.get_cpu_id(worker_id);
0134 worker->location_id = mapping.get_loc_id(worker_id);
0135 worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0136 LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0137 m_worker_states.push_back(std::move(worker));
0138
0139 bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0140 if (pin_to_cpu) {
0141 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0142 }
0143 }
0144 }
0145
0146 else if (prev_nthreads > nthreads) {
0147
0148 LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0149
0150
0151 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0152 LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0153 m_worker_states[worker_id]->is_stop_requested = true;
0154 }
0155 m_condvar.notify_all();
0156 lock.unlock();
0157
0158
0159 for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0160 if (m_worker_states[worker_id]->thread != nullptr) {
0161 if (m_worker_states[worker_id]->is_timed_out) {
0162
0163
0164
0165 m_worker_states[worker_id]->thread->detach();
0166 LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0167 }
0168 else {
0169 LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0170 m_worker_states[worker_id]->thread->join();
0171 LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0172 }
0173 }
0174 else {
0175 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0176 }
0177 }
0178
0179 lock.lock();
0180
0181 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0182 if (m_worker_states.back()->thread != nullptr) {
0183 delete m_worker_states.back()->thread;
0184 }
0185 m_worker_states.pop_back();
0186 }
0187 }
0188 }
0189
0190 void JExecutionEngine::PauseTopology() {
0191 std::unique_lock<std::mutex> lock(m_mutex);
0192 if (m_runstatus != RunStatus::Running) return;
0193 m_runstatus = RunStatus::Pausing;
0194 for (auto& arrow: m_arrow_states) {
0195 if (arrow.status == ArrowState::Status::Running) {
0196 arrow.status = ArrowState::Status::Paused;
0197 }
0198 }
0199 LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0200 lock.unlock();
0201 m_condvar.notify_all();
0202 }
0203
0204 void JExecutionEngine::DrainTopology() {
0205 std::unique_lock<std::mutex> lock(m_mutex);
0206 if (m_runstatus != RunStatus::Running) return;
0207 m_runstatus = RunStatus::Draining;
0208 for (auto& arrow: m_arrow_states) {
0209 if (arrow.is_source) {
0210 if (arrow.status == ArrowState::Status::Running) {
0211 arrow.status = ArrowState::Status::Paused;
0212 }
0213 }
0214 }
0215 LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0216 lock.unlock();
0217 m_condvar.notify_all();
0218 }
0219
0220 void JExecutionEngine::RunSupervisor() {
0221
0222 if (m_interrupt_status == InterruptStatus::NoInterruptsUnsupervised) {
0223 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0224 }
0225 size_t last_event_count = 0;
0226 clock_t::time_point last_measurement_time = clock_t::now();
0227
0228 Perf perf;
0229 while (true) {
0230
0231 if (m_enable_timeout && m_timeout_s > 0) {
0232 CheckTimeout();
0233 }
0234
0235 if (m_print_worker_report_requested) {
0236 PrintWorkerReport(false);
0237 m_print_worker_report_requested = false;
0238 }
0239
0240 if (m_send_worker_report_requested) {
0241 PrintWorkerReport(true);
0242 m_send_worker_report_requested = false;
0243 }
0244
0245 perf = GetPerf();
0246 if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) ||
0247 perf.runstatus == RunStatus::Finished ||
0248 perf.runstatus == RunStatus::Failed) {
0249 break;
0250 }
0251
0252 if (m_interrupt_status == InterruptStatus::InspectRequested) {
0253 if (perf.runstatus == RunStatus::Paused) {
0254 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0255 m_enable_timeout = false;
0256 m_interrupt_status = InterruptStatus::InspectInProgress;
0257 InspectApplication(GetApplication());
0258 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0259
0260
0261 last_measurement_time = clock_t::now();
0262 last_event_count = 0;
0263 continue;
0264 }
0265 else if (perf.runstatus == RunStatus::Running) {
0266 PauseTopology();
0267 }
0268 }
0269 else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0270 PauseTopology();
0271 }
0272
0273 if (m_show_ticker) {
0274 auto next_measurement_time = clock_t::now();
0275 auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0276 float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0277 last_measurement_time = next_measurement_time;
0278 last_event_count = perf.event_count;
0279
0280
0281 LOG_INFO(m_logger) << "Status: " << perf.event_count << " events processed at "
0282 << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0283 << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0284 }
0285
0286 std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0287 }
0288 LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0289
0290 if (perf.runstatus == RunStatus::Failed) {
0291 HandleFailures();
0292 }
0293
0294 PrintFinalReport();
0295 }
0296
0297 bool JExecutionEngine::CheckTimeout() {
0298 std::unique_lock<std::mutex> lock(m_mutex);
0299 auto now = clock_t::now();
0300 bool timeout_detected = false;
0301 for (auto& worker: m_worker_states) {
0302 auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0303 auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0304 if (duration_s > timeout_s && worker->last_arrow_id != static_cast<uint64_t>(-1)) {
0305 worker->is_timed_out = true;
0306 timeout_detected = true;
0307 m_runstatus = RunStatus::Failed;
0308 }
0309 }
0310 return timeout_detected;
0311 }
0312
0313 void JExecutionEngine::HandleFailures() {
0314
0315 std::unique_lock<std::mutex> lock(m_mutex);
0316
0317
0318 for (auto& worker: m_worker_states) {
0319 if (worker->is_timed_out) {
0320 std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
0321 LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0322 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0323 LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0324 worker->backtrace.WaitForCapture();
0325 }
0326 if (worker->stored_exception != nullptr) {
0327 std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
0328 LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0329 }
0330 }
0331
0332
0333
0334 for (auto& worker: m_worker_states) {
0335 if (worker->stored_exception != nullptr) {
0336 GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0337 std::rethrow_exception(worker->stored_exception);
0338 }
0339 if (worker->is_timed_out) {
0340 GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0341 auto ex = JException("Timeout in worker thread");
0342 ex.backtrace = worker->backtrace;
0343 throw ex;
0344 }
0345 }
0346 }
0347
0348 void JExecutionEngine::FinishTopology() {
0349 std::unique_lock<std::mutex> lock(m_mutex);
0350 assert(m_runstatus == RunStatus::Paused);
0351
0352 LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0353 for (auto* arrow : m_topology->GetArrows()) {
0354 arrow->Finalize();
0355 }
0356 for (auto* pool: m_topology->GetPools()) {
0357 pool->Finalize();
0358 }
0359 m_runstatus = RunStatus::Finished;
0360 LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0361 }
0362
0363 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0364 std::unique_lock<std::mutex> lock(m_mutex);
0365 return m_runstatus;
0366 }
0367
0368 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0369 std::unique_lock<std::mutex> lock(m_mutex);
0370 Perf result;
0371 if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0372 result.event_count = m_event_count_at_finish - m_event_count_at_start;
0373 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0374 }
0375 else {
0376
0377 size_t current_event_count = 0;
0378 for (auto& state : m_arrow_states) {
0379 if (state.is_sink) {
0380 current_event_count += state.events_processed;
0381 }
0382 }
0383 result.event_count = current_event_count - m_event_count_at_start;
0384 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0385 }
0386 result.runstatus = m_runstatus;
0387 result.thread_count = m_worker_states.size();
0388 result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0389 result.event_level = JEventLevel::PhysicsEvent;
0390 return result;
0391 }
0392
0393 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0394 std::unique_lock<std::mutex> lock(m_mutex);
0395 auto mapping = m_topology->GetProcessorMapping();
0396 auto worker_id = m_worker_states.size();
0397 auto worker = std::make_unique<WorkerState>();
0398 worker->worker_id = worker_id;
0399 worker->is_stop_requested = false;
0400 worker->cpu_id = mapping.get_cpu_id(worker_id);
0401 worker->location_id = mapping.get_loc_id(worker_id);
0402 worker->thread = nullptr;
0403 m_worker_states.push_back(std::move(worker));
0404
0405 bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0406 if (pin_to_cpu) {
0407 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0408 }
0409 return {worker_id, &worker->backtrace};
0410
0411 }
0412
0413
0414 void JExecutionEngine::RunWorker(Worker worker) {
0415
0416 LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0417 jana2_worker_id = worker.worker_id;
0418 jana2_worker_backtrace = worker.backtrace;
0419 try {
0420 Task task;
0421 while (true) {
0422 ExchangeTask(task, worker.worker_id);
0423 if (task.arrow == nullptr) break;
0424 task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status);
0425 }
0426 LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0427 }
0428 catch(JException& ex) {
0429 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << ": " << ex.GetMessage();
0430 std::unique_lock<std::mutex> lock(m_mutex);
0431 m_runstatus = RunStatus::Failed;
0432 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0433 }
0434 catch (...) {
0435 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0436 std::unique_lock<std::mutex> lock(m_mutex);
0437 m_runstatus = RunStatus::Failed;
0438 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0439 }
0440 }
0441
0442
0443 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0444
0445 auto checkin_time = std::chrono::steady_clock::now();
0446
0447
0448
0449 std::unique_lock<std::mutex> lock(m_mutex);
0450
0451 auto& worker = *m_worker_states.at(worker_id);
0452
0453 if (task.arrow != nullptr) {
0454 CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0455 }
0456
0457 if (worker.is_stop_requested) {
0458 return;
0459 }
0460
0461 FindNextReadyTask_Unsafe(task, worker);
0462
0463 if (nonblocking) { return; }
0464 auto idle_time_start = clock_t::now();
0465 m_total_scheduler_duration += (idle_time_start - checkin_time);
0466
0467 while (task.arrow == nullptr && !worker.is_stop_requested) {
0468 m_condvar.wait(lock);
0469 FindNextReadyTask_Unsafe(task, worker);
0470 }
0471 worker.last_checkout_time = clock_t::now();
0472
0473 if (task.input_event != nullptr) {
0474 worker.last_event_nr = task.input_event->GetEventNumber();
0475 }
0476 else {
0477 worker.last_event_nr = 0;
0478 }
0479 m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0480
0481
0482
0483
0484 m_condvar.notify_one();
0485 }
0486
0487
0488 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0489
0490 auto processing_duration = checkin_time - worker.last_checkout_time;
0491
0492 ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0493
0494 arrow_state.active_tasks -= 1;
0495 arrow_state.total_processing_duration += processing_duration;
0496
0497 for (size_t output=0; output<task.output_count; ++output) {
0498 if (!task.arrow->GetPort(task.outputs[output].second).GetSkipFinishEvent()) {
0499 arrow_state.events_processed++;
0500 }
0501 }
0502
0503
0504 task.arrow->Push(task.outputs, task.output_count, worker.location_id);
0505
0506 if (task.status == JArrow::FireResult::Finished) {
0507
0508
0509
0510
0511 arrow_state.status = ArrowState::Status::Finished;
0512
0513
0514 if (m_runstatus == RunStatus::Running) {
0515 bool draining = true;
0516 for (auto& arrow: m_arrow_states) {
0517 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0518 draining = false;
0519 }
0520 }
0521 if (draining) {
0522 m_runstatus = RunStatus::Draining;
0523 }
0524 }
0525 }
0526 worker.last_arrow_id = -1;
0527 worker.last_event_nr = 0;
0528
0529 task.arrow = nullptr;
0530 task.input_event = nullptr;
0531 task.output_count = 0;
0532 task.status = JArrow::FireResult::NotRunYet;
0533 };
0534
0535
0536 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0537
0538 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0539
0540
0541
0542 size_t arrow_count = m_arrow_states.size();
0543 m_next_arrow_id += 1;
0544 m_next_arrow_id %= arrow_count;
0545
0546 for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0547 size_t arrow_id = i % arrow_count;
0548
0549 auto& state = m_arrow_states[arrow_id];
0550 if (!state.is_parallel && (state.active_tasks != 0)) {
0551
0552 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0553 continue;
0554 }
0555
0556 if (state.status != ArrowState::Status::Running) {
0557 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0558 continue;
0559 }
0560
0561
0562
0563 JArrow* arrow = m_topology->GetArrows()[arrow_id];
0564
0565 auto port = arrow->GetNextPortIndex();
0566 JEvent* event = (port == -1) ? nullptr : arrow->Pull(port, worker.location_id);
0567 if (event != nullptr || port == -1) {
0568 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0569
0570 state.active_tasks += 1;
0571
0572 task.arrow = arrow;
0573 task.input_port = port;
0574 task.input_event = event;
0575 task.output_count = 0;
0576 task.status = JArrow::FireResult::NotRunYet;
0577
0578 worker.last_arrow_id = arrow_id;
0579 if (event != nullptr) {
0580 worker.is_event_warmed_up = event->IsWarmedUp();
0581 worker.last_event_nr = event->GetEventNumber();
0582 }
0583 else {
0584 worker.is_event_warmed_up = true;
0585 worker.last_event_nr = 0;
0586 }
0587 return;
0588 }
0589 else {
0590 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0591 }
0592 }
0593 }
0594
0595
0596
0597
0598
0599
0600 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0601
0602
0603
0604 bool any_active_source_found = false;
0605 bool any_active_task_found = false;
0606
0607 LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0608
0609 for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0610 auto& state = m_arrow_states[arrow_id];
0611 any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0612 any_active_task_found |= (state.active_tasks != 0);
0613
0614 }
0615
0616 if (!any_active_source_found && !any_active_task_found) {
0617
0618 m_time_at_finish = clock_t::now();
0619 m_event_count_at_finish = 0;
0620 for (auto& arrow_state : m_arrow_states) {
0621 if (arrow_state.is_sink) {
0622 m_event_count_at_finish += arrow_state.events_processed;
0623 }
0624 }
0625 LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0626 m_runstatus = RunStatus::Paused;
0627
0628 }
0629 }
0630
0631 worker.last_arrow_id = -1;
0632
0633 task.arrow = nullptr;
0634 task.input_port = -1;
0635 task.input_event = nullptr;
0636 task.output_count = 0;
0637 task.status = JArrow::FireResult::NotRunYet;
0638 }
0639
0640
0641 void JExecutionEngine::PrintFinalReport() {
0642
0643 std::unique_lock<std::mutex> lock(m_mutex);
0644 auto event_count = m_event_count_at_finish - m_event_count_at_start;
0645 auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0646 auto thread_count = m_worker_states.size();
0647 auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0648
0649 LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0650 LOG_INFO(GetLogger()) << LOG_END;
0651 LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << throughput_hz << LOG_END;
0652 LOG_INFO(GetLogger()) << " Completed events [count]: " << event_count << LOG_END;
0653 LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0654 LOG_INFO(GetLogger()) << " Thread team size [count]: " << thread_count << LOG_END;
0655 LOG_INFO(GetLogger()) << LOG_END;
0656 LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END;
0657 LOG_INFO(GetLogger()) << LOG_END;
0658
0659 size_t total_useful_ms = 0;
0660
0661 for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0662 auto* arrow = m_topology->GetArrows()[arrow_id];
0663 auto& arrow_state = m_arrow_states[arrow_id];
0664 auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0665 total_useful_ms += useful_ms;
0666 auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0667 auto throughput_bottleneck = 1000.0 / avg_latency;
0668 if (arrow->IsParallel()) {
0669 throughput_bottleneck *= thread_count;
0670 }
0671
0672 LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->GetName() << LOG_END;
0673 LOG_INFO(GetLogger()) << " Parallel: " << arrow->IsParallel() << LOG_END;
0674 LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END;
0675 LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END;
0676 LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0677 LOG_INFO(GetLogger()) << LOG_END;
0678 }
0679
0680 auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0681 auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0682
0683 LOG_INFO(GetLogger()) << " Total useful time [s]: " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0684 LOG_INFO(GetLogger()) << " Total scheduler time [s]: " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0685 LOG_INFO(GetLogger()) << " Total idle time [s]: " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0686
0687 LOG_INFO(GetLogger()) << LOG_END;
0688
0689 LOG_INFO(GetLogger()) << "Final report: " << event_count << " events processed at "
0690 << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0691
0692 }
0693
0694 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0695 m_show_ticker = show_ticker;
0696 }
0697
0698 bool JExecutionEngine::IsTickerEnabled() const {
0699 return m_show_ticker;
0700 }
0701
0702 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0703 m_enable_timeout = timeout_enabled;
0704 }
0705
0706 bool JExecutionEngine::IsTimeoutEnabled() const {
0707 return m_enable_timeout;
0708 }
0709
0710 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0711
0712 std::unique_lock<std::mutex> lock(m_mutex);
0713 if (arrow_id >= m_topology->GetArrows().size()) {
0714 LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0715 return JArrow::FireResult::NotRunYet;
0716 }
0717 JArrow* arrow = m_topology->GetArrows()[arrow_id];
0718 LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->GetName()
0719 << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0720
0721 ArrowState& arrow_state = m_arrow_states[arrow_id];
0722 if (arrow_state.status == ArrowState::Status::Finished) {
0723 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0724 return JArrow::FireResult::Finished;
0725 }
0726 if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0727 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0728 return JArrow::FireResult::NotRunYet;
0729 }
0730 arrow_state.active_tasks += 1;
0731
0732 auto port = arrow->GetNextPortIndex();
0733 JEvent* event = nullptr;
0734 if (port != -1) {
0735 event = arrow->Pull(port, location_id);
0736 if (event == nullptr) {
0737 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0738 arrow_state.active_tasks -= 1;
0739 return JArrow::FireResult::NotRunYet;
0740 }
0741 else {
0742 LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0743 }
0744 }
0745 else {
0746 LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0747 }
0748 lock.unlock();
0749
0750 size_t output_count;
0751 JArrow::OutputData outputs;
0752 JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0753
0754 LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0755 arrow->Fire(event, outputs, output_count, result);
0756 LOG_WARN(GetLogger()) << "Fired arrow with result " << ToString(result) << LOG_END;
0757 if (output_count == 0) {
0758 LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0759 }
0760 else {
0761 for (size_t i=0; i<output_count; ++i) {
0762 LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0763 }
0764 }
0765
0766 lock.lock();
0767 arrow->Push(outputs, output_count, location_id);
0768 arrow_state.active_tasks -= 1;
0769 lock.unlock();
0770 return result;
0771 }
0772
0773
0774 void JExecutionEngine::HandleSIGINT() {
0775 InterruptStatus status = m_interrupt_status;
0776 std::cout << std::endl;
0777 switch (status) {
0778 case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0779 case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0780 case InterruptStatus::NoInterruptsUnsupervised:
0781 case InterruptStatus::PauseAndQuit:
0782 case InterruptStatus::InspectInProgress:
0783 _exit(-2);
0784 }
0785 }
0786
0787 void JExecutionEngine::HandleSIGUSR1() {
0788 m_send_worker_report_requested = true;
0789 }
0790
0791 void JExecutionEngine::HandleSIGUSR2() {
0792 if (jana2_worker_backtrace != nullptr) {
0793 jana2_worker_backtrace->Capture(3);
0794 }
0795 }
0796
0797 void JExecutionEngine::HandleSIGTSTP() {
0798 std::cout << std::endl;
0799 m_print_worker_report_requested = true;
0800 }
0801
0802 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0803
0804 std::unique_lock<std::mutex> lock(m_mutex);
0805 LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0806 for (auto& worker: m_worker_states) {
0807 worker->backtrace.Reset();
0808 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0809 }
0810 for (auto& worker: m_worker_states) {
0811 worker->backtrace.WaitForCapture();
0812 }
0813 std::ostringstream oss;
0814 oss << "Worker report" << std::endl;
0815 for (auto& worker: m_worker_states) {
0816 oss << "------------------------------" << std::endl
0817 << " Worker: " << worker->worker_id << std::endl
0818 << " Current arrow: " << worker->last_arrow_id << std::endl
0819 << " Current event: " << worker->last_event_nr << std::endl
0820 << " Backtrace:" << std::endl << std::endl
0821 << worker->backtrace.ToString();
0822 }
0823 auto s = oss.str();
0824 LOG_WARN(GetLogger()) << s << LOG_END;
0825
0826 if (send_to_pipe) {
0827
0828 int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0829 if (fd >= 0) {
0830 write(fd, s.c_str(), s.length()+1);
0831 close(fd);
0832 }
0833 else {
0834 LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0835 << " You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0836 << " The status report will still show up in the log." << LOG_END;
0837 }
0838 }
0839 }
0840
0841
0842 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0843 switch(runstatus) {
0844 case JExecutionEngine::RunStatus::Running: return "Running";
0845 case JExecutionEngine::RunStatus::Paused: return "Paused";
0846 case JExecutionEngine::RunStatus::Failed: return "Failed";
0847 case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0848 case JExecutionEngine::RunStatus::Draining: return "Draining";
0849 case JExecutionEngine::RunStatus::Finished: return "Finished";
0850 default: return "CorruptedRunStatus";
0851 }
0852 }
0853