Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /jana2/src/libraries/JANA/JApplication.cc was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JEventSource.h>
0007 #include <JANA/Engine/JExecutionEngine.h>
0008 #include <JANA/Services/JComponentManager.h>
0009 #include <JANA/Services/JGlobalRootLock.h>
0010 #include <JANA/Services/JParameterManager.h>
0011 #include <JANA/Services/JPluginLoader.h>
0012 #include <JANA/Topology/JTopologyBuilder.h>
0013 #include <JANA/Services/JWiringService.h>
0014 #include <JANA/Utils/JCpuInfo.h>
0015 #include <JANA/Utils/JApplicationInspector.h>
0016 
0017 #include <chrono>
0018 #include <sstream>
0019 #include <unistd.h>
0020 
0021 JApplication *japp = nullptr;
0022 
0023 
0024 JApplication::JApplication(JParameterManager* params) {
0025 
0026     // We set up some very essential services here, but note 
0027     // that they won't have been initialized yet. We need them now
0028     // so that they can passively receive components, plugin names, parameter values, etc.
0029     // These passive operations don't require any parameters, services, or
0030     // logging output, so they don't need to be initialized until later.
0031     // They will be fully initialized in JApplication::Initialize().
0032     // Only then they will be exposed to the user through the service locator.
0033 
0034     if (params == nullptr) {
0035         m_params = std::make_shared<JParameterManager>();
0036     }
0037     else {
0038         m_params = std::shared_ptr<JParameterManager>(params);
0039     }
0040     m_component_manager = std::make_shared<JComponentManager>();
0041     m_plugin_loader = std::make_shared<JPluginLoader>();
0042     m_service_locator = std::make_unique<JServiceLocator>();
0043     m_execution_engine = std::make_unique<JExecutionEngine>();
0044 
0045     ProvideService(m_params);
0046     ProvideService(m_component_manager);
0047     ProvideService(m_plugin_loader);
0048     ProvideService(m_execution_engine);
0049     ProvideService(std::make_shared<JGlobalRootLock>());
0050     ProvideService(std::make_shared<JTopologyBuilder>());
0051     ProvideService(std::make_shared<jana::services::JWiringService>());
0052 
0053 }
0054 
0055 
0056 JApplication::~JApplication() {}
0057 
0058 
0059 // Loading plugins
0060 
0061 void JApplication::AddPlugin(std::string plugin_name) {
0062     m_plugin_loader->add_plugin(plugin_name);
0063 }
0064 
0065 void JApplication::AddPluginPath(std::string path) {
0066     m_plugin_loader->add_plugin_path(path);
0067 }
0068 
0069 
0070 // Building a ProcessingTopology
0071 
0072 void JApplication::Add(JEventSource* event_source) {
0073     /// Adds the given JEventSource to the JANA context. Ownership is passed to JComponentManager.
0074     m_component_manager->add(event_source);
0075 }
0076 
0077 void JApplication::Add(JEventSourceGenerator *source_generator) {
0078     /// Adds the given JEventSourceGenerator to the JANA context. Ownership is passed to JComponentManager.
0079     m_component_manager->add(source_generator);
0080 }
0081 
0082 void JApplication::Add(JFactoryGenerator *factory_generator) {
0083     /// Adds the given JFactoryGenerator to the JANA context. Ownership is passed to JComponentManager.
0084     m_component_manager->add(factory_generator);
0085 }
0086 
0087 void JApplication::Add(JEventProcessor* processor) {
0088     /// Adds the given JEventProcessor to the JANA context. Ownership is passed to JComponentManager.
0089     m_component_manager->add(processor);
0090 }
0091 
0092 void JApplication::Add(std::string event_source_name) {
0093     /// Adds the event source name (e.g. a file or socket name) to the JANA context. JANA will instantiate
0094     /// the corresponding JEventSource using a user-provided JEventSourceGenerator.
0095     m_component_manager->add(event_source_name);
0096 }
0097 
0098 void JApplication::Add(JEventUnfolder* unfolder) {
0099     /// Adds the given JEventUnfolder to the JANA context. Ownership is passed to JComponentManager.
0100     m_component_manager->add(unfolder);
0101 }
0102 
0103 
0104 void JApplication::Initialize() {
0105 
0106     /// Initialize the application in preparation for data processing.
0107     /// This is called by the Run method so users will usually not need to call this directly.
0108 
0109     // Only run this once
0110     if (m_initialized) return;
0111 
0112     // Now that all parameters, components, plugin names, etc have been set, 
0113     // we can expose our builtin services to the user via GetService()
0114     m_services_available = true;
0115 
0116     // We trigger initialization 
0117     m_service_locator->get<JParameterManager>();
0118     auto component_manager = m_service_locator->get<JComponentManager>();
0119     auto plugin_loader = m_service_locator->get<JPluginLoader>();
0120     auto topology_builder = m_service_locator->get<JTopologyBuilder>();
0121     auto wiring_service = m_service_locator->get<jana::services::JWiringService>();
0122 
0123     // Set logger on JApplication itself
0124     m_logger = m_params->GetLogger("jana");
0125 
0126     if (m_logger.level > JLogger::Level::INFO) {
0127         std::ostringstream oss;
0128         oss << "Initializing..." << std::endl << std::endl;
0129         JVersion::PrintVersionDescription(oss);
0130         LOG_INFO(m_logger) << oss.str() << LOG_END;
0131     }
0132     else {
0133         std::ostringstream oss;
0134         oss << "Initializing..." << std::endl;
0135         JVersion::PrintSplash(oss);
0136         JVersion::PrintVersionDescription(oss);
0137         LOG_INFO(m_logger) << oss.str() << LOG_END;
0138     }
0139 
0140     // Attach all plugins
0141     for (const auto& plugin_name : wiring_service->GetPluginNames()) {
0142         plugin_loader->add_plugin(plugin_name);
0143     }
0144     plugin_loader->attach_plugins(component_manager.get());
0145 
0146     // Resolve and initialize all components
0147     component_manager->configure_components();
0148 
0149     // Once all components have been wired in jcm::configure_components(), there shouldn't be any unused wirings
0150     wiring_service->CheckAllWiringsAreUsed();
0151 
0152     // Set desired nthreads. We parse the 'nthreads' parameter two different ways for backwards compatibility.
0153     m_desired_nthreads = 1;
0154     m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores.");
0155     if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0156         m_desired_nthreads = JCpuInfo::GetNumCpus();
0157     }
0158 
0159     topology_builder->create_topology();
0160 
0161     m_params->SetDefaultParameter("jana:inspect", m_inspect, "Controls whether to drop immediately into the Inspector upon Run()");
0162 
0163     auto execution_engine = m_service_locator->get<JExecutionEngine>();
0164 
0165     // Make sure that Init() is called on any remaining JServices
0166     m_service_locator->InitAllServices();
0167 
0168     m_initialized = true;
0169     // This needs to be at the end so that m_initialized==false while InitPlugin() is being called
0170 }
0171 
0172 /// @brief Run the application, launching 1 or more threads to do the work.
0173 ///
0174 /// This will initialize the application, attaching plugins etc. and launching
0175 /// threads to process events/time slices. This will then either return immediately
0176 /// (if wait_until_finish=false) or enter a lazy loop checking the progress
0177 /// of the data processing (if wait_until_finish=true).
0178 ///
0179 /// In the `wait_until_finished` mode, this will run continuously until 
0180 /// the JProcessingController indicates it is finished or stopped or 
0181 /// some other condition exists that would cause it to end early. Under
0182 /// normal conditions, the data processing stops when polling JProcessingController::is_finished()
0183 /// indicates the JArrowTopology is in the `JArrowTopology::Status::Finished`
0184 /// state. This will occur when all event sources have been exhausted and
0185 /// all events have been processed such that all JWorkers have stopped.
0186 ///
0187 /// See JProcessingController::run() for more details.
0188 ///
0189 /// @param [in] wait_until_finished If true (default) do not return until the work has completed.
0190 void JApplication::Run(bool wait_until_stopped, bool finish) {
0191 
0192     Initialize();
0193     if(m_quitting) return;
0194 
0195     // At this point, all components should have been provided and all parameter values should have been set.
0196     // Let's report what we found!
0197     //
0198     // You might be wondering why we do this here instead of at the end of Initialize(), which might make more sense.
0199     // The reason is that there might be other things, specifically JBenchmarker, that do request Parameters and Services
0200     // but aren't JComponents (yet). These have to happen after JApplication::Initialize() and before the parameter table
0201     // gets printed. Because of their weird position, they are not able to add additional plugins or components, nor 
0202     // submit Parameter values.
0203     //
0204     // Print summary of all config parameters
0205     m_params->PrintParameters();
0206 
0207     LOG_WARN(m_logger) << "Starting processing with " << m_desired_nthreads << " threads requested..." << LOG_END;
0208     m_execution_engine->ScaleWorkers(m_desired_nthreads);
0209     if (m_inspect) {
0210         m_execution_engine->RequestInspector();
0211     }
0212     else {
0213         m_execution_engine->RunTopology();
0214     }
0215 
0216     if (!wait_until_stopped) {
0217         return;
0218     }
0219 
0220     m_execution_engine->RunSupervisor();
0221     if (finish) {
0222         m_execution_engine->FinishTopology();
0223     }
0224 
0225     // Join all threads
0226     if (!m_skip_join) {
0227         m_execution_engine->ScaleWorkers(0);
0228     }
0229 }
0230 
0231 
0232 void JApplication::Scale(int nthreads) {
0233     LOG_WARN(m_logger) << "Scaling to " << nthreads << " threads" << LOG_END;
0234     m_execution_engine->ScaleWorkers(nthreads);
0235     m_execution_engine->RunTopology();
0236 }
0237 
0238 
0239 void JApplication::Stop(bool wait_until_stopped, bool finish) {
0240     if (!m_initialized) {
0241         // People might call Stop() during Initialize() rather than Run().
0242         // For instance, during JEventProcessor::Init, or via Ctrl-C.
0243         // If this is the case, we finish with initialization and then cancel the Run().
0244         // We don't wait on  because we don't want to Finalize() anything
0245         // we haven't Initialize()d yet.
0246         m_quitting = true;
0247     }
0248     else {
0249         // Once we've called Initialize(), we can Finish() all of our components
0250         // whenever we like
0251         m_execution_engine->DrainTopology();
0252         if (wait_until_stopped) {
0253             m_execution_engine->RunSupervisor();
0254             if (finish) {
0255                 m_execution_engine->FinishTopology();
0256             }
0257         }
0258     }
0259 }
0260 
0261 void JApplication::Quit(bool skip_join) {
0262 
0263     if (m_initialized) {
0264         m_skip_join = skip_join;
0265         m_quitting = true;
0266         if (!skip_join && m_execution_engine != nullptr) {
0267             Stop(true);
0268         }
0269     }
0270 
0271     // People might call Quit() during Initialize() rather than Run().
0272     // For instance, during JEventProcessor::Init, or via Ctrl-C.
0273     // If this is the case, we exit immediately rather than make the user
0274     // wait on a long Initialize() if no data has been generated yet.
0275 
0276     _exit(m_exit_code);
0277 }
0278 
0279 void JApplication::SetExitCode(int exit_code) {
0280     /// Set a value of the exit code in that can be later retrieved
0281     /// using GetExitCode. This is so the executable can return
0282     /// a meaningful error code if processing is stopped prematurely,
0283     /// but the program is able to stop gracefully without a hard
0284     /// exit. See also GetExitCode.
0285 
0286     m_exit_code = exit_code;
0287 }
0288 
0289 int JApplication::GetExitCode() {
0290     /// Returns the currently set exit code. This can be used by
0291     /// JProcessor/JFactory classes to communicate an appropriate
0292     /// exit code that a jana program can return upon exit. The
0293     /// value can be set via the SetExitCode method.
0294 
0295     return m_exit_code;
0296 }
0297 
0298 const JComponentSummary& JApplication::GetComponentSummary() {
0299     /// Returns a data object describing all components currently running
0300     return m_component_manager->get_component_summary();
0301 }
0302 
0303 // Performance/status monitoring
0304 void JApplication::SetTicker(bool ticker_on) {
0305     m_execution_engine->SetTickerEnabled(ticker_on);
0306 }
0307 
0308 bool JApplication::IsTickerEnabled() {
0309     return m_execution_engine->IsTickerEnabled();
0310 }
0311 
0312 void JApplication::SetTimeoutEnabled(bool enabled) {
0313     m_execution_engine->SetTimeoutEnabled(enabled);
0314 }
0315 
0316 bool JApplication::IsTimeoutEnabled() {
0317     return m_execution_engine->IsTimeoutEnabled();
0318 }
0319 
0320 bool JApplication::IsDrainingQueues() {
0321     return (m_execution_engine->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0322 }
0323 
0324 /// Returns the number of threads currently being used.
0325 uint64_t JApplication::GetNThreads() {
0326     return m_execution_engine->GetPerf().thread_count;
0327 }
0328 
0329 /// Returns the number of events processed since Run() was called.
0330 uint64_t JApplication::GetNEventsProcessed() {
0331     return m_execution_engine->GetPerf().event_count;
0332 }
0333 
0334 /// Returns the total integrated throughput so far in Hz since Run() was called.
0335 float JApplication::GetIntegratedRate() {
0336     return m_execution_engine->GetPerf().throughput_hz;
0337 }
0338 
0339 /// Returns the 'instantaneous' throughput in Hz since the last such call was made.
0340 float JApplication::GetInstantaneousRate()
0341 {
0342     std::lock_guard<std::mutex> lock(m_inst_rate_mutex);
0343     auto latest_event_count = m_execution_engine->GetPerf().event_count;
0344     auto latest_time = JExecutionEngine::clock_t::now();
0345     
0346     auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(latest_time - m_last_measurement_time).count();
0347     auto instantaneous_throughput = (duration_ms == 0) ? 0 : (latest_event_count - m_last_event_count) * 1000.0 / duration_ms;
0348 
0349     m_last_event_count = latest_event_count;
0350     m_last_measurement_time = latest_time;
0351 
0352     return instantaneous_throughput;
0353 }
0354 
0355 void JApplication::PrintStatus() {
0356     auto perf = m_execution_engine->GetPerf();
0357     LOG_INFO(m_logger) << "Topology status:     " << ToString(perf.runstatus) << LOG_END;
0358     LOG_INFO(m_logger) << "Worker thread count: " << perf.thread_count << LOG_END;
0359     LOG_INFO(m_logger) << "Events processed:    " << perf.event_count << LOG_END;
0360     LOG_INFO(m_logger) << "Uptime [s]:          " << perf.uptime_ms*1000 << LOG_END;
0361     LOG_INFO(m_logger) << "Throughput [Hz]:     " << perf.throughput_hz << LOG_END;
0362 }
0363 
0364 
0365