File indexing completed on 2025-01-18 10:17:38
0001
0002
0003
0004
0005 #include <JANA/JApplication.h>
0006 #include <JANA/JEventSource.h>
0007 #include <JANA/Engine/JExecutionEngine.h>
0008 #include <JANA/Services/JComponentManager.h>
0009 #include <JANA/Services/JGlobalRootLock.h>
0010 #include <JANA/Services/JParameterManager.h>
0011 #include <JANA/Services/JPluginLoader.h>
0012 #include <JANA/Topology/JTopologyBuilder.h>
0013 #include <JANA/Services/JWiringService.h>
0014 #include <JANA/Utils/JCpuInfo.h>
0015 #include <JANA/Utils/JApplicationInspector.h>
0016
0017 #include <chrono>
0018 #include <sstream>
0019 #include <unistd.h>
0020
0021 JApplication *japp = nullptr;
0022
0023
0024 JApplication::JApplication(JParameterManager* params) {
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 if (params == nullptr) {
0035 m_params = std::make_shared<JParameterManager>();
0036 }
0037 else {
0038 m_params = std::shared_ptr<JParameterManager>(params);
0039 }
0040 m_component_manager = std::make_shared<JComponentManager>();
0041 m_plugin_loader = std::make_shared<JPluginLoader>();
0042 m_service_locator = std::make_unique<JServiceLocator>();
0043 m_execution_engine = std::make_unique<JExecutionEngine>();
0044
0045 ProvideService(m_params);
0046 ProvideService(m_component_manager);
0047 ProvideService(m_plugin_loader);
0048 ProvideService(m_execution_engine);
0049 ProvideService(std::make_shared<JGlobalRootLock>());
0050 ProvideService(std::make_shared<JTopologyBuilder>());
0051 ProvideService(std::make_shared<jana::services::JWiringService>());
0052
0053 }
0054
0055
0056 JApplication::~JApplication() {}
0057
0058
0059
0060
0061 void JApplication::AddPlugin(std::string plugin_name) {
0062 m_plugin_loader->add_plugin(plugin_name);
0063 }
0064
0065 void JApplication::AddPluginPath(std::string path) {
0066 m_plugin_loader->add_plugin_path(path);
0067 }
0068
0069
0070
0071
0072 void JApplication::Add(JEventSource* event_source) {
0073
0074 m_component_manager->add(event_source);
0075 }
0076
0077 void JApplication::Add(JEventSourceGenerator *source_generator) {
0078
0079 m_component_manager->add(source_generator);
0080 }
0081
0082 void JApplication::Add(JFactoryGenerator *factory_generator) {
0083
0084 m_component_manager->add(factory_generator);
0085 }
0086
0087 void JApplication::Add(JEventProcessor* processor) {
0088
0089 m_component_manager->add(processor);
0090 }
0091
0092 void JApplication::Add(std::string event_source_name) {
0093
0094
0095 m_component_manager->add(event_source_name);
0096 }
0097
0098 void JApplication::Add(JEventUnfolder* unfolder) {
0099
0100 m_component_manager->add(unfolder);
0101 }
0102
0103
0104 void JApplication::Initialize() {
0105
0106
0107
0108
0109
0110 if (m_initialized) return;
0111
0112
0113
0114 m_services_available = true;
0115
0116
0117 m_service_locator->get<JParameterManager>();
0118 auto component_manager = m_service_locator->get<JComponentManager>();
0119 auto plugin_loader = m_service_locator->get<JPluginLoader>();
0120 auto topology_builder = m_service_locator->get<JTopologyBuilder>();
0121
0122
0123 m_logger = m_params->GetLogger("jana");
0124
0125 if (m_logger.level > JLogger::Level::INFO) {
0126 std::ostringstream oss;
0127 oss << "Initializing..." << std::endl << std::endl;
0128 JVersion::PrintVersionDescription(oss);
0129 LOG_WARN(m_logger) << oss.str() << LOG_END;
0130 }
0131 else {
0132 std::ostringstream oss;
0133 oss << "Initializing..." << std::endl;
0134 JVersion::PrintSplash(oss);
0135 JVersion::PrintVersionDescription(oss);
0136 LOG_WARN(m_logger) << oss.str() << LOG_END;
0137 }
0138
0139
0140 plugin_loader->attach_plugins(component_manager.get());
0141
0142
0143 component_manager->configure_components();
0144
0145
0146 m_desired_nthreads = 1;
0147 m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores.");
0148 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0149 m_desired_nthreads = JCpuInfo::GetNumCpus();
0150 }
0151
0152 topology_builder->create_topology();
0153 auto execution_engine = m_service_locator->get<JExecutionEngine>();
0154
0155
0156 m_service_locator->wire_everything();
0157
0158 m_initialized = true;
0159
0160 }
0161
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
0175
0176
0177
0178
0179
0180 void JApplication::Run(bool wait_until_stopped, bool finish) {
0181
0182 Initialize();
0183 if(m_quitting) return;
0184
0185
0186
0187
0188
0189
0190
0191
0192
0193
0194
0195 m_params->PrintParameters();
0196
0197 LOG_WARN(m_logger) << "Starting processing with " << m_desired_nthreads << " threads requested..." << LOG_END;
0198 m_execution_engine->ScaleWorkers(m_desired_nthreads);
0199 m_execution_engine->RunTopology();
0200
0201 if (!wait_until_stopped) {
0202 return;
0203 }
0204
0205 m_execution_engine->RunSupervisor();
0206 if (finish) {
0207 m_execution_engine->FinishTopology();
0208 }
0209
0210
0211 if (!m_skip_join) {
0212 m_execution_engine->ScaleWorkers(0);
0213 }
0214 }
0215
0216
0217 void JApplication::Scale(int nthreads) {
0218 LOG_WARN(m_logger) << "Scaling to " << nthreads << " threads" << LOG_END;
0219 m_execution_engine->ScaleWorkers(nthreads);
0220 m_execution_engine->RunTopology();
0221 }
0222
0223 void JApplication::Inspect() {
0224 ::InspectApplication(this);
0225
0226
0227 m_sigint_count = 0;
0228 m_inspecting = false;
0229 }
0230
0231 void JApplication::Stop(bool wait_until_stopped, bool finish) {
0232 if (!m_initialized) {
0233
0234
0235
0236
0237
0238 m_quitting = true;
0239 }
0240 else {
0241
0242
0243 m_execution_engine->DrainTopology();
0244 if (wait_until_stopped) {
0245 m_execution_engine->RunSupervisor();
0246 if (finish) {
0247 m_execution_engine->FinishTopology();
0248 }
0249 }
0250 }
0251 }
0252
0253 void JApplication::Quit(bool skip_join) {
0254
0255 if (m_initialized) {
0256 m_skip_join = skip_join;
0257 m_quitting = true;
0258 if (!skip_join && m_execution_engine != nullptr) {
0259 Stop(true);
0260 }
0261 }
0262
0263
0264
0265
0266
0267
0268 _exit(m_exit_code);
0269 }
0270
0271 void JApplication::SetExitCode(int exit_code) {
0272
0273
0274
0275
0276
0277
0278 m_exit_code = exit_code;
0279 }
0280
0281 int JApplication::GetExitCode() {
0282
0283
0284
0285
0286
0287 return m_exit_code;
0288 }
0289
0290 const JComponentSummary& JApplication::GetComponentSummary() {
0291
0292 return m_component_manager->get_component_summary();
0293 }
0294
0295
0296 void JApplication::SetTicker(bool ticker_on) {
0297 m_execution_engine->SetTickerEnabled(ticker_on);
0298 }
0299
0300 bool JApplication::IsTickerEnabled() {
0301 return m_execution_engine->IsTickerEnabled();
0302 }
0303
0304 void JApplication::SetTimeoutEnabled(bool enabled) {
0305 m_execution_engine->SetTimeoutEnabled(enabled);
0306 }
0307
0308 bool JApplication::IsTimeoutEnabled() {
0309 return m_execution_engine->IsTimeoutEnabled();
0310 }
0311
0312 bool JApplication::IsDrainingQueues() {
0313 return (m_execution_engine->GetRunStatus() == JExecutionEngine::RunStatus::Draining);
0314 }
0315
0316
0317 uint64_t JApplication::GetNThreads() {
0318 return m_execution_engine->GetPerf().thread_count;
0319 }
0320
0321
0322 uint64_t JApplication::GetNEventsProcessed() {
0323 return m_execution_engine->GetPerf().event_count;
0324 }
0325
0326
0327 float JApplication::GetIntegratedRate() {
0328 return m_execution_engine->GetPerf().throughput_hz;
0329 }
0330
0331
0332 float JApplication::GetInstantaneousRate()
0333 {
0334 std::lock_guard<std::mutex> lock(m_inst_rate_mutex);
0335 auto latest_event_count = m_execution_engine->GetPerf().event_count;
0336 auto latest_time = JExecutionEngine::clock_t::now();
0337
0338 auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(latest_time - m_last_measurement_time).count();
0339 auto instantaneous_throughput = (duration_ms == 0) ? 0 : (latest_event_count - m_last_event_count) * 1000.0 / duration_ms;
0340
0341 m_last_event_count = latest_event_count;
0342 m_last_measurement_time = latest_time;
0343
0344 return instantaneous_throughput;
0345 }
0346
0347 void JApplication::PrintStatus() {
0348 auto perf = m_execution_engine->GetPerf();
0349 LOG_INFO(m_logger) << "Topology status: " << ToString(perf.runstatus) << LOG_END;
0350 LOG_INFO(m_logger) << "Worker thread count: " << perf.thread_count << LOG_END;
0351 LOG_INFO(m_logger) << "Events processed: " << perf.event_count << LOG_END;
0352 LOG_INFO(m_logger) << "Uptime [s]: " << perf.uptime_ms*1000 << LOG_END;
0353 LOG_INFO(m_logger) << "Throughput [Hz]: " << perf.throughput_hz << LOG_END;
0354 }
0355
0356
0357