Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-05-27 07:36:29

0001 
0002 #include "JExecutionEngine.h"
0003 #include <JANA/Utils/JApplicationInspector.h>
0004 
0005 #include <chrono>
0006 #include <cstddef>
0007 #include <cstdio>
0008 #include <ctime>
0009 #include <exception>
0010 #include <mutex>
0011 #include <csignal>
0012 #include <ostream>
0013 #include <sstream>
0014 #include <sys/stat.h>
0015 #include <fcntl.h>
0016 #include <unistd.h>
0017 
0018 thread_local int jana2_worker_id = -1;
0019 thread_local JBacktrace* jana2_worker_backtrace = nullptr;
0020 
0021 void JExecutionEngine::Init() {
0022     auto params = GetApplication()->GetJParameterManager();
0023 
0024     params->SetDefaultParameter("jana:timeout", m_timeout_s, 
0025         "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
0026 
0027     params->SetDefaultParameter("jana:warmup_timeout", m_warmup_timeout_s, 
0028         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0029 
0030     params->SetDefaultParameter("jana:backoff_interval", m_backoff_ms, 
0031         "Max time (in seconds) JANA will wait for 'initial' events to complete before hard-exiting.");
0032 
0033     params->SetDefaultParameter("jana:show_ticker", m_show_ticker, "Controls whether the ticker is visible");
0034 
0035     params->SetDefaultParameter("jana:ticker_interval", m_ticker_ms, "Controls the ticker interval (in ms)");
0036 
0037     auto p = params->SetDefaultParameter("jana:status_fname", m_path_to_named_pipe,
0038         "Filename of named pipe for retrieving instantaneous status info");
0039 
0040     size_t pid = getpid();
0041     mkfifo(m_path_to_named_pipe.c_str(), 0666);
0042 
0043     LOG_WARN(GetLogger()) << "To pause processing and inspect, press Ctrl-C." << LOG_END;
0044     LOG_WARN(GetLogger()) << "For a clean shutdown, press Ctrl-C twice." << LOG_END;
0045     LOG_WARN(GetLogger()) << "For a hard shutdown, press Ctrl-C three times." << LOG_END;
0046 
0047     if (p->IsDefault()) {
0048         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << "`" << LOG_END;
0049     }
0050     else {
0051         LOG_WARN(GetLogger()) << "For worker status information, press Ctrl-Z, or run `jana-status " << pid << " " << m_path_to_named_pipe << "`" << LOG_END;
0052     }
0053 
0054 
0055     // Not sure how I feel about putting this here yet, but I think it will at least work in both cases it needs to.
0056     // The reason this works is because JTopologyBuilder::create_topology() has already been called before 
0057     // JApplication::ProvideService<JExecutionEngine>().
0058     for (JArrow* arrow : m_topology->GetArrows()) {
0059 
0060         arrow->Initialize();
0061 
0062         m_arrow_states.emplace_back();
0063         auto& arrow_state = m_arrow_states.back();
0064         arrow_state.is_source = arrow->IsSource();
0065         arrow_state.is_sink = arrow->IsSink();
0066         arrow_state.is_parallel = arrow->IsParallel();
0067         arrow_state.next_input = arrow->GetNextPortIndex();
0068     }
0069 }
0070 
0071 void JExecutionEngine::RequestInspector() {
0072     std::unique_lock<std::mutex> lock(m_mutex);
0073     m_interrupt_status = InterruptStatus::InspectRequested;
0074 }
0075 
0076 void JExecutionEngine::RunTopology() {
0077     std::unique_lock<std::mutex> lock(m_mutex);
0078 
0079     if (m_runstatus == RunStatus::Failed) {
0080         throw JException("Cannot switch topology runstatus to Running because it is already Failed");
0081     }
0082     if (m_runstatus == RunStatus::Finished) {
0083         throw JException("Cannot switch topology runstatus to Running because it is already Finished");
0084     }
0085     if (m_arrow_states.size() == 0) {
0086         throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0087     }
0088 
0089     // Set start time and event count
0090     m_time_at_start = clock_t::now();
0091     m_event_count_at_start = m_event_count_at_finish;
0092 
0093     // Reactivate topology
0094     for (auto& arrow: m_arrow_states) {
0095         if (arrow.status == ArrowState::Status::Paused) {
0096             arrow.status = ArrowState::Status::Running;
0097         }
0098     }
0099 
0100     m_runstatus = RunStatus::Running;
0101 
0102     lock.unlock();
0103     m_condvar.notify_one();
0104 }
0105 
0106 void JExecutionEngine::ScaleWorkers(size_t nthreads) {
0107     // We both create and destroy the pool of workers here. They all sleep until they 
0108     // receive work from the scheduler, which won't happen until the runstatus <- {Running, 
0109     // Pausing, Draining} and there is a task ready to execute. This way worker creation/destruction
0110     // is decoupled from topology execution.
0111 
0112     // If we scale to zero, no workers will run. This is useful for testing, and also for using
0113     // an external thread team, should the need arise.
0114 
0115     std::unique_lock<std::mutex> lock(m_mutex);
0116 
0117     if (nthreads != 0 && m_arrow_states.size() == 0) {
0118         // We check that (nthreads != 0) because this gets called at shutdown even if the topology wasn't run
0119         // Remember, we want JApplication::Initialize() to succeed and JMain to shut down cleanly even when the topology is empty
0120         throw JException("Cannot execute an empty topology! Hint: Have you provided an event source?");
0121     }
0122 
0123     auto prev_nthreads = m_worker_states.size();
0124 
0125     if (prev_nthreads < nthreads) {
0126         // We are launching additional worker threads
0127         LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
0128         auto mapping = m_topology->GetProcessorMapping();
0129         for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
0130             auto worker = std::make_unique<WorkerState>();
0131             worker->worker_id = worker_id;
0132             worker->is_stop_requested = false;
0133             worker->cpu_id = mapping.get_cpu_id(worker_id);
0134             worker->location_id = mapping.get_loc_id(worker_id);
0135             worker->thread = new std::thread(&JExecutionEngine::RunWorker, this, Worker{worker_id, &worker->backtrace});
0136             LOG_DEBUG(GetLogger()) << "Launching worker thread " << worker_id << " on cpu=" << worker->cpu_id << ", location=" << worker->location_id << LOG_END;
0137             m_worker_states.push_back(std::move(worker));
0138 
0139             bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0140             if (pin_to_cpu) {
0141                 JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0142             }
0143         }
0144     }
0145 
0146     else if (prev_nthreads > nthreads) {
0147         // We are destroying existing worker threads
0148         LOG_DEBUG(GetLogger()) << "Scaling down to " << nthreads << " worker threads" << LOG_END;
0149 
0150         // Signal to threads that they need to terminate.
0151         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0152             LOG_DEBUG(GetLogger()) << "Stopping worker " << worker_id << LOG_END;
0153             m_worker_states[worker_id]->is_stop_requested = true;
0154         }
0155         m_condvar.notify_all(); // Wake up all threads so that they can exit the condvar wait loop
0156         lock.unlock();
0157 
0158         // We join all (eligible) threads _outside_ of the mutex
0159         for (int worker_id=prev_nthreads-1; worker_id >= (int) nthreads; --worker_id) {
0160             if (m_worker_states[worker_id]->thread != nullptr) {
0161                 if (m_worker_states[worker_id]->is_timed_out) {
0162                     // Thread has timed out. Rather than non-cooperatively killing it,
0163                     // we relinquish ownership of it but remember that it was ours once and
0164                     // is still out there, somewhere, biding its time
0165                     m_worker_states[worker_id]->thread->detach();
0166                     LOG_DEBUG(GetLogger()) << "Detached worker " << worker_id << LOG_END;
0167                 }
0168                 else {
0169                     LOG_DEBUG(GetLogger()) << "Joining worker " << worker_id << LOG_END;
0170                     m_worker_states[worker_id]->thread->join();
0171                     LOG_DEBUG(GetLogger()) << "Joined worker " << worker_id << LOG_END;
0172                 }
0173             }
0174             else {
0175                 LOG_DEBUG(GetLogger()) << "Skipping worker " << worker_id << LOG_END;
0176             }
0177         }
0178 
0179         lock.lock();
0180         // We retake the mutex so we can safely modify m_worker_states
0181         for (int worker_id=prev_nthreads-1; worker_id >= (int)nthreads; --worker_id) {
0182             if (m_worker_states.back()->thread != nullptr) {
0183                 delete m_worker_states.back()->thread;
0184             }
0185             m_worker_states.pop_back();
0186         }
0187     }
0188 }
0189 
0190 void JExecutionEngine::PauseTopology() {
0191     std::unique_lock<std::mutex> lock(m_mutex);
0192     if (m_runstatus != RunStatus::Running) return;
0193     m_runstatus = RunStatus::Pausing;
0194     for (auto& arrow: m_arrow_states) {
0195         if (arrow.status == ArrowState::Status::Running) {
0196             arrow.status = ArrowState::Status::Paused;
0197         }
0198     }
0199     LOG_WARN(GetLogger()) << "Requested pause" << LOG_END;
0200     lock.unlock();
0201     m_condvar.notify_all();
0202 }
0203 
0204 void JExecutionEngine::DrainTopology() {
0205     std::unique_lock<std::mutex> lock(m_mutex);
0206     if (m_runstatus != RunStatus::Running) return;
0207     m_runstatus = RunStatus::Draining;
0208     for (auto& arrow: m_arrow_states) {
0209         if (arrow.is_source) {
0210             if (arrow.status == ArrowState::Status::Running) {
0211                 arrow.status = ArrowState::Status::Paused;
0212             }
0213         }
0214     }
0215     LOG_WARN(GetLogger()) << "Requested drain" << LOG_END;
0216     lock.unlock();
0217     m_condvar.notify_all();
0218 }
0219 
0220 void JExecutionEngine::RunSupervisor() {
0221 
0222     if (m_interrupt_status == InterruptStatus::NoInterruptsUnsupervised) {
0223         m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0224     }
0225     size_t last_event_count = 0;
0226     clock_t::time_point last_measurement_time = clock_t::now();
0227 
0228     Perf perf;
0229     while (true) {
0230 
0231         if (m_enable_timeout && m_timeout_s > 0) {
0232             CheckTimeout();
0233         }
0234 
0235         if (m_print_worker_report_requested) {
0236             PrintWorkerReport(false);
0237             m_print_worker_report_requested = false;
0238         }
0239 
0240         if (m_send_worker_report_requested) {
0241             PrintWorkerReport(true);
0242             m_send_worker_report_requested = false;
0243         }
0244 
0245         perf = GetPerf();
0246         if ((perf.runstatus == RunStatus::Paused && m_interrupt_status != InterruptStatus::InspectRequested) || 
0247             perf.runstatus == RunStatus::Finished || 
0248             perf.runstatus == RunStatus::Failed) {
0249             break;
0250         }
0251 
0252         if (m_interrupt_status == InterruptStatus::InspectRequested) {
0253             if (perf.runstatus == RunStatus::Paused) {
0254                 LOG_INFO(GetLogger()) << "Entering inspector" << LOG_END;
0255                 m_enable_timeout = false;
0256                 m_interrupt_status = InterruptStatus::InspectInProgress;
0257                 InspectApplication(GetApplication());
0258                 m_interrupt_status = InterruptStatus::NoInterruptsSupervised;
0259 
0260                 // Jump back to the top of the loop so that we have fresh event count data
0261                 last_measurement_time = clock_t::now();
0262                 last_event_count = 0;
0263                 continue; 
0264             }
0265             else if (perf.runstatus == RunStatus::Running) {
0266                 PauseTopology();
0267             }
0268         }
0269         else if (m_interrupt_status == InterruptStatus::PauseAndQuit) {
0270             PauseTopology();
0271         }
0272 
0273         if (m_show_ticker) {
0274             auto next_measurement_time = clock_t::now();
0275             auto last_measurement_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_measurement_time - last_measurement_time).count();
0276             float latest_throughput_hz = (last_measurement_duration_ms == 0) ? 0 : (perf.event_count - last_event_count) * 1000.0 / last_measurement_duration_ms;
0277             last_measurement_time = next_measurement_time;
0278             last_event_count = perf.event_count;
0279 
0280             // Print rates
0281             LOG_INFO(m_logger) << "Status: " << perf.event_count << " events processed at "
0282                             << JTypeInfo::to_string_with_si_prefix(latest_throughput_hz) << "Hz ("
0283                             << JTypeInfo::to_string_with_si_prefix(perf.throughput_hz) << "Hz avg)" << LOG_END;
0284         }
0285 
0286         std::this_thread::sleep_for(std::chrono::milliseconds(m_ticker_ms));
0287     }
0288     LOG_INFO(GetLogger()) << "Processing paused." << LOG_END;
0289 
0290     if (perf.runstatus == RunStatus::Failed) {
0291         HandleFailures();
0292     }
0293 
0294     PrintFinalReport();
0295 }
0296 
0297 bool JExecutionEngine::CheckTimeout() {
0298     std::unique_lock<std::mutex> lock(m_mutex);
0299     auto now = clock_t::now();
0300     bool timeout_detected = false;
0301     for (auto& worker: m_worker_states) {
0302         auto timeout_s = (worker->is_event_warmed_up) ? m_timeout_s : m_warmup_timeout_s;
0303         auto duration_s = std::chrono::duration_cast<std::chrono::seconds>(now - worker->last_checkout_time).count();
0304         if (duration_s > timeout_s && worker->last_arrow_id != static_cast<uint64_t>(-1)) {
0305             worker->is_timed_out = true;
0306             timeout_detected = true;
0307             m_runstatus = RunStatus::Failed;
0308         }
0309     }
0310     return timeout_detected;
0311 }
0312 
0313 void JExecutionEngine::HandleFailures() {
0314 
0315     std::unique_lock<std::mutex> lock(m_mutex);
0316 
0317     // First, we log all of the failures we've found
0318     for (auto& worker: m_worker_states) {
0319         if (worker->is_timed_out) {
0320             std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
0321             LOG_FATAL(GetLogger()) << "Timeout in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0322             pthread_kill(worker->thread->native_handle(), SIGUSR2);
0323             LOG_INFO(GetLogger()) << "Worker thread signalled; waiting for backtrace capture." << LOG_END;
0324             worker->backtrace.WaitForCapture();
0325         }
0326         if (worker->stored_exception != nullptr) {
0327             std::string arrow_name = (worker->last_arrow_id == static_cast<uint64_t>(-1)) ? "(none)" : m_topology->GetArrows()[worker->last_arrow_id]->GetName();
0328             LOG_FATAL(GetLogger()) << "Exception in worker thread " << worker->worker_id << " while executing " << arrow_name << " on event #" << worker->last_event_nr << LOG_END;
0329         }
0330     }
0331 
0332     // Now we throw each of these exceptions in order, in case the caller is going to attempt to catch them.
0333     // In reality all callers are going to print everything they can about the exception and exit.
0334     for (auto& worker: m_worker_states) {
0335         if (worker->stored_exception != nullptr) {
0336             GetApplication()->SetExitCode((int) JApplication::ExitCode::UnhandledException);
0337             std::rethrow_exception(worker->stored_exception);
0338         }
0339         if (worker->is_timed_out) {
0340             GetApplication()->SetExitCode((int) JApplication::ExitCode::Timeout);
0341             auto ex = JException("Timeout in worker thread");
0342             ex.backtrace = worker->backtrace;
0343             throw ex;
0344         }
0345     }
0346 }
0347 
0348 void JExecutionEngine::FinishTopology() {
0349     std::unique_lock<std::mutex> lock(m_mutex);
0350     assert(m_runstatus == RunStatus::Paused);
0351 
0352     LOG_DEBUG(GetLogger()) << "Finishing processing..." << LOG_END;
0353     for (auto* arrow : m_topology->GetArrows()) {
0354         arrow->Finalize();
0355     }
0356     for (auto* pool: m_topology->GetPools()) {
0357         pool->Finalize();
0358     }
0359     m_runstatus = RunStatus::Finished;
0360     LOG_INFO(GetLogger()) << "Finished processing." << LOG_END;
0361 }
0362 
0363 JExecutionEngine::RunStatus JExecutionEngine::GetRunStatus() {
0364     std::unique_lock<std::mutex> lock(m_mutex);
0365     return m_runstatus;
0366 }
0367 
0368 JExecutionEngine::Perf JExecutionEngine::GetPerf() {
0369     std::unique_lock<std::mutex> lock(m_mutex);
0370     Perf result;
0371     if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
0372         result.event_count = m_event_count_at_finish - m_event_count_at_start;
0373         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0374     }
0375     else {
0376         // Obtain current event count
0377         size_t current_event_count = 0;
0378         for (auto& state : m_arrow_states) {
0379             if (state.is_sink) {
0380                 current_event_count += state.events_processed;
0381             }
0382         }
0383         result.event_count = current_event_count - m_event_count_at_start;
0384         result.uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(clock_t::now() - m_time_at_start).count();
0385     }
0386     result.runstatus = m_runstatus;
0387     result.thread_count = m_worker_states.size();
0388     result.throughput_hz = (result.uptime_ms == 0) ? 0 : (result.event_count * 1000.0) / result.uptime_ms;
0389     result.event_level = JEventLevel::PhysicsEvent;
0390     return result;
0391 }
0392 
0393 JExecutionEngine::Worker JExecutionEngine::RegisterWorker() {
0394     std::unique_lock<std::mutex> lock(m_mutex);
0395     auto mapping = m_topology->GetProcessorMapping();
0396     auto worker_id = m_worker_states.size();
0397     auto worker = std::make_unique<WorkerState>();
0398     worker->worker_id = worker_id;
0399     worker->is_stop_requested = false;
0400     worker->cpu_id = mapping.get_cpu_id(worker_id);
0401     worker->location_id = mapping.get_loc_id(worker_id);
0402     worker->thread = nullptr;
0403     m_worker_states.push_back(std::move(worker));
0404 
0405     bool pin_to_cpu = (mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
0406     if (pin_to_cpu) {
0407         JCpuInfo::PinThreadToCpu(worker->thread, worker->cpu_id);
0408     }
0409     return {worker_id, &worker->backtrace};
0410 
0411 }
0412 
0413 
0414 void JExecutionEngine::RunWorker(Worker worker) {
0415 
0416     LOG_DEBUG(GetLogger()) << "Launched worker thread " << worker.worker_id << LOG_END;
0417     jana2_worker_id = worker.worker_id;
0418     jana2_worker_backtrace = worker.backtrace;
0419     try {
0420         Task task;
0421         while (true) {
0422             ExchangeTask(task, worker.worker_id);
0423             if (task.arrow == nullptr) break; // Exit as soon as ExchangeTask() stops blocking
0424             task.arrow->Fire(task.input_event, task.outputs, task.output_count, task.status);
0425         }
0426         LOG_DEBUG(GetLogger()) << "Stopped worker thread " << worker.worker_id << LOG_END;
0427     }
0428     catch(JException& ex) {
0429         LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << ": " << ex.GetMessage();
0430         std::unique_lock<std::mutex> lock(m_mutex);
0431         m_runstatus = RunStatus::Failed;
0432         m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0433     }
0434     catch (...) {
0435         LOG_ERROR(GetLogger()) << "Exception on worker thread " << worker.worker_id << LOG_END;
0436         std::unique_lock<std::mutex> lock(m_mutex);
0437         m_runstatus = RunStatus::Failed;
0438         m_worker_states.at(worker.worker_id)->stored_exception = std::current_exception();
0439     }
0440 }
0441 
0442 
0443 void JExecutionEngine::ExchangeTask(Task& task, size_t worker_id, bool nonblocking) {
0444 
0445     auto checkin_time = std::chrono::steady_clock::now();
0446     // It's important to start measuring this _before_ acquiring the lock because acquiring the lock
0447     // may be a big part of the scheduler overhead
0448 
0449     std::unique_lock<std::mutex> lock(m_mutex);
0450     
0451     auto& worker = *m_worker_states.at(worker_id);
0452 
0453     if (task.arrow != nullptr) {
0454         CheckinCompletedTask_Unsafe(task, worker, checkin_time);
0455     }
0456 
0457     if (worker.is_stop_requested) {
0458         return;
0459     }
0460 
0461     FindNextReadyTask_Unsafe(task, worker);
0462 
0463     if (nonblocking) { return; }
0464     auto idle_time_start = clock_t::now();
0465     m_total_scheduler_duration += (idle_time_start - checkin_time);
0466 
0467     while (task.arrow == nullptr && !worker.is_stop_requested) {
0468         m_condvar.wait(lock);
0469         FindNextReadyTask_Unsafe(task, worker);
0470     }
0471     worker.last_checkout_time = clock_t::now();
0472 
0473     if (task.input_event != nullptr) {
0474         worker.last_event_nr = task.input_event->GetEventNumber();
0475     }
0476     else {
0477         worker.last_event_nr = 0;
0478     }
0479     m_total_idle_duration += (worker.last_checkout_time - idle_time_start);
0480 
0481     // Notify one worker, who will notify the next, etc, as long as FindNextReadyTaskUnsafe() succeeds.
0482     // After FindNextReadyTaskUnsafe fails, all threads block until the next returning worker reactivates the
0483     // notification chain.
0484     m_condvar.notify_one();
0485 }
0486 
0487 
0488 void JExecutionEngine::CheckinCompletedTask_Unsafe(Task& task, WorkerState& worker, clock_t::time_point checkin_time) {
0489 
0490     auto processing_duration = checkin_time - worker.last_checkout_time;
0491 
0492     ArrowState& arrow_state = m_arrow_states.at(worker.last_arrow_id);
0493 
0494     arrow_state.active_tasks -= 1;
0495     arrow_state.total_processing_duration += processing_duration;
0496 
0497     for (size_t output=0; output<task.output_count; ++output) {
0498         if (!task.arrow->GetPort(task.outputs[output].second).GetSkipFinishEvent()) {
0499             arrow_state.events_processed++;
0500         }
0501     }
0502 
0503     // Put each output in its correct queue or pool
0504     task.arrow->Push(task.outputs, task.output_count, worker.location_id);
0505 
0506     if (task.status == JArrow::FireResult::Finished) {
0507         // If this is an eventsource self-terminating (the only thing that returns Status::Finished right now) it will
0508         // have already called DoClose(). I'm tempted to always call DoClose() as part of JExecutionEngine::Finish() instead, however.
0509 
0510         // Mark arrow as finished
0511         arrow_state.status = ArrowState::Status::Finished;
0512 
0513         // Check if this switches the topology to Draining()
0514         if (m_runstatus == RunStatus::Running) {
0515             bool draining = true;
0516             for (auto& arrow: m_arrow_states) {
0517                 if (arrow.is_source && arrow.status == ArrowState::Status::Running) {
0518                     draining = false;
0519                 }
0520             }
0521             if (draining) {
0522                 m_runstatus = RunStatus::Draining;
0523             }
0524         }
0525     }
0526     worker.last_arrow_id = -1;
0527     worker.last_event_nr = 0;
0528 
0529     task.arrow = nullptr;
0530     task.input_event = nullptr;
0531     task.output_count = 0;
0532     task.status = JArrow::FireResult::NotRunYet;
0533 };
0534 
0535 
0536 void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) {
0537 
0538     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
0539         // We only pick up a new task if the topology is running or draining.
0540 
0541         // Each call to FindNextReadyTask_Unsafe() starts with a different m_next_arrow_id to ensure balanced arrow assignments
0542         size_t arrow_count = m_arrow_states.size();
0543         m_next_arrow_id += 1;
0544         m_next_arrow_id %= arrow_count;
0545 
0546         for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
0547             size_t arrow_id = i % arrow_count;
0548 
0549             auto& state = m_arrow_states[arrow_id];
0550             if (!state.is_parallel && (state.active_tasks != 0)) {
0551                 // We've found a sequential arrow that is already active. Nothing we can do here.
0552                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
0553                 continue;
0554             }
0555 
0556             if (state.status != ArrowState::Status::Running) {
0557                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
0558                 continue;
0559             }
0560             // TODO: Support next_visit_time so that we don't hammer blocked event sources
0561 
0562             // See if we can obtain an input event (this is silly)
0563             JArrow* arrow = m_topology->GetArrows()[arrow_id];
0564             // TODO: consider setting state.next_input, retrieving via Fire()
0565             auto port = arrow->GetNextPortIndex();
0566             JEvent* event = (port == -1) ? nullptr : arrow->Pull(port, worker.location_id);
0567             if (event != nullptr || port == -1) {
0568                 LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
0569                 // We've found a task that is ready!
0570                 state.active_tasks += 1;
0571 
0572                 task.arrow = arrow;
0573                 task.input_port = port;
0574                 task.input_event = event;
0575                 task.output_count = 0;
0576                 task.status = JArrow::FireResult::NotRunYet;
0577 
0578                 worker.last_arrow_id = arrow_id;
0579                 if (event != nullptr) {
0580                     worker.is_event_warmed_up = event->IsWarmedUp();
0581                     worker.last_event_nr = event->GetEventNumber();
0582                 }
0583                 else {
0584                     worker.is_event_warmed_up = true; // Use shorter timeout
0585                     worker.last_event_nr = 0;
0586                 }
0587                 return;
0588             }
0589             else {
0590                 LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
0591             }
0592         }
0593     }
0594 
0595     // Because we reached this point, we know that there aren't any tasks ready,
0596     // so we check whether more are potentially coming. If not, we can pause the topology.
0597     // Note that our worker threads will still wait at ExchangeTask() until they get
0598     // shut down separately during Scale().
0599     
0600     if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
0601         // We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
0602         // This also leaves a cleaner narrative in the logs. 
0603 
0604         bool any_active_source_found = false;
0605         bool any_active_task_found = false;
0606         
0607         LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
0608 
0609         for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0610             auto& state = m_arrow_states[arrow_id];
0611             any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
0612             any_active_task_found |= (state.active_tasks != 0);
0613             // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
0614         }
0615 
0616         if (!any_active_source_found && !any_active_task_found) {
0617             // Pause the topology
0618             m_time_at_finish = clock_t::now();
0619             m_event_count_at_finish = 0;
0620             for (auto& arrow_state : m_arrow_states) {
0621                 if (arrow_state.is_sink) {
0622                     m_event_count_at_finish += arrow_state.events_processed;
0623                 }
0624             }
0625             LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
0626             m_runstatus = RunStatus::Paused;
0627             // I think this is the ONLY site where the topology gets paused. Verify this?
0628         }
0629     }
0630 
0631     worker.last_arrow_id = -1;
0632 
0633     task.arrow = nullptr;
0634     task.input_port = -1;
0635     task.input_event = nullptr;
0636     task.output_count = 0;
0637     task.status = JArrow::FireResult::NotRunYet;
0638 }
0639 
0640 
0641 void JExecutionEngine::PrintFinalReport() {
0642 
0643     std::unique_lock<std::mutex> lock(m_mutex);
0644     auto event_count = m_event_count_at_finish - m_event_count_at_start;
0645     auto uptime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_time_at_finish - m_time_at_start).count();
0646     auto thread_count = m_worker_states.size();
0647     auto throughput_hz = (event_count * 1000.0) / uptime_ms;
0648 
0649     LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
0650     LOG_INFO(GetLogger()) << LOG_END;
0651     LOG_INFO(GetLogger()) << "  Avg throughput [Hz]:         " << std::setprecision(3) << throughput_hz << LOG_END;
0652     LOG_INFO(GetLogger()) << "  Completed events [count]:    " << event_count << LOG_END;
0653     LOG_INFO(GetLogger()) << "  Total uptime [s]:            " << std::setprecision(4) << uptime_ms/1000.0 << LOG_END;
0654     LOG_INFO(GetLogger()) << "  Thread team size [count]:    " << thread_count << LOG_END;
0655     LOG_INFO(GetLogger()) << LOG_END;
0656     LOG_INFO(GetLogger()) << "  Arrow-level metrics:" << LOG_END;
0657     LOG_INFO(GetLogger()) << LOG_END;
0658 
0659     size_t total_useful_ms = 0;
0660 
0661     for (size_t arrow_id=0; arrow_id < m_arrow_states.size(); ++arrow_id) {
0662         auto* arrow = m_topology->GetArrows()[arrow_id];
0663         auto& arrow_state = m_arrow_states[arrow_id];
0664         auto useful_ms = std::chrono::duration_cast<std::chrono::milliseconds>(arrow_state.total_processing_duration).count();
0665         total_useful_ms += useful_ms;
0666         auto avg_latency = useful_ms*1.0/arrow_state.events_processed;
0667         auto throughput_bottleneck = 1000.0 / avg_latency;
0668         if (arrow->IsParallel()) {
0669             throughput_bottleneck *= thread_count;
0670         }
0671 
0672         LOG_INFO(GetLogger()) << "  - Arrow name:                 " << arrow->GetName() << LOG_END;
0673         LOG_INFO(GetLogger()) << "    Parallel:                   " << arrow->IsParallel() << LOG_END;
0674         LOG_INFO(GetLogger()) << "    Events completed:           " << arrow_state.events_processed << LOG_END;
0675         LOG_INFO(GetLogger()) << "    Avg latency [ms/event]:     " << avg_latency << LOG_END;
0676         LOG_INFO(GetLogger()) << "    Throughput bottleneck [Hz]: " << throughput_bottleneck << LOG_END;
0677         LOG_INFO(GetLogger()) << LOG_END;
0678     }
0679 
0680     auto total_scheduler_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_scheduler_duration).count();
0681     auto total_idle_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_total_idle_duration).count();
0682 
0683     LOG_INFO(GetLogger()) << "  Total useful time [s]:     " << std::setprecision(6) << total_useful_ms/1000.0 << LOG_END;
0684     LOG_INFO(GetLogger()) << "  Total scheduler time [s]:  " << std::setprecision(6) << total_scheduler_ms/1000.0 << LOG_END;
0685     LOG_INFO(GetLogger()) << "  Total idle time [s]:       " << std::setprecision(6) << total_idle_ms/1000.0 << LOG_END;
0686 
0687     LOG_INFO(GetLogger()) << LOG_END;
0688 
0689     LOG_INFO(GetLogger()) << "Final report: " << event_count << " events processed at "
0690                           << JTypeInfo::to_string_with_si_prefix(throughput_hz) << "Hz" << LOG_END;
0691 
0692 }
0693 
0694 void JExecutionEngine::SetTickerEnabled(bool show_ticker) {
0695     m_show_ticker = show_ticker;
0696 }
0697 
0698 bool JExecutionEngine::IsTickerEnabled() const {
0699     return m_show_ticker;
0700 }
0701 
0702 void JExecutionEngine::SetTimeoutEnabled(bool timeout_enabled) {
0703     m_enable_timeout = timeout_enabled;
0704 }
0705 
0706 bool JExecutionEngine::IsTimeoutEnabled() const {
0707     return m_enable_timeout;
0708 }
0709 
0710 JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
0711 
0712     std::unique_lock<std::mutex> lock(m_mutex);
0713     if (arrow_id >= m_topology->GetArrows().size()) {
0714         LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
0715         return JArrow::FireResult::NotRunYet;
0716     }
0717     JArrow* arrow = m_topology->GetArrows()[arrow_id];
0718     LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->GetName() 
0719                           << ", index=" << arrow_id << ", location=" << location_id << LOG_END;
0720 
0721     ArrowState& arrow_state = m_arrow_states[arrow_id];
0722     if (arrow_state.status == ArrowState::Status::Finished) {
0723         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
0724         return JArrow::FireResult::Finished;
0725     }
0726     if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
0727         LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
0728         return JArrow::FireResult::NotRunYet;
0729     }
0730     arrow_state.active_tasks += 1;
0731 
0732     auto port = arrow->GetNextPortIndex();
0733     JEvent* event = nullptr;
0734     if (port != -1) {
0735         event = arrow->Pull(port, location_id);
0736         if (event == nullptr) {
0737             LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
0738             arrow_state.active_tasks -= 1;
0739             return JArrow::FireResult::NotRunYet;
0740         }
0741         else {
0742             LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
0743         }
0744     }
0745     else {
0746         LOG_WARN(GetLogger()) << "No input events" << LOG_END;
0747     }
0748     lock.unlock();
0749 
0750     size_t output_count;
0751     JArrow::OutputData outputs;
0752     JArrow::FireResult result = JArrow::FireResult::NotRunYet;
0753 
0754     LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
0755     arrow->Fire(event, outputs, output_count, result);
0756     LOG_WARN(GetLogger()) << "Fired arrow with result " << ToString(result) << LOG_END;
0757     if (output_count == 0) {
0758         LOG_WARN(GetLogger()) << "No output events" << LOG_END;
0759     }
0760     else {
0761         for (size_t i=0; i<output_count; ++i) {
0762             LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
0763         }
0764     }
0765 
0766     lock.lock();
0767     arrow->Push(outputs, output_count, location_id);
0768     arrow_state.active_tasks -= 1;
0769     lock.unlock();
0770     return result;
0771 }
0772 
0773 
0774 void JExecutionEngine::HandleSIGINT() {
0775     InterruptStatus status = m_interrupt_status;
0776     std::cout << std::endl;
0777     switch (status) {
0778         case InterruptStatus::NoInterruptsSupervised: m_interrupt_status = InterruptStatus::InspectRequested; break;
0779         case InterruptStatus::InspectRequested: m_interrupt_status = InterruptStatus::PauseAndQuit; break;
0780         case InterruptStatus::NoInterruptsUnsupervised:
0781         case InterruptStatus::PauseAndQuit:
0782         case InterruptStatus::InspectInProgress: 
0783             _exit(-2);
0784     }
0785 }
0786 
0787 void JExecutionEngine::HandleSIGUSR1() {
0788     m_send_worker_report_requested = true;
0789 }
0790 
0791 void JExecutionEngine::HandleSIGUSR2() {
0792     if (jana2_worker_backtrace != nullptr) {
0793         jana2_worker_backtrace->Capture(3);
0794     }
0795 }
0796 
0797 void JExecutionEngine::HandleSIGTSTP() {
0798     std::cout << std::endl;
0799     m_print_worker_report_requested = true;
0800 }
0801 
0802 void JExecutionEngine::PrintWorkerReport(bool send_to_pipe) {
0803 
0804     std::unique_lock<std::mutex> lock(m_mutex);
0805     LOG_INFO(GetLogger()) << "Generating worker report. It may take some time to retrieve each symbol's debug information." << LOG_END;
0806     for (auto& worker: m_worker_states) {
0807         worker->backtrace.Reset();
0808         pthread_kill(worker->thread->native_handle(), SIGUSR2);
0809     }
0810     for (auto& worker: m_worker_states) {
0811         worker->backtrace.WaitForCapture();
0812     }
0813     std::ostringstream oss;
0814     oss << "Worker report" << std::endl;
0815     for (auto& worker: m_worker_states) {
0816         oss << "------------------------------" << std::endl 
0817             << "  Worker:        " << worker->worker_id << std::endl
0818             << "  Current arrow: " << worker->last_arrow_id << std::endl
0819             << "  Current event: " << worker->last_event_nr << std::endl
0820             << "  Backtrace:" << std::endl << std::endl
0821             << worker->backtrace.ToString();
0822     }
0823     auto s = oss.str();
0824     LOG_WARN(GetLogger()) << s << LOG_END;
0825 
0826     if (send_to_pipe) {
0827 
0828         int fd = open(m_path_to_named_pipe.c_str(), O_WRONLY);
0829         if (fd >= 0) {
0830             write(fd, s.c_str(), s.length()+1);
0831             close(fd);
0832         }
0833         else {
0834             LOG_ERROR(GetLogger()) << "Unable to open named pipe '" << m_path_to_named_pipe << "' for writing. \n"
0835             << "  You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
0836             << "  The status report will still show up in the log." << LOG_END;
0837         }
0838     }
0839 }
0840 
0841 
0842 std::string ToString(JExecutionEngine::RunStatus runstatus) {
0843     switch(runstatus) {
0844         case JExecutionEngine::RunStatus::Running: return "Running";
0845         case JExecutionEngine::RunStatus::Paused: return "Paused";
0846         case JExecutionEngine::RunStatus::Failed: return "Failed";
0847         case JExecutionEngine::RunStatus::Pausing: return "Pausing";
0848         case JExecutionEngine::RunStatus::Draining: return "Draining";
0849         case JExecutionEngine::RunStatus::Finished: return "Finished";
0850         default: return "CorruptedRunStatus";
0851     }
0852 }
0853