File indexing completed on 2025-01-18 09:14:09
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 #include <DD4hep/Detector.h>
0016 #include <DD4hep/Memory.h>
0017 #include <DD4hep/Plugins.h>
0018 #include <DD4hep/Printout.h>
0019 #include <DD4hep/Primitives.h>
0020 #include <DD4hep/InstanceCount.h>
0021
0022 #include <DDDigi/DigiKernel.h>
0023 #include <DDDigi/DigiContext.h>
0024 #include <DDDigi/DigiActionSequence.h>
0025 #include <DDDigi/DigiMonitorHandler.h>
0026
0027 #ifdef DD4HEP_USE_TBB
0028 #include <tbb/task_group.h>
0029 #include <tbb/global_control.h>
0030 #else
0031 namespace tbb { struct global_control { enum { max_allowed_parallelism = -1 }; }; }
0032 #endif
0033
0034 #include <TRandom.h>
0035
0036
0037 #include <stdexcept>
0038 #include <algorithm>
0039 #include <memory>
0040 #include <chrono>
0041
0042 using namespace dd4hep::digi;
0043
0044
0045
0046
0047
0048
0049
0050
0051 class DigiKernel::Internals {
0052 public:
0053
0054 ClientOutputLevels clientLevels;
0055
0056 std::atomic_int events_todo;
0057
0058 std::atomic_int events_finished;
0059
0060 std::size_t events_submitted;
0061
0062
0063 std::mutex counter_lock { };
0064
0065
0066 std::mutex initializer_lock { };
0067
0068
0069 std::mutex global_io_lock { };
0070
0071
0072 std::mutex global_output_lock { };
0073
0074 using callbacks_t = std::vector<std::function<void()> >;
0075 using ev_callbacks_t = std::vector<std::function<void(DigiContext&)> >;
0076
0077
0078 callbacks_t configurators { };
0079
0080 callbacks_t initializers { };
0081
0082 callbacks_t terminators { };
0083
0084 ev_callbacks_t start_event { };
0085
0086 ev_callbacks_t end_event { };
0087
0088
0089 DigiActionSequence* input_action { nullptr };
0090
0091 DigiActionSequence* event_action { nullptr };
0092
0093 DigiActionSequence* output_action { nullptr };
0094
0095 DigiMonitorHandler* monitor_handler { nullptr };
0096
0097
0098 TRandom* root_random;
0099
0100 std::shared_ptr<DigiRandomGenerator> random { };
0101
0102 std::unique_ptr<tbb::global_control> tbb_init { };
0103
0104 int outputLevel;
0105
0106 int numEvents;
0107
0108 int maxEventsParallel;
0109
0110 int num_threads;
0111
0112 bool stop = false;
0113
0114 public:
0115
0116 Internals() = default;
0117
0118 ~Internals() = default;
0119
0120 static std::mutex kernel_mutex;
0121 };
0122
0123 std::mutex DigiKernel::Internals::kernel_mutex {};
0124
0125
0126
0127
0128
0129
0130
0131
0132 template<typename ACTION, typename ARG> class DigiKernel::Wrapper {
0133 public:
0134 ACTION* action = 0;
0135 ARG context;
0136 Wrapper(ACTION* a, ARG c) : action(a), context(c) {}
0137 Wrapper(Wrapper&& copy) = default;
0138 Wrapper(const Wrapper& copy) = default;
0139 Wrapper& operator=(Wrapper&& copy) = delete;
0140 Wrapper& operator=(const Wrapper& copy) = delete;
0141 void operator()() const {
0142 action->execute(context);
0143 }
0144 };
0145
0146
0147
0148
0149
0150
0151
0152
0153 class DigiKernel::Processor {
0154 DigiKernel& kernel;
0155 public:
0156 Processor(DigiKernel& k) : kernel(k) {}
0157 Processor(Processor&& l) = default;
0158 Processor(const Processor& l) = default;
0159 void operator()() const {
0160 int todo = 1;
0161 while( todo >= 0 ) {
0162 todo = -1;
0163 {
0164 std::lock_guard<std::mutex> lock(kernel.internals->counter_lock);
0165 if( !kernel.internals->stop && kernel.internals->events_todo > 0)
0166 todo = --kernel.internals->events_todo;
0167 }
0168 if ( todo >= 0 ) {
0169 int ev_num = kernel.internals->numEvents - todo;
0170 std::unique_ptr<DigiContext> context =
0171 std::make_unique<DigiContext>(this->kernel,std::make_unique<DigiEvent>(ev_num));
0172 context->set_random_generator(this->kernel.internals->random);
0173 kernel.executeEvent(std::move(context));
0174 }
0175 }
0176 }
0177 };
0178
0179
0180 DigiKernel::DigiKernel(Detector& description_ref)
0181 : DigiAction(*this, "DigiKernel"), m_detDesc(&description_ref)
0182 {
0183 internals = new Internals();
0184 internals->num_threads = tbb::global_control::max_allowed_parallelism;
0185 declareProperty("maxEventsParallel",internals->maxEventsParallel = 1);
0186 declareProperty("numThreads", internals->num_threads);
0187 declareProperty("numEvents", internals->numEvents = 10);
0188 declareProperty("stop", internals->stop = false);
0189 declareProperty("OutputLevels", internals->clientLevels);
0190 auto* h = new DigiMonitorHandler(*this, "MonitorData");
0191 properties().add("MonitorOutput", h->property("MonitorOutput"));
0192 internals->monitor_handler = h;
0193
0194 internals->input_action = new DigiActionSequence(*this, "InputAction");
0195 internals->event_action = new DigiActionSequence(*this, "EventAction");
0196 internals->output_action = new DigiActionSequence(*this, "OutputAction");
0197 internals->input_action->setExecuteParallel(false);
0198 internals->event_action->setExecuteParallel(false);
0199 internals->output_action->setExecuteParallel(false);
0200 internals->root_random = new TRandom();
0201 internals->random = std::make_shared<DigiRandomGenerator>();
0202 internals->random->engine = [this] { return internals->root_random->Uniform(1.0); };
0203 InstanceCount::increment(this);
0204 }
0205
0206
0207 DigiKernel::~DigiKernel() {
0208 std::lock_guard<std::mutex> lock(Internals::kernel_mutex);
0209 internals->tbb_init.reset();
0210 detail::releasePtr(internals->monitor_handler);
0211 detail::releasePtr(internals->output_action);
0212 detail::releasePtr(internals->event_action);
0213 detail::releasePtr(internals->input_action);
0214 detail::deletePtr(internals->root_random);
0215 internals->random.reset();
0216 detail::deletePtr(internals);
0217 InstanceCount::decrement(this);
0218 }
0219
0220
0221 DigiKernel& DigiKernel::instance(Detector& description) {
0222 static dd4hep::dd4hep_ptr<DigiKernel> s_main_instance(0);
0223 if ( 0 == s_main_instance.get() ) {
0224 std::lock_guard<std::mutex> lock(Internals::kernel_mutex);
0225 if ( 0 == s_main_instance.get() ) {
0226 s_main_instance.adopt(new DigiKernel(description));
0227 }
0228 }
0229 return *(s_main_instance.get());
0230 }
0231
0232
0233 std::mutex& DigiKernel::initializer_lock() const {
0234 return internals->initializer_lock;
0235 }
0236
0237
0238 std::mutex& DigiKernel::global_output_lock() const {
0239 return internals->global_output_lock;
0240 }
0241
0242
0243 std::mutex& DigiKernel::global_io_lock() const {
0244 return internals->global_io_lock;
0245 }
0246
0247
0248 void DigiKernel::printProperties() const {
0249 this->DigiAction::printProperties();
0250 for( const auto& cl : internals->clientLevels ) {
0251 always("OutputLevel[%s]: %d", cl.first.c_str(), cl.second);
0252 }
0253 }
0254
0255
0256 void DigiKernel::setOutputLevel(const std::string object, PrintLevel new_level) {
0257 internals->clientLevels[object] = new_level;
0258 }
0259
0260
0261 dd4hep::PrintLevel DigiKernel::getOutputLevel(const std::string object) const {
0262 ClientOutputLevels::const_iterator i=internals->clientLevels.find(object);
0263 if ( i != internals->clientLevels.end() ) return (PrintLevel)(*i).second;
0264 return dd4hep::PrintLevel(dd4hep::printLevel()-1);
0265 }
0266
0267
0268 std::size_t DigiKernel::events_todo() const {
0269 std::lock_guard<std::mutex> lock(internals->counter_lock);
0270 std::size_t evts = internals->events_todo;
0271 return evts;
0272 }
0273
0274
0275 std::size_t DigiKernel::events_done() const {
0276 std::lock_guard<std::mutex> lock(internals->counter_lock);
0277 std::size_t evts = internals->numEvents - internals->events_todo;
0278 return evts;
0279 }
0280
0281
0282 std::size_t DigiKernel::events_processing() const {
0283 std::lock_guard<std::mutex> lock(internals->counter_lock);
0284 std::size_t evts = internals->events_submitted - internals->events_finished;
0285 return evts;
0286 }
0287
0288
0289 void DigiKernel::loadGeometry(const std::string& compact_file) {
0290 char* arg = (char*) compact_file.c_str();
0291 m_detDesc->apply("DD4hep_XMLLoader", 1, &arg);
0292 }
0293
0294
0295 void DigiKernel::loadXML(const char* fname) {
0296 const char* args[] = { fname, 0 };
0297 m_detDesc->apply("DD4hep_XMLLoader", 1, (char**) args);
0298 }
0299
0300
0301 int DigiKernel::configure() {
0302 for(auto& call : internals->configurators) call();
0303 return 1;
0304 }
0305
0306
0307 int DigiKernel::initialize() {
0308 for(auto& call : internals->initializers) call();
0309 return 1;
0310 }
0311
0312
0313 DigiActionSequence& DigiKernel::inputAction() const {
0314 return *internals->input_action;
0315 }
0316
0317
0318 DigiActionSequence& DigiKernel::eventAction() const {
0319 return *internals->event_action;
0320 }
0321
0322
0323 DigiActionSequence& DigiKernel::outputAction() const {
0324 return *internals->output_action;
0325 }
0326
0327
0328 void DigiKernel::register_configure(const std::function<void()>& callback) const {
0329 std::lock_guard<std::mutex> lock(initializer_lock());
0330 internals->configurators.push_back(callback);
0331 }
0332
0333
0334 void DigiKernel::register_initialize(const std::function<void()>& callback) const {
0335 std::lock_guard<std::mutex> lock(initializer_lock());
0336 internals->initializers.push_back(callback);
0337 }
0338
0339
0340 void DigiKernel::register_terminate(const std::function<void()>& callback) const {
0341 std::lock_guard<std::mutex> lock(initializer_lock());
0342 internals->terminators.push_back(callback);
0343 }
0344
0345
0346 void DigiKernel::register_start_event(const std::function<void(DigiContext&)>& callback) const {
0347 internals->start_event.push_back(callback);
0348 }
0349
0350
0351 void DigiKernel::register_end_event(const std::function<void(DigiContext&)>& callback) const {
0352 internals->end_event.push_back(callback);
0353 }
0354
0355
0356 void DigiKernel::register_monitor(DigiAction* action, TNamed* object) const {
0357 if ( action && object ) {
0358 std::lock_guard<std::mutex> lock(internals->counter_lock);
0359 internals->monitor_handler->adopt(action, object);
0360 return;
0361 }
0362 except("DigiKernel","+++ Invalid monitor request from action %s with object %s: %s",
0363 action ? action->name().c_str() : "[Invalid]",
0364 object ? object->GetName() : "[Invalid]",
0365 object ? object->GetTitle() : "");
0366 }
0367
0368
0369 void DigiKernel::submit (DigiContext& context, ParallelCall*const algorithms[], std::size_t count, void* data, bool parallel) const {
0370 const char* tag = context.event->id();
0371 #ifdef DD4HEP_USE_TBB
0372 bool para = parallel && (internals->tbb_init && internals->num_threads > 0);
0373 if ( para ) {
0374 tbb::task_group que;
0375 info("%s+++ Executing chunk of %3ld execution entries in parallel", tag, count);
0376 try {
0377 for( std::size_t i=0; i<count && !internals->stop; ++i)
0378 que.run( Wrapper<ParallelCall,void*>(algorithms[i], data) );
0379 que.wait();
0380 }
0381 catch(const std::exception& e) {
0382 std::exception_ptr eptr = std::current_exception();
0383 internals->stop = true;
0384 error("%s+++ C++ exception. STOP event loop. [%s]", tag, e.what());
0385 std::rethrow_exception(std::move(eptr));
0386 }
0387 return;
0388 }
0389 #else
0390 (void)parallel;
0391 #endif
0392 info("%s+++ Executing chunk of %3ld execution entries sequentially", tag, count);
0393 for( std::size_t i=0; i<count; ++i)
0394 algorithms[i]->execute(data);
0395 }
0396
0397
0398 void DigiKernel::submit (DigiContext& context, const std::vector<ParallelCall*>& algorithms, void* data, bool parallel) const {
0399 submit(context, &algorithms[0], algorithms.size(), data, parallel);
0400 }
0401
0402 void DigiKernel::wait(DigiContext& context) const {
0403 if ( context.event ) {}
0404 }
0405
0406
0407 void DigiKernel::executeEvent(std::unique_ptr<DigiContext>&& context) {
0408 DigiContext& refContext = *context;
0409 try {
0410 for(auto& call : internals->start_event) call(refContext);
0411 inputAction().execute(refContext);
0412 eventAction().execute(refContext);
0413 outputAction().execute(refContext);
0414 for(auto& call : internals->end_event) call(refContext);
0415 notify(std::move(context));
0416 }
0417 catch(const std::exception& e) {
0418 notify(std::move(context), e);
0419 }
0420 }
0421
0422
0423 void DigiKernel::notify(std::unique_ptr<DigiContext>&& context) {
0424 if ( context ) {
0425 context->event.reset();
0426 }
0427 context.reset();
0428 ++internals->events_finished;
0429 }
0430
0431
0432 void DigiKernel::notify(std::unique_ptr<DigiContext>&& context, const std::exception& e) {
0433 const char* tag = context->event->id();
0434 internals->stop = true;
0435 error("%s+++ Exception during event processing [Shall stop the event loop]", tag);
0436 error("%s -> %s", tag, e.what());
0437 notify(std::move(context));
0438 }
0439
0440
0441 int DigiKernel::run() {
0442 std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
0443 internals->stop = false;
0444 internals->events_finished = 0;
0445 internals->events_submitted = 0;
0446 internals->events_todo = internals->numEvents;
0447 info("+++ Total number of events: %d",internals->numEvents);
0448 #ifdef DD4HEP_USE_TBB
0449 if ( !internals->tbb_init && internals->num_threads > 0 ) {
0450 using ctrl_t = tbb::global_control;
0451 if ( 0 == internals->num_threads ) {
0452 internals->num_threads = ctrl_t::max_allowed_parallelism;
0453 }
0454 info("+++ Number of TBB threads: %d",internals->num_threads);
0455 info("+++ Number of parallel events: %d",internals->maxEventsParallel);
0456 internals->tbb_init = std::make_unique<ctrl_t>(ctrl_t::max_allowed_parallelism,internals->num_threads+1);
0457 if ( internals->maxEventsParallel >= 0 ) {
0458 int todo_evt = internals->events_todo;
0459 int num_proc = std::min(todo_evt,internals->maxEventsParallel);
0460 tbb::task_group main_group;
0461 try {
0462 for(int i=0; i < num_proc; ++i)
0463 main_group.run(Processor(*this));
0464 main_group.wait();
0465 }
0466 catch(const std::exception& e) {
0467 internals->stop = true;
0468 error("run: +++ C++ exception. Event loop stop. [%s]", e.what());
0469 main_group.wait();
0470 }
0471 }
0472 debug("+++ All event processing threads Synchronized --- Done!");
0473 }
0474 else
0475 #endif
0476 {
0477 while ( internals->events_todo > 0 && !internals->stop ) {
0478 Processor proc(*this);
0479 proc();
0480 ++internals->events_submitted;
0481 }
0482 }
0483
0484 std::chrono::duration<double> duration = std::chrono::system_clock::now() - start;
0485 double sec = std::chrono::duration_cast<std::chrono::seconds>(duration).count();
0486 info("+++ %d Events out of %d processed. "
0487 "Total: %7.1f seconds %7.3f seconds/event",
0488 internals->numEvents-int(internals->events_todo), internals->numEvents,
0489 sec, sec/double(std::max(1,internals->numEvents)));
0490 return 1;
0491 }
0492
0493
0494 int DigiKernel::terminate() {
0495 info("++ Saving monitoring quantities.");
0496 internals->monitor_handler->save();
0497 info("++ Terminate Digi and delete associated actions.");
0498 for(auto& call : internals->terminators) call();
0499 m_detDesc->destroyInstance();
0500 m_detDesc = 0;
0501 return 1;
0502 }
0503