File indexing completed on 2025-01-18 10:17:34
0001
#include "JExecutionEngine.h"
#include <JANA/Utils/JApplicationInspector.h>

#include <cassert>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <exception>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
0017
0018 thread_local int jana2_worker_id = -1;
0019 thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020
0021 void JExecutionEngine::Init() {
0022 auto params = GetApplication()->GetJParameterManager();
0023
0024 params->SetDefaultParameter("jana:timeout", m_timeout_s,
0025 "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026
0027 params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s,
0028 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029
0030 params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms,
0031 "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032
0033 params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034
0035 params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036
0037 auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038 "Filename of named pipe for retrieving instantaneous status info");
0039
0040 size_t pid = getpid();
0041 mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042
0043 LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044 LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045 LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046
0047 if (p->IsDefault()) {
0048 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049 }
0050 else {
0051 LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052 }
0053
0054
0055
0056
0057
0058 for (JArrow* arrow : m_topology->arrows) {
0059
0060 arrow->initialize();
0061
0062 m_arrow_states.emplace_back();
0063 auto& arrow_state = m_arrow_states.back();
0064 arrow_state.is_source = arrow->is_source();
0065 arrow_state.is_sink = arrow->is_sink();
0066 arrow_state.is_parallel = arrow->is_parallel();
0067 arrow_state.next_input = arrow->get_next_port_index();
0068 }
0069 }
0070
0071
0072 void JExecutionEngine::RunTopology() {
0073 std::unique_lock<std::mutex> lock(m_mutex);
0074
0075 if (m_runstatus == RunStatus::Failed) {
0076 throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0077 }
0078 if (m_runstatus == RunStatus::Finished) {
0079 throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0080 }
0081
0082
0083 m_time_at_start = clock_t::now();
0084 m_event_count_at_start = m_event_count_at_finish;
0085
0086
0087 for (auto& arrow: m_arrow_states) {
0088 if (arrow.status == ArrowState::Status::Paused) {
0089 arrow.status = ArrowState::Status::Running;
0090 }
0091 }
0092
0093 m_runstatus = RunStatus::Running;
0094
0095 lock.unlock();
0096 m_condvar.notify_one();
0097 }
0098
0099 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0100
0101
0102
0103
0104
0105
0106
0107
0108 std::unique_lock<std::mutex> lock(m_mutex);
0109
0110 auto prev_nthreads = m_worker_states.size();
0111
0112 if (prev_nthreads < nthreads) {
0113
0114 LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0115 for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0116 auto worker = std::make_unique<WorkerState>();
0117 worker->worker_id = worker_id;
0118 worker->is_stop_requested = false;
0119 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0120 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0121 worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0122 LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0123 m_worker_states.push_back(std::move(worker));
0124
0125 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0126 if (pin_to_cpu) {
0127 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0128 }
0129 }
0130 }
0131
0132 else if (prev_nthreads > nthreads) {
0133
0134 LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0135
0136
0137 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0138 LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0139 m_worker_states[worker_id]->is_stop_requested = true;
0140 }
0141 m_condvar.notify_all();
0142 lock.unlock();
0143
0144
0145 for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0146 if (m_worker_states[worker_id]->thread != nullptr) {
0147 if (m_worker_states[worker_id]->is_timed_out) {
0148
0149
0150
0151 m_worker_states[worker_id]->thread->detach();
0152 LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0153 }
0154 else {
0155 LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0156 m_worker_states[worker_id]->thread->join();
0157 LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0158 }
0159 }
0160 else {
0161 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0162 }
0163 }
0164
0165 lock.lock();
0166
0167 for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0168 if (m_worker_states.back()->thread != nullptr) {
0169 delete m_worker_states.back()->thread;
0170 }
0171 m_worker_states.pop_back();
0172 }
0173 }
0174 }
0175
0176 void JExecutionEngine::PauseTopology() {
0177 std::unique_lock<std::mutex> lock(m_mutex);
0178 if (m_runstatus != RunStatus::Running) return;
0179 m_runstatus = RunStatus::Pausing;
0180 for (auto& arrow: m_arrow_states) {
0181 if (arrow.status == ArrowState::Status::Running) {
0182 arrow.status = ArrowState::Status::Paused;
0183 }
0184 }
0185 LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0186 lock.unlock();
0187 m_condvar.notify_all();
0188 }
0189
0190 void JExecutionEngine::DrainTopology() {
0191 std::unique_lock<std::mutex> lock(m_mutex);
0192 if (m_runstatus != RunStatus::Running) return;
0193 m_runstatus = RunStatus::Draining;
0194 for (auto& arrow: m_arrow_states) {
0195 if (arrow.is_source) {
0196 if (arrow.status == ArrowState::Status::Running) {
0197 arrow.status = ArrowState::Status::Paused;
0198 }
0199 }
0200 }
0201 LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0202 lock.unlock();
0203 m_condvar.notify_all();
0204 }
0205
0206 void JExecutionEngine::RunSupervisor() {
0207
0208 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0209 size_t last_event_count = 0;
0210 clock_t::time_point last_measurement_time = clock_t::now();
0211
0212 Perf perf;
0213 while (true) {
0214
0215 if (m_enable_timeout && m_timeout_s > 0) {
0216 CheckTimeout();
0217 }
0218
0219 if (m_print_worker_report_requested) {
0220 PrintWorkerReport(false);
0221 m_print_worker_report_requested = false;
0222 }
0223
0224 if (m_send_worker_report_requested) {
0225 PrintWorkerReport(true);
0226 m_send_worker_report_requested = false;
0227 }
0228
0229 perf = GetPerf();
0230 if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) ||
0231 perf.runstatus == RunStatus::Finished ||
0232 perf.runstatus == RunStatus::Failed) {
0233 break;
0234 }
0235
0236 if (m_interrupt_status == InterruptStatus::InspectRequested) {
0237 if (perf.runstatus == RunStatus::Paused) {
0238 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0239 m_enable_timeout = false;
0240 m_interrupt_status = InterruptStatus::InspectInProgress;
0241 InspectApplication(GetApplication());
0242 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0243
0244
0245 last_measurement_time = clock_t::now();
0246 last_event_count = 0;
0247 continue;
0248 }
0249 else if (perf.runstatus == RunStatus::Running) {
0250 PauseTopology();
0251 }
0252 }
0253 else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0254 PauseTopology();
0255 }
0256
0257 if (m_show_ticker) {
0258 auto next_measurement_time = clock_t::now();
0259 auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0260 float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0261 last_measurement_time = next_measurement_time;
0262 last_event_count = perf.event_count;
0263
0264
0265 LOG_WARN(m_logger) << "Status: " << perf.event_count << " events processed at "
0266 << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0267 << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0268 }
0269
0270 std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0271 }
0272 LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0273
0274 if (perf.runstatus == RunStatus::Failed) {
0275 HandleFailures();
0276 }
0277
0278 PrintFinalReport();
0279 }
0280
0281 bool JExecutionEngine::CheckTimeout() {
0282 std::unique_lock<std::mutex> lock(m_mutex);
0283 auto now = clock_t::now();
0284 bool timeout_detected = false;
0285 for (auto& worker: m_worker_states) {
0286 auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0287 auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0288 if (duration_s > timeout_s) {
0289 worker->is_timed_out = true;
0290 timeout_detected = true;
0291 m_runstatus = RunStatus::Failed;
0292 }
0293 }
0294 return timeout_detected;
0295 }
0296
0297 void JExecutionEngine::HandleFailures() {
0298
0299 std::unique_lock<std::mutex> lock(m_mutex);
0300
0301
0302 for (auto& worker: m_worker_states) {
0303 const auto& arrow_name = m_topology->arrows[worker->last_arrow_id]->get_name();
0304 if (worker->is_timed_out) {
0305 LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0306 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0307 LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0308 worker->backtrace.WaitForCapture();
0309 }
0310 if (worker->stored_exception != nullptr) {
0311 LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0312 }
0313 }
0314
0315
0316
0317 for (auto& worker: m_worker_states) {
0318 if (worker->stored_exception != nullptr) {
0319 GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0320 std::rethrow_exception(worker->stored_exception);
0321 }
0322 if (worker->is_timed_out) {
0323 GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0324 auto ex = JException("Timeout in worker thread");
0325 ex.backtrace = worker->backtrace;
0326 throw ex;
0327 }
0328 }
0329 }
0330
0331 void JExecutionEngine::FinishTopology() {
0332 std::unique_lock<std::mutex> lock(m_mutex);
0333 assert(m_runstatus == RunStatus::Paused);
0334
0335 LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0336 for (auto* arrow : m_topology->arrows) {
0337 arrow->finalize();
0338 }
0339 for (auto* pool: m_topology->pools) {
0340 pool->Finalize();
0341 }
0342 m_runstatus = RunStatus::Finished;
0343 LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0344 }
0345
0346 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0347 std::unique_lock<std::mutex> lock(m_mutex);
0348 return m_runstatus;
0349 }
0350
0351 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0352 std::unique_lock<std::mutex> lock(m_mutex);
0353 Perf result;
0354 if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0355 result.event_count = m_event_count_at_finish - m_event_count_at_start;
0356 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0357 }
0358 else {
0359
0360 size_t current_event_count = 0;
0361 for (auto& state : m_arrow_states) {
0362 if (state.is_sink) {
0363 current_event_count += state.events_processed;
0364 }
0365 }
0366 result.event_count = current_event_count - m_event_count_at_start;
0367 result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0368 }
0369 result.runstatus = m_runstatus;
0370 result.thread_count = m_worker_states.size();
0371 result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0372 result.event_level = JEventLevel::PhysicsEvent;
0373 return result;
0374 }
0375
0376 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0377 std::unique_lock<std::mutex> lock(m_mutex);
0378 auto worker_id = m_worker_states.size();
0379 auto worker = std::make_unique<WorkerState>();
0380 worker->worker_id = worker_id;
0381 worker->is_stop_requested = false;
0382 worker->cpu_id = m_topology->mapping.get_cpu_id(worker_id);
0383 worker->location_id = m_topology->mapping.get_loc_id(worker_id);
0384 worker->thread = nullptr;
0385 m_worker_states.push_back(std::move(worker));
0386
0387 bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0388 if (pin_to_cpu) {
0389 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0390 }
0391 return {worker_id, &worker->backtrace};
0392
0393 }
0394
0395
0396 void JExecutionEngine::RunWorker(Worker worker) {
0397
0398 LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0399 jana2_worker_id = worker.worker_id;
0400 jana2_worker_backtrace = worker.backtrace;
0401 try {
0402 Task task;
0403 while (true) {
0404 ExchangeTask(task, worker.worker_id);
0405 if (task.arrow == nullptr) break;
0406 task.arrow->fire(task.input_event, task.outputs, task.output_count, task.status);
0407 }
0408 LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0409 }
0410 catch (...) {
0411 LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0412 std::unique_lock<std::mutex> lock(m_mutex);
0413 m_runstatus = RunStatus::Failed;
0414 m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0415 }
0416 }
0417
0418
0419 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0420
0421 auto checkin_time = std::chrono::steady_clock::now();
0422
0423
0424
0425 std::unique_lock<std::mutex> lock(m_mutex);
0426
0427 auto& worker = *m_worker_states.at(worker_id);
0428
0429 if (task.arrow != nullptr) {
0430 CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0431 }
0432
0433 if (worker.is_stop_requested) {
0434 return;
0435 }
0436
0437 FindNextReadyTask_Unsafe(task, worker);
0438
0439 if (nonblocking) { return; }
0440 auto idle_time_start = clock_t::now();
0441 m_total_scheduler_duration += (idle_time_start - checkin_time);
0442
0443 while (task.arrow == nullptr && !worker.is_stop_requested) {
0444 m_condvar.wait(lock);
0445 FindNextReadyTask_Unsafe(task, worker);
0446 }
0447 worker.last_checkout_time = clock_t::now();
0448
0449 if (task.input_event != nullptr) {
0450 worker.last_event_nr = task.input_event->GetEventNumber();
0451 }
0452 else {
0453 worker.last_event_nr = 0;
0454 }
0455 m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0456
0457
0458
0459
0460 m_condvar.notify_one();
0461 }
0462
0463
0464 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0465
0466 auto processing_duration = checkin_time - worker.last_checkout_time;
0467
0468 ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0469
0470 arrow_state.active_tasks -= 1;
0471 arrow_state.total_processing_duration += processing_duration;
0472
0473 for (size_t output=0; output<task.output_count; ++output) {
0474 if (!task.arrow->get_port(task.outputs[output].second).is_input) {
0475 arrow_state.events_processed++;
0476 }
0477 }
0478
0479
0480 task.arrow->push(task.outputs, task.output_count, worker.location_id);
0481
0482 if (task.status == JArrow::FireResult::Finished) {
0483
0484
0485
0486
0487 arrow_state.status = ArrowState::Status::Finished;
0488
0489
0490 if (m_runstatus == RunStatus::Running) {
0491 bool draining = true;
0492 for (auto& arrow: m_arrow_states) {
0493 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0494 draining = false;
0495 }
0496 }
0497 if (draining) {
0498 m_runstatus = RunStatus::Draining;
0499 }
0500 }
0501 }
0502 worker.last_arrow_id = -1;
0503 worker.last_event_nr = 0;
0504
0505 task.arrow = nullptr;
0506 task.input_event = nullptr;
0507 task.output_count = 0;
0508 task.status = JArrow::FireResult::NotRunYet;
0509 };
0510
0511
0512 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0513
0514 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0515
0516
0517
0518 size_t arrow_count = m_arrow_states.size();
0519 m_next_arrow_id += 1;
0520 m_next_arrow_id %= arrow_count;
0521
0522 for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0523 size_t arrow_id = i % arrow_count;
0524
0525 auto& state = m_arrow_states[arrow_id];
0526 if (!state.is_parallel && (state.active_tasks != 0)) {
0527
0528 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0529 continue;
0530 }
0531
0532 if (state.status != ArrowState::Status::Running) {
0533 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0534 continue;
0535 }
0536
0537
0538
0539 JArrow* arrow = m_topology->arrows[arrow_id];
0540
0541 auto port = arrow->get_next_port_index();
0542 JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
0543 if (event != nullptr || port == -1) {
0544 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0545
0546 state.active_tasks += 1;
0547
0548 task.arrow = arrow;
0549 task.input_port = port;
0550 task.input_event = event;
0551 task.output_count = 0;
0552 task.status = JArrow::FireResult::NotRunYet;
0553
0554 worker.last_arrow_id = arrow_id;
0555 if (event != nullptr) {
0556 worker.is_event_warmed_up = event->IsWarmedUp();
0557 worker.last_event_nr = event->GetEventNumber();
0558 }
0559 else {
0560 worker.is_event_warmed_up = true;
0561 worker.last_event_nr = 0;
0562 }
0563 return;
0564 }
0565 else {
0566 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0567 }
0568 }
0569 }
0570
0571
0572
0573
0574
0575
0576 if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0577
0578
0579
0580 bool any_active_source_found = false;
0581 bool any_active_task_found = false;
0582
0583 LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0584
0585 for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0586 auto& state = m_arrow_states[arrow_id];
0587 any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0588 any_active_task_found |= (state.active_tasks != 0);
0589
0590 }
0591
0592 if (!any_active_source_found && !any_active_task_found) {
0593
0594 m_time_at_finish = clock_t::now();
0595 m_event_count_at_finish = 0;
0596 for (auto& arrow_state : m_arrow_states) {
0597 if (arrow_state.is_sink) {
0598 m_event_count_at_finish += arrow_state.events_processed;
0599 }
0600 }
0601 LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0602 m_runstatus = RunStatus::Paused;
0603
0604 }
0605 }
0606
0607 worker.last_arrow_id = -1;
0608
0609 task.arrow = nullptr;
0610 task.input_port = -1;
0611 task.input_event = nullptr;
0612 task.output_count = 0;
0613 task.status = JArrow::FireResult::NotRunYet;
0614 }
0615
0616
0617 void JExecutionEngine::PrintFinalReport() {
0618
0619 std::unique_lock<std::mutex> lock(m_mutex);
0620 auto event_count = m_event_count_at_finish - m_event_count_at_start;
0621 auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0622 auto thread_count = m_worker_states.size();
0623 auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0624
0625 LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0626 LOG_INFO(GetLogger()) << LOG_END;
0627 LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << throughput_hz << LOG_END;
0628 LOG_INFO(GetLogger()) << " Completed events [count]: " << event_count << LOG_END;
0629 LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0630 LOG_INFO(GetLogger()) << " Thread team size [count]: " << thread_count << LOG_END;
0631 LOG_INFO(GetLogger()) << LOG_END;
0632 LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END;
0633 LOG_INFO(GetLogger()) << LOG_END;
0634
0635 size_t total_useful_ms = 0;
0636
0637 for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0638 auto* arrow = m_topology->arrows[arrow_id];
0639 auto& arrow_state = m_arrow_states[arrow_id];
0640 auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0641 total_useful_ms += useful_ms;
0642 auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0643 auto throughput_bottleneck = 1000.0 / avg_latency;
0644 if (arrow->is_parallel()) {
0645 throughput_bottleneck *= thread_count;
0646 }
0647
0648 LOG_INFO(GetLogger()) << " - Arrow name: " << arrow->get_name() << LOG_END;
0649 LOG_INFO(GetLogger()) << " Parallel: " << arrow->is_parallel() << LOG_END;
0650 LOG_INFO(GetLogger()) << " Events completed: " << arrow_state.events_processed << LOG_END;
0651 LOG_INFO(GetLogger()) << " Avg latency [ms/event]: " << avg_latency << LOG_END;
0652 LOG_INFO(GetLogger()) << " Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0653 LOG_INFO(GetLogger()) << LOG_END;
0654 }
0655
0656 auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0657 auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0658
0659 LOG_INFO(GetLogger()) << " Total useful time [s]: " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0660 LOG_INFO(GetLogger()) << " Total scheduler time [s]: " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0661 LOG_INFO(GetLogger()) << " Total idle time [s]: " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0662
0663 LOG_INFO(GetLogger()) << LOG_END;
0664
0665 LOG_WARN(GetLogger()) << "Final report: " << event_count << " events processed at "
0666 << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0667
0668 }
0669
0670 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0671 m_show_ticker = show_ticker;
0672 }
0673
0674 bool JExecutionEngine::IsTickerEnabled() const {
0675 return m_show_ticker;
0676 }
0677
0678 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0679 m_enable_timeout = timeout_enabled;
0680 }
0681
0682 bool JExecutionEngine::IsTimeoutEnabled() const {
0683 return m_enable_timeout;
0684 }
0685
0686 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0687
0688 std::unique_lock<std::mutex> lock(m_mutex);
0689 if (arrow_id >= m_topology->arrows.size()) {
0690 LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0691 return JArrow::FireResult::NotRunYet;
0692 }
0693 JArrow* arrow = m_topology->arrows[arrow_id];
0694 LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
0695 << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0696
0697 ArrowState& arrow_state = m_arrow_states[arrow_id];
0698 if (arrow_state.status == ArrowState::Status::Finished) {
0699 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0700 return JArrow::FireResult::Finished;
0701 }
0702 if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0703 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0704 return JArrow::FireResult::NotRunYet;
0705 }
0706 arrow_state.active_tasks += 1;
0707
0708 auto port = arrow->get_next_port_index();
0709 JEvent* event = nullptr;
0710 if (port != -1) {
0711 event = arrow->pull(port, location_id);
0712 if (event == nullptr) {
0713 LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0714 arrow_state.active_tasks -= 1;
0715 return JArrow::FireResult::NotRunYet;
0716 }
0717 else {
0718 LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0719 }
0720 }
0721 else {
0722 LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0723 }
0724 lock.unlock();
0725
0726 size_t output_count;
0727 JArrow::OutputData outputs;
0728 JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0729
0730 LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0731 arrow->fire(event, outputs, output_count, result);
0732 LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
0733 if (output_count == 0) {
0734 LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0735 }
0736 else {
0737 for (size_t i=0; i<output_count; ++i) {
0738 LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0739 }
0740 }
0741
0742 lock.lock();
0743 arrow->push(outputs, output_count, location_id);
0744 arrow_state.active_tasks -= 1;
0745 lock.unlock();
0746 return result;
0747 }
0748
0749
0750 void JExecutionEngine::HandleSIGINT() {
0751 InterruptStatus status = m_interrupt_status;
0752 std::cout << std::endl;
0753 switch (status) {
0754 case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0755 case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0756 case InterruptStatus::NoInterruptsUnsupervised:
0757 case InterruptStatus::PauseAndQuit:
0758 case InterruptStatus::InspectInProgress:
0759 _exit(-2);
0760 }
0761 }
0762
0763 void JExecutionEngine::HandleSIGUSR1() {
0764 m_send_worker_report_requested = true;
0765 }
0766
0767 void JExecutionEngine::HandleSIGUSR2() {
0768 if (jana2_worker_backtrace != nullptr) {
0769 jana2_worker_backtrace->Capture(3);
0770 }
0771 }
0772
0773 void JExecutionEngine::HandleSIGTSTP() {
0774 std::cout << std::endl;
0775 m_print_worker_report_requested = true;
0776 }
0777
0778 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0779
0780 std::unique_lock<std::mutex> lock(m_mutex);
0781 LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0782 for (auto& worker: m_worker_states) {
0783 worker->backtrace.Reset();
0784 pthread_kill(worker->thread->native_handle(), SIGUSR2);
0785 }
0786 for (auto& worker: m_worker_states) {
0787 worker->backtrace.WaitForCapture();
0788 }
0789 std::ostringstream oss;
0790 oss << "Worker report" << std::endl;
0791 for (auto& worker: m_worker_states) {
0792 oss << "------------------------------" << std::endl
0793 << " Worker: " << worker->worker_id << std::endl
0794 << " Current arrow: " << worker->last_arrow_id << std::endl
0795 << " Current event: " << worker->last_event_nr << std::endl
0796 << " Backtrace:" << std::endl << std::endl
0797 << worker->backtrace.ToString();
0798 }
0799 auto s = oss.str();
0800 LOG_WARN(GetLogger()) << s << LOG_END;
0801
0802 if (send_to_pipe) {
0803
0804 int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0805 if (fd >= 0) {
0806 write(fd, s.c_str(), s.length()+1);
0807 close(fd);
0808 }
0809 else {
0810 LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0811 << " You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0812 << " The status report will still show up in the log." << LOG_END;
0813 }
0814 }
0815 }
0816
0817
0818 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0819 switch(runstatus) {
0820 case JExecutionEngine::RunStatus::Running: return "Running";
0821 case JExecutionEngine::RunStatus::Paused: return "Paused";
0822 case JExecutionEngine::RunStatus::Failed: return "Failed";
0823 case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0824 case JExecutionEngine::RunStatus::Draining: return "Draining";
0825 case JExecutionEngine::RunStatus::Finished: return "Finished";
0826 default: return "CorruptedRunStatus";
0827 }
0828 }
0829