Warning, file /jana2/src/libraries/JANA/JApplication.cc was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JEventSource.h>
0007 #include <JANA/Engine/JExecutionEngine.h>
0008 #include <JANA/Services/JComponentManager.h>
0009 #include <JANA/Services/JGlobalRootLock.h>
0010 #include <JANA/Services/JParameterManager.h>
0011 #include <JANA/Services/JPluginLoader.h>
0012 #include <JANA/Topology/JTopologyBuilder.h>
0013 #include <JANA/Services/JWiringService.h>
0014 #include <JANA/Utils/JCpuInfo.h>
0015 #include <JANA/Utils/JApplicationInspector.h>
0016
0017 #include <chrono>
0018 #include <sstream>
0019 #include <unistd.h>
0020
0021 JApplication *japp = nullptr;
0022
0023
0024 JApplication::JApplication(JParameterManager* params) {
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 if (params == nullptr) {
0035 m_params = std::make_shared<JParameterManager>();
0036 }
0037 else {
0038 m_params = std::shared_ptr<JParameterManager>(params);
0039 }
0040 m_component_manager = std::make_shared<JComponentManager>();
0041 m_plugin_loader = std::make_shared<JPluginLoader>();
0042 m_service_locator = std::make_unique<JServiceLocator>();
0043 m_execution_engine = std::make_unique<JExecutionEngine>();
0044
0045 ProvideService(m_params);
0046 ProvideService(m_component_manager);
0047 ProvideService(m_plugin_loader);
0048 ProvideService(m_execution_engine);
0049 ProvideService(std::make_shared<JGlobalRootLock>());
0050 ProvideService(std::make_shared<JTopologyBuilder>());
0051 ProvideService(std::make_shared<jana::services::JWiringService>());
0052
0053 }
0054
0055
0056 JApplication::~JApplication() {}
0057
0058
0059
0060
0061 void JApplication::AddPlugin(std::string plugin_name) {
0062 m_plugin_loader->add_plugin(plugin_name);
0063 }
0064
0065 void JApplication::AddPluginPath(std::string path) {
0066 m_plugin_loader->add_plugin_path(path);
0067 }
0068
0069
0070
0071
0072 void JApplication::Add(JEventSource* event_source) {
0073
0074 m_component_manager->add(event_source);
0075 }
0076
0077 void JApplication::Add(JEventSourceGenerator *source_generator) {
0078
0079 m_component_manager->add(source_generator);
0080 }
0081
0082 void JApplication::Add(JFactoryGenerator *factory_generator) {
0083
0084 m_component_manager->add(factory_generator);
0085 }
0086
0087 void JApplication::Add(JEventProcessor* processor) {
0088
0089 m_component_manager->add(processor);
0090 }
0091
0092 void JApplication::Add(std::string event_source_name) {
0093
0094
0095 m_component_manager->add(event_source_name);
0096 }
0097
0098 void JApplication::Add(JEventUnfolder* unfolder) {
0099
0100 m_component_manager->add(unfolder);
0101 }
0102
0103
0104 void JApplication::Initialize() {
0105
0106
0107
0108
0109
0110 if (m_initialized) return;
0111
0112
0113
0114 m_services_available = true;
0115
0116
0117 m_service_locator->get<JParameterManager>();
0118 auto component_manager = m_service_locator->get<JComponentManager>();
0119 auto plugin_loader = m_service_locator->get<JPluginLoader>();
0120 auto topology_builder = m_service_locator->get<JTopologyBuilder>();
0121 auto wiring_service = m_service_locator->get<jana::services::JWiringService>();
0122
0123
0124 m_logger = m_params->GetLogger("jana");
0125
0126 if (m_logger.level > JLogger::Level::INFO) {
0127 std::ostringstream oss;
0128 oss << "Initializing..." << std::endl << std::endl;
0129 JVersion::PrintVersionDescription(oss);
0130 LOG_INFO(m_logger) << oss.str() << LOG_END;
0131 }
0132 else {
0133 std::ostringstream oss;
0134 oss << "Initializing..." << std::endl;
0135 JVersion::PrintSplash(oss);
0136 JVersion::PrintVersionDescription(oss);
0137 LOG_INFO(m_logger) << oss.str() << LOG_END;
0138 }
0139
0140
0141 for (const auto& plugin_name : wiring_service->GetPluginNames()) {
0142 plugin_loader->add_plugin(plugin_name);
0143 }
0144 plugin_loader->attach_plugins(component_manager.get());
0145
0146
0147 component_manager->configure_components();
0148
0149
0150 wiring_service->CheckAllWiringsAreUsed();
0151
0152
0153 m_desired_nthreads = 1;
0154 m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores.");
0155 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0156 m_desired_nthreads = JCpuInfo::GetNumCpus();
0157 }
0158
0159 topology_builder->create_topology();
0160
0161 m_params->SetDefaultParameter("jana:inspect", m_inspect, "Controls whether to drop immediately into the Inspector upon Run()");
0162
0163 auto execution_engine = m_service_locator->get<JExecutionEngine>();
0164
0165
0166 m_service_locator->InitAllServices();
0167
0168 m_initialized = true;
0169
0170 }
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190 void JApplication::Run(bool wait_until_stopped, bool finish) {
0191
0192 Initialize();
0193 if(m_quitting) return;
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205 m_params->PrintParameters();
0206
0207 LOG_WARN(m_logger) << "Starting processing with " << m_desired_nthreads << " threads requested..." << LOG_END;
0208 m_execution_engine->ScaleWorkers(m_desired_nthreads);
0209 if (m_inspect) {
0210 m_execution_engine->RequestInspector();
0211 }
0212 else {
0213 m_execution_engine->RunTopology();
0214 }
0215
0216 if (!wait_until_stopped) {
0217 return;
0218 }
0219
0220 m_execution_engine->RunSupervisor();
0221 if (finish) {
0222 m_execution_engine->FinishTopology();
0223 }
0224
0225
0226 if (!m_skip_join) {
0227 m_execution_engine->ScaleWorkers(0);
0228 }
0229 }
0230
0231
0232 void JApplication::Scale(int nthreads) {
0233 LOG_WARN(m_logger) << "Scaling to " << nthreads << " threads" << LOG_END;
0234 m_execution_engine->ScaleWorkers(nthreads);
0235 m_execution_engine->RunTopology();
0236 }
0237
0238
0239 void JApplication::Stop(bool wait_until_stopped, bool finish) {
0240 if (!m_initialized) {
0241
0242
0243
0244
0245
0246 m_quitting = true;
0247 }
0248 else {
0249
0250
0251 m_execution_engine->DrainTopology();
0252 if (wait_until_stopped) {
0253 m_execution_engine->RunSupervisor();
0254 if (finish) {
0255 m_execution_engine->FinishTopology();
0256 }
0257 }
0258 }
0259 }
0260
0261 void JApplication::Quit(bool skip_join) {
0262
0263 if (m_initialized) {
0264 m_skip_join = skip_join;
0265 m_quitting = true;
0266 if (!skip_join && m_execution_engine != nullptr) {
0267 Stop(true);
0268 }
0269 }
0270
0271
0272
0273
0274
0275
0276 _exit(m_exit_code);
0277 }
0278
0279 void JApplication::SetExitCode(int exit_code) {
0280
0281
0282
0283
0284
0285
0286 m_exit_code = exit_code;
0287 }
0288
0289 int JApplication::GetExitCode() {
0290
0291
0292
0293
0294
0295 return m_exit_code;
0296 }
0297
0298 const JComponentSummary& JApplication::GetComponentSummary() {
0299
0300 return m_component_manager->get_component_summary();
0301 }
0302
0303
0304 void JApplication::SetTicker(bool ticker_on) {
0305 m_execution_engine->SetTickerEnabled(ticker_on);
0306 }
0307
0308 bool JApplication::IsTickerEnabled() {
0309 return m_execution_engine->IsTickerEnabled();
0310 }
0311
0312 void JApplication::SetTimeoutEnabled(bool enabled) {
0313 m_execution_engine->SetTimeoutEnabled(enabled);
0314 }
0315
0316 bool JApplication::IsTimeoutEnabled() {
0317 return m_execution_engine->IsTimeoutEnabled();
0318 }
0319
0320 bool JApplication::IsDrainingQueues() {
0321 return (m_execution_engine->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0322 }
0323
0324
0325 uint64_t JApplication::GetNThreads() {
0326 return m_execution_engine->GetPerf().thread_count;
0327 }
0328
0329
0330 uint64_t JApplication::GetNEventsProcessed() {
0331 return m_execution_engine->GetPerf().event_count;
0332 }
0333
0334
0335 float JApplication::GetIntegratedRate() {
0336 return m_execution_engine->GetPerf().throughput_hz;
0337 }
0338
0339
0340 float JApplication::GetInstantaneousRate()
0341 {
0342 std::lock_guard<std::mutex> lock(m_inst_rate_mutex);
0343 auto latest_event_count = m_execution_engine->GetPerf().event_count;
0344 auto latest_time = JExecutionEngine::clock_t::now();
0345
0346 auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(latest_time - m_last_measurement_time).count();
0347 auto instantaneous_throughput = (duration_ms == 0) ? 0 : (latest_event_count - m_last_event_count) * 1000.0 / duration_ms;
0348
0349 m_last_event_count = latest_event_count;
0350 m_last_measurement_time = latest_time;
0351
0352 return instantaneous_throughput;
0353 }
0354
0355 void JApplication::PrintStatus() {
0356 auto perf = m_execution_engine->GetPerf();
0357 LOG_INFO(m_logger) << "Topology status: " << ToString(perf.runstatus) << LOG_END;
0358 LOG_INFO(m_logger) << "Worker thread count: " << perf.thread_count << LOG_END;
0359 LOG_INFO(m_logger) << "Events processed: " << perf.event_count << LOG_END;
0360 LOG_INFO(m_logger) << "Uptime [s]: " << perf.uptime_ms*1000 << LOG_END;
0361 LOG_INFO(m_logger) << "Throughput [Hz]: " << perf.throughput_hz << LOG_END;
0362 }
0363
0364
0365