Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:38

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JEventSource.h>
0007 #include <JANA/Engine/JExecutionEngine.h>
0008 #include <JANA/Services/JComponentManager.h>
0009 #include <JANA/Services/JGlobalRootLock.h>
0010 #include <JANA/Services/JParameterManager.h>
0011 #include <JANA/Services/JPluginLoader.h>
0012 #include <JANA/Topology/JTopologyBuilder.h>
0013 #include <JANA/Services/JWiringService.h>
0014 #include <JANA/Utils/JCpuInfo.h>
0015 #include <JANA/Utils/JApplicationInspector.h>
0016 
0017 #include <chrono>
0018 #include <sstream>
0019 #include <unistd.h>
0020 
// Global pointer to a JApplication instance. It starts out null and is not
// assigned anywhere in this translation unit.
// NOTE(review): presumably set by the program's entry point - confirm.
JApplication *japp = nullptr;
0022 
0023 
0024 JApplication::JApplication(JParameterManager* params) {
0025 
0026     // We set up some very essential services here, but note 
0027     // that they won't have been initialized yet. We need them now
0028     // so that they can passively receive components, plugin names, parameter values, etc.
0029     // These passive operations don't require any parameters, services, or
0030     // logging output, so they don't need to be initialized until later.
0031     // They will be fully initialized in JApplication::Initialize().
0032     // Only then they will be exposed to the user through the service locator.
0033 
0034     if (params == nullptr) {
0035         m_params = std::make_shared<JParameterManager>();
0036     }
0037     else {
0038         m_params = std::shared_ptr<JParameterManager>(params);
0039     }
0040     m_component_manager = std::make_shared<JComponentManager>();
0041     m_plugin_loader = std::make_shared<JPluginLoader>();
0042     m_service_locator = std::make_unique<JServiceLocator>();
0043     m_execution_engine = std::make_unique<JExecutionEngine>();
0044 
0045     ProvideService(m_params);
0046     ProvideService(m_component_manager);
0047     ProvideService(m_plugin_loader);
0048     ProvideService(m_execution_engine);
0049     ProvideService(std::make_shared<JGlobalRootLock>());
0050     ProvideService(std::make_shared<JTopologyBuilder>());
0051     ProvideService(std::make_shared<jana::services::JWiringService>());
0052 
0053 }
0054 
0055 
0056 JApplication::~JApplication() {}
0057 
0058 
0059 // Loading plugins
0060 
0061 void JApplication::AddPlugin(std::string plugin_name) {
0062     m_plugin_loader->add_plugin(plugin_name);
0063 }
0064 
0065 void JApplication::AddPluginPath(std::string path) {
0066     m_plugin_loader->add_plugin_path(path);
0067 }
0068 
0069 
0070 // Building a ProcessingTopology
0071 
0072 void JApplication::Add(JEventSource* event_source) {
0073     /// Adds the given JEventSource to the JANA context. Ownership is passed to JComponentManager.
0074     m_component_manager->add(event_source);
0075 }
0076 
0077 void JApplication::Add(JEventSourceGenerator *source_generator) {
0078     /// Adds the given JEventSourceGenerator to the JANA context. Ownership is passed to JComponentManager.
0079     m_component_manager->add(source_generator);
0080 }
0081 
0082 void JApplication::Add(JFactoryGenerator *factory_generator) {
0083     /// Adds the given JFactoryGenerator to the JANA context. Ownership is passed to JComponentManager.
0084     m_component_manager->add(factory_generator);
0085 }
0086 
0087 void JApplication::Add(JEventProcessor* processor) {
0088     /// Adds the given JEventProcessor to the JANA context. Ownership is passed to JComponentManager.
0089     m_component_manager->add(processor);
0090 }
0091 
0092 void JApplication::Add(std::string event_source_name) {
0093     /// Adds the event source name (e.g. a file or socket name) to the JANA context. JANA will instantiate
0094     /// the corresponding JEventSource using a user-provided JEventSourceGenerator.
0095     m_component_manager->add(event_source_name);
0096 }
0097 
0098 void JApplication::Add(JEventUnfolder* unfolder) {
0099     /// Adds the given JEventUnfolder to the JANA context. Ownership is passed to JComponentManager.
0100     m_component_manager->add(unfolder);
0101 }
0102 
0103 
0104 void JApplication::Initialize() {
0105 
0106     /// Initialize the application in preparation for data processing.
0107     /// This is called by the Run method so users will usually not need to call this directly.
0108 
0109     // Only run this once
0110     if (m_initialized) return;
0111 
0112     // Now that all parameters, components, plugin names, etc have been set, 
0113     // we can expose our builtin services to the user via GetService()
0114     m_services_available = true;
0115 
0116     // We trigger initialization 
0117     m_service_locator->get<JParameterManager>();
0118     auto component_manager = m_service_locator->get<JComponentManager>();
0119     auto plugin_loader = m_service_locator->get<JPluginLoader>();
0120     auto topology_builder = m_service_locator->get<JTopologyBuilder>();
0121 
0122     // Set logger on JApplication itself
0123     m_logger = m_params->GetLogger("jana");
0124 
0125     if (m_logger.level > JLogger::Level::INFO) {
0126         std::ostringstream oss;
0127         oss << "Initializing..." << std::endl << std::endl;
0128         JVersion::PrintVersionDescription(oss);
0129         LOG_WARN(m_logger) << oss.str() << LOG_END;
0130     }
0131     else {
0132         std::ostringstream oss;
0133         oss << "Initializing..." << std::endl;
0134         JVersion::PrintSplash(oss);
0135         JVersion::PrintVersionDescription(oss);
0136         LOG_WARN(m_logger) << oss.str() << LOG_END;
0137     }
0138 
0139     // Attach all plugins
0140     plugin_loader->attach_plugins(component_manager.get());
0141 
0142     // Resolve and initialize all components
0143     component_manager->configure_components();
0144 
0145     // Set desired nthreads. We parse the 'nthreads' parameter two different ways for backwards compatibility.
0146     m_desired_nthreads = 1;
0147     m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores.");
0148     if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0149         m_desired_nthreads = JCpuInfo::GetNumCpus();
0150     }
0151 
0152     topology_builder->create_topology();
0153     auto execution_engine = m_service_locator->get<JExecutionEngine>();
0154 
0155     // Make sure that Init() is called on any remaining JServices
0156     m_service_locator->wire_everything();
0157 
0158     m_initialized = true;
0159     // This needs to be at the end so that m_initialized==false while InitPlugin() is being called
0160 }
0161 
0162 /// @brief Run the application, launching 1 or more threads to do the work.
0163 ///
0164 /// This will initialize the application, attaching plugins etc. and launching
0165 /// threads to process events/time slices. This will then either return immediately
0166 /// (if wait_until_finish=false) or enter a lazy loop checking the progress
0167 /// of the data processing (if wait_until_finish=true).
0168 ///
0169 /// In the `wait_until_finished` mode, this will run continuously until 
0170 /// the JProcessingController indicates it is finished or stopped or 
0171 /// some other condition exists that would cause it to end early. Under
0172 /// normal conditions, the data processing stops when polling JProcessingController::is_finished()
0173 /// indicates the JArrowTopology is in the `JArrowTopology::Status::Finished`
0174 /// state. This will occur when all event sources have been exhausted and
0175 /// all events have been processed such that all JWorkers have stopped.
0176 ///
0177 /// See JProcessingController::run() for more details.
0178 ///
0179 /// @param [in] wait_until_finished If true (default) do not return until the work has completed.
0180 void JApplication::Run(bool wait_until_stopped, bool finish) {
0181 
0182     Initialize();
0183     if(m_quitting) return;
0184 
0185     // At this point, all components should have been provided and all parameter values should have been set.
0186     // Let's report what we found!
0187     //
0188     // You might be wondering why we do this here instead of at the end of Initialize(), which might make more sense.
0189     // The reason is that there might be other things, specifically JBenchmarker, that do request Parameters and Services
0190     // but aren't JComponents (yet). These have to happen after JApplication::Initialize() and before the parameter table
0191     // gets printed. Because of their weird position, they are not able to add additional plugins or components, nor 
0192     // submit Parameter values.
0193     //
0194     // Print summary of all config parameters
0195     m_params->PrintParameters();
0196 
0197     LOG_WARN(m_logger) << "Starting processing with " << m_desired_nthreads << " threads requested..." << LOG_END;
0198     m_execution_engine->ScaleWorkers(m_desired_nthreads);
0199     m_execution_engine->RunTopology();
0200 
0201     if (!wait_until_stopped) {
0202         return;
0203     }
0204 
0205     m_execution_engine->RunSupervisor();
0206     if (finish) {
0207         m_execution_engine->FinishTopology();
0208     }
0209 
0210     // Join all threads
0211     if (!m_skip_join) {
0212         m_execution_engine->ScaleWorkers(0);
0213     }
0214 }
0215 
0216 
0217 void JApplication::Scale(int nthreads) {
0218     LOG_WARN(m_logger) << "Scaling to " << nthreads << " threads" << LOG_END;
0219     m_execution_engine->ScaleWorkers(nthreads);
0220     m_execution_engine->RunTopology();
0221 }
0222 
0223 void JApplication::Inspect() {
0224     ::InspectApplication(this);
0225     // While we are inside InspectApplication, any SIGINTs will lead to shutdown.
0226     // Once we exit InspectApplication, one SIGINT will pause processing and reopen InspectApplication.
0227     m_sigint_count = 0; 
0228     m_inspecting = false;
0229 }
0230 
0231 void JApplication::Stop(bool wait_until_stopped, bool finish) {
0232     if (!m_initialized) {
0233         // People might call Stop() during Initialize() rather than Run().
0234         // For instance, during JEventProcessor::Init, or via Ctrl-C.
0235         // If this is the case, we finish with initialization and then cancel the Run().
0236         // We don't wait on  because we don't want to Finalize() anything
0237         // we haven't Initialize()d yet.
0238         m_quitting = true;
0239     }
0240     else {
0241         // Once we've called Initialize(), we can Finish() all of our components
0242         // whenever we like
0243         m_execution_engine->DrainTopology();
0244         if (wait_until_stopped) {
0245             m_execution_engine->RunSupervisor();
0246             if (finish) {
0247                 m_execution_engine->FinishTopology();
0248             }
0249         }
0250     }
0251 }
0252 
0253 void JApplication::Quit(bool skip_join) {
0254 
0255     if (m_initialized) {
0256         m_skip_join = skip_join;
0257         m_quitting = true;
0258         if (!skip_join && m_execution_engine != nullptr) {
0259             Stop(true);
0260         }
0261     }
0262 
0263     // People might call Quit() during Initialize() rather than Run().
0264     // For instance, during JEventProcessor::Init, or via Ctrl-C.
0265     // If this is the case, we exit immediately rather than make the user
0266     // wait on a long Initialize() if no data has been generated yet.
0267 
0268     _exit(m_exit_code);
0269 }
0270 
0271 void JApplication::SetExitCode(int exit_code) {
0272     /// Set a value of the exit code in that can be later retrieved
0273     /// using GetExitCode. This is so the executable can return
0274     /// a meaningful error code if processing is stopped prematurely,
0275     /// but the program is able to stop gracefully without a hard
0276     /// exit. See also GetExitCode.
0277 
0278     m_exit_code = exit_code;
0279 }
0280 
0281 int JApplication::GetExitCode() {
0282     /// Returns the currently set exit code. This can be used by
0283     /// JProcessor/JFactory classes to communicate an appropriate
0284     /// exit code that a jana program can return upon exit. The
0285     /// value can be set via the SetExitCode method.
0286 
0287     return m_exit_code;
0288 }
0289 
0290 const JComponentSummary& JApplication::GetComponentSummary() {
0291     /// Returns a data object describing all components currently running
0292     return m_component_manager->get_component_summary();
0293 }
0294 
0295 // Performance/status monitoring
0296 void JApplication::SetTicker(bool ticker_on) {
0297     m_execution_engine->SetTickerEnabled(ticker_on);
0298 }
0299 
0300 bool JApplication::IsTickerEnabled() {
0301     return m_execution_engine->IsTickerEnabled();
0302 }
0303 
0304 void JApplication::SetTimeoutEnabled(bool enabled) {
0305     m_execution_engine->SetTimeoutEnabled(enabled);
0306 }
0307 
0308 bool JApplication::IsTimeoutEnabled() {
0309     return m_execution_engine->IsTimeoutEnabled();
0310 }
0311 
0312 bool JApplication::IsDrainingQueues() {
0313     return (m_execution_engine->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0314 }
0315 
0316 /// Returns the number of threads currently being used.
0317 uint64_t JApplication::GetNThreads() {
0318     return m_execution_engine->GetPerf().thread_count;
0319 }
0320 
0321 /// Returns the number of events processed since Run() was called.
0322 uint64_t JApplication::GetNEventsProcessed() {
0323     return m_execution_engine->GetPerf().event_count;
0324 }
0325 
0326 /// Returns the total integrated throughput so far in Hz since Run() was called.
0327 float JApplication::GetIntegratedRate() {
0328     return m_execution_engine->GetPerf().throughput_hz;
0329 }
0330 
0331 /// Returns the 'instantaneous' throughput in Hz since the last such call was made.
0332 float JApplication::GetInstantaneousRate()
0333 {
0334     std::lock_guard<std::mutex> lock(m_inst_rate_mutex);
0335     auto latest_event_count = m_execution_engine->GetPerf().event_count;
0336     auto latest_time = JExecutionEngine::clock_t::now();
0337     
0338     auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(latest_time - m_last_measurement_time).count();
0339     auto instantaneous_throughput = (duration_ms == 0) ? 0 : (latest_event_count - m_last_event_count) * 1000.0 / duration_ms;
0340 
0341     m_last_event_count = latest_event_count;
0342     m_last_measurement_time = latest_time;
0343 
0344     return instantaneous_throughput;
0345 }
0346 
0347 void JApplication::PrintStatus() {
0348     auto perf = m_execution_engine->GetPerf();
0349     LOG_INFO(m_logger) << "Topology status:     " << ToString(perf.runstatus) << LOG_END;
0350     LOG_INFO(m_logger) << "Worker thread count: " << perf.thread_count << LOG_END;
0351     LOG_INFO(m_logger) << "Events processed:    " << perf.event_count << LOG_END;
0352     LOG_INFO(m_logger) << "Uptime [s]:          " << perf.uptime_ms*1000 << LOG_END;
0353     LOG_INFO(m_logger) << "Throughput [Hz]:     " << perf.throughput_hz << LOG_END;
0354 }
0355 
0356 
0357