Warning, file /DD4hep/DDDigi/src/DigiContainerProcessor.cpp was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 #include <DD4hep/InstanceCount.h>
0016 #include <DDDigi/DigiData.h>
0017 #include <DDDigi/DigiKernel.h>
0018 #include <DDDigi/DigiContext.h>
0019 #include <DDDigi/DigiContainerProcessor.h>
0020 #include <DDDigi/DigiSegmentSplitter.h>
0021
0022
0023 #include <sstream>
0024
0025 using namespace dd4hep::digi;
0026
0027
0028 namespace dd4hep {
0029
0030
0031 namespace digi {
0032
0033 template <typename T> T* DigiContainerProcessor::work_t::get_input(bool exc) {
0034 if ( input.data->has_value() ) {
0035 T* object = std::any_cast<T>(input.data);
0036 if ( object ) {
0037 return object;
0038 }
0039 }
0040 if ( exc ) {
0041 dd4hep::except("DigiContainerProcessor",
0042 "+++ Cannot access input %s. Invalid data handle of type: %s",
0043 Key::key_name(input.key).c_str(), input_type_name().c_str());
0044 }
0045 return nullptr;
0046 }
0047
0048 template <typename T> const T* DigiContainerProcessor::work_t::get_input(bool exc) const {
0049 if ( input.data->has_value() ) {
0050 const T* object = std::any_cast<T>(input.data);
0051 if ( object ) {
0052 return object;
0053 }
0054 }
0055 if ( exc ) {
0056 dd4hep::except("DigiContainerProcessor",
0057 "+++ Cannot access input %s. Invalid data handle of type: %s",
0058 Key::key_name(input.key).c_str(), input_type_name().c_str());
0059 }
0060 return nullptr;
0061 }
0062 }
0063 }
0064
0065 template DepositVector* DigiContainerProcessor::work_t::get_input(bool exc);
0066 template const DepositVector* DigiContainerProcessor::work_t::get_input(bool exc) const;
0067 template DepositMapping* DigiContainerProcessor::work_t::get_input(bool exc);
0068 template const DepositMapping* DigiContainerProcessor::work_t::get_input(bool exc) const;
0069 template ParticleMapping* DigiContainerProcessor::work_t::get_input(bool exc);
0070 template const ParticleMapping* DigiContainerProcessor::work_t::get_input(bool exc) const;
0071 template DetectorHistory* DigiContainerProcessor::work_t::get_input(bool exc);
0072 template const DetectorHistory* DigiContainerProcessor::work_t::get_input(bool exc) const;
0073 template DetectorResponse* DigiContainerProcessor::work_t::get_input(bool exc);
0074 template const DetectorResponse* DigiContainerProcessor::work_t::get_input(bool exc) const;
0075
0076
0077 const std::type_info& DigiContainerProcessor::work_t::input_type() const {
0078 return input.data->type();
0079 }
0080
0081
0082 std::string DigiContainerProcessor::work_t::input_type_name() const {
0083 return typeName(input.data->type());
0084 }
0085
0086
0087 const DigiContainerProcessor::predicate_t& DigiContainerProcessor::accept_all() {
0088 static predicate_t s_pred { std::bind(predicate_t::always_true, std::placeholders::_1), 0, nullptr };
0089 return s_pred;
0090 }
0091
0092
0093 const DigiContainerProcessor::predicate_t& DigiContainerProcessor::accept_not_killed() {
0094 static predicate_t s_pred { std::bind(predicate_t::not_killed, std::placeholders::_1), 0, nullptr };
0095 return s_pred;
0096 }
0097
0098
0099 DigiContainerProcessor::DigiContainerProcessor(const kernel_t& kernel, const std::string& name)
0100 : DigiAction(kernel, name)
0101 {
0102 InstanceCount::increment(this);
0103 }
0104
0105
0106 DigiContainerProcessor::~DigiContainerProcessor() {
0107 InstanceCount::decrement(this);
0108 }
0109
0110
0111 void DigiContainerProcessor::adopt_monitor(DigiDepositMonitor* monitor) {
0112 if ( monitor ) monitor->addRef();
0113 if ( m_monitor ) m_monitor->release();
0114 m_monitor = monitor;
0115 }
0116
0117
0118 void DigiContainerProcessor::execute(context_t& ,
0119 work_t& ,
0120 const predicate_t& ) const {
0121 }
0122
0123
0124 void DigiDepositsProcessor::execute(context_t& context, work_t& work, const predicate_t& predicate) const {
0125 if ( auto* vector_data = work.get_input<DepositVector>() )
0126 m_handleVector(context, *vector_data, work, predicate);
0127 else if ( auto* mapped_data = work.get_input<DepositMapping>() )
0128 m_handleMapping(context, *mapped_data, work, predicate);
0129 else
0130 except("Request to handle unknown data type: %s", work.input_type_name().c_str());
0131 }
0132
0133
0134 DigiContainerSequence::DigiContainerSequence(const kernel_t& krnl, const std::string& nam)
0135 : DigiContainerProcessor(krnl, nam)
0136 {
0137 declareProperty("parallel", m_parallel = false);
0138 InstanceCount::increment(this);
0139 }
0140
0141
0142 DigiContainerSequence::~DigiContainerSequence() {
0143 InstanceCount::decrement(this);
0144 }
0145
0146
0147 void DigiContainerSequence::set_predicate(const predicate_t& predicate) {
0148 m_worker_predicate = predicate;
0149 }
0150
0151
0152 void DigiContainerSequence::adopt_processor(DigiContainerProcessor* action) {
0153 if ( !action ) {
0154 except("+++ FAILED: attempt to add invalid processor!");
0155 return;
0156 }
0157 m_workers.insert(new worker_t(action, m_workers.size(), *this));
0158 }
0159
0160
0161 void DigiContainerSequence::execute(context_t& context, work_t& work, const predicate_t& ) const {
0162 auto group = m_workers.get_group();
0163 m_kernel.submit(context, group, m_workers.size(), &work, m_parallel);
0164 }
0165
0166
0167 template <> void DigiParallelWorker<DigiContainerProcessor,
0168 DigiContainerSequence::work_t,
0169 std::size_t,
0170 DigiContainerSequence&>::execute(void* data) const {
0171 calldata_t* arg = reinterpret_cast<calldata_t*>(data);
0172 action->execute(arg->environ.context, *arg, predicate.m_worker_predicate);
0173 }
0174
0175
0176 DigiContainerSequenceAction::DigiContainerSequenceAction(const kernel_t& krnl, const std::string& nam)
0177 : DigiEventAction(krnl, nam)
0178 {
0179 declareProperty("input_mask", m_input_mask);
0180 declareProperty("input_segment", m_input_segment);
0181 declareProperty("output_mask", m_output_mask);
0182 declareProperty("output_segment", m_output_segment);
0183 m_kernel.register_initialize(std::bind(&DigiContainerSequenceAction::initialize,this));
0184 m_kernel.register_terminate(std::bind(&DigiContainerSequenceAction::finalize,this));
0185 InstanceCount::increment(this);
0186 }
0187
0188
0189 DigiContainerSequenceAction::~DigiContainerSequenceAction() {
0190 InstanceCount::decrement(this);
0191 }
0192
0193
0194 void dd4hep::digi::DigiContainerSequenceAction::initialize() {
0195 for( auto& ent : m_registered_processors ) {
0196 worker_t* w = new worker_t(ent.second, m_registered_workers.size(), *this);
0197 m_registered_workers.emplace(ent.first, w);
0198 m_workers.insert(w);
0199 ent.second->release();
0200 }
0201 }
0202
0203
0204 void dd4hep::digi::DigiContainerSequenceAction::finalize() {
0205 m_workers.clear();
0206 }
0207
0208
0209 void DigiContainerSequenceAction::set_predicate(const predicate_t& predicate) {
0210 m_worker_predicate = predicate;
0211 }
0212
0213
0214 void DigiContainerSequenceAction::adopt_processor(DigiContainerProcessor* action,
0215 const std::string& container)
0216 {
0217 Key key(container.c_str(), 0x0);
0218 auto it = m_registered_processors.find(key);
0219 if ( it != m_registered_processors.end() ) {
0220 if ( action != it->second ) {
0221 except("+++ The action %s was already registered to mask:%04X container:%s!",
0222 action->c_name(), m_input_mask, container.c_str());
0223 }
0224 else {
0225 warning("+++ The action %s was already registered to mask:%04X container:%s!",
0226 action->c_name(), m_input_mask, container.c_str());
0227 }
0228 return;
0229 }
0230 action->addRef();
0231 m_registered_processors.emplace(key, action);
0232 info("+++ Adding processor: %s for container: [%08X] %s",
0233 action->c_name(), key.item(), ('"'+container+'"').c_str());
0234 }
0235
0236
0237 void DigiContainerSequenceAction::adopt_processor(DigiContainerProcessor* action,
0238 const std::vector<std::string>& containers)
0239 {
0240 info("+++ Adding bulk processor: %s for %ld container", action->c_name(), containers.size());
0241 for( const auto& cont : containers ) {
0242 adopt_processor(action, cont);
0243 }
0244 }
0245
0246
0247 DigiContainerSequenceAction::worker_t*
0248 DigiContainerSequenceAction::need_registered_worker(Key key, bool exc) const {
0249 Key item_key;
0250 item_key.set_item(key.item());
0251 auto it = m_registered_workers.find(item_key);
0252 if ( it != m_registered_workers.end() ) {
0253 return it->second;
0254 }
0255 if ( exc ) {
0256 except("No worker registered for input: %08X", item_key.item());
0257 }
0258 return nullptr;
0259 }
0260
0261
0262 void DigiContainerSequenceAction::execute(context_t& context) const {
0263 std::vector<ParallelWorker*> event_workers;
0264 work_items_t items;
0265 auto& event = *context.event;
0266 auto& input = event.get_segment(m_input_segment);
0267 auto& output = event.get_segment(m_output_segment);
0268 output_t out { m_output_mask, output };
0269 env_t env { context, m_properties, out };
0270 work_item_t itm { nullptr, { }, nullptr };
0271 work_t arg { env, items, *this };
0272
0273 arg.input_items.resize(m_workers.size(), itm);
0274 event_workers.reserve(input.size());
0275 for( auto& i : input ) {
0276 Key key(i.first);
0277 if ( key.mask() == m_input_mask ) {
0278 if ( worker_t* w = need_registered_worker(key, false) ) {
0279 event_workers.emplace_back(w);
0280 arg.input_items[w->options] = { &input, std::move(key), &i.second };
0281 }
0282 }
0283 }
0284 if ( !event_workers.empty() ) {
0285 m_kernel.submit(context, &event_workers.at(0), event_workers.size(), &arg, m_parallel);
0286 }
0287 }
0288
0289
0290 template <> void DigiParallelWorker<DigiContainerProcessor,
0291 DigiContainerSequenceAction::work_t,
0292 std::size_t,
0293 DigiContainerSequenceAction&>::execute(void* data) const {
0294 auto* args = reinterpret_cast<calldata_t*>(data);
0295 auto& item = args->input_items[this->options];
0296 DigiContainerProcessor::work_t work { args->environ, item };
0297 action->execute(args->environ.context, work, predicate.m_worker_predicate);
0298 }
0299
0300
0301 DigiMultiContainerProcessor::DigiMultiContainerProcessor(const kernel_t& krnl, const std::string& nam)
0302 : DigiEventAction(krnl, nam)
0303 {
0304 declareProperty("input_masks", m_input_masks);
0305 declareProperty("input_segment", m_input_segment);
0306 declareProperty("output_mask", m_output_mask);
0307 declareProperty("output_segment", m_output_segment);
0308 m_kernel.register_initialize(std::bind(&DigiMultiContainerProcessor::initialize,this));
0309 InstanceCount::increment(this);
0310 }
0311
0312
0313 DigiMultiContainerProcessor::~DigiMultiContainerProcessor() {
0314 InstanceCount::decrement(this);
0315 }
0316
0317
0318 void DigiMultiContainerProcessor::initialize() {
0319 std::map<processor_t*,worker_t*> action_workers;
0320 std::map<processor_t*,std::vector<Key> > keys;
0321 std::map<processor_t*,std::vector<std::string> > action_containers;
0322
0323 m_worker_keys.clear();
0324 m_worker_map.clear();
0325 m_work_items.clear();
0326 m_workers.clear();
0327 for( auto& proc : m_processors ) {
0328 Key key(proc.first, 0x0);
0329 m_work_items.insert(key);
0330 for ( auto* action : proc.second ) {
0331 keys[action].push_back(key);
0332 action_containers[action].push_back(proc.first);
0333 }
0334 }
0335
0336
0337 for( auto* action : m_actions ) {
0338 auto worker_keys = keys[action];
0339 worker_t* w = nullptr;
0340 auto iw = action_workers.find(action);
0341 if ( iw != action_workers.end() ) {
0342 w = iw->second;
0343 }
0344 else {
0345 w = new worker_t(action, m_workers.size(), *this);
0346 m_worker_keys.emplace_back(worker_keys);
0347 m_workers.insert(w);
0348 action_workers[action] = w;
0349 }
0350 for( const auto& key : worker_keys )
0351 m_worker_map[key.item()].push_back(w);
0352 }
0353
0354 for(auto* action : m_actions ) {
0355 auto conts = action_containers[action];
0356 std::stringstream str;
0357 str << "[ ";
0358 for( const auto& cont : conts )
0359 str << cont << " ";
0360 str << "]";
0361 info("+++ Use processor: %-32s for processing: %s", action->c_name(), str.str().c_str());
0362 }
0363
0364 for(auto* action : m_actions )
0365 action->release();
0366 }
0367
0368
0369 void DigiMultiContainerProcessor::set_predicate(const predicate_t& predicate) {
0370 m_worker_predicate = predicate;
0371 }
0372
0373
0374 void DigiMultiContainerProcessor::adopt_processor(DigiContainerProcessor* action, const std::vector<std::string>& containers) {
0375 if ( !action ) {
0376 except("+++ Attempt to use invalid processor. Request FAILED.");
0377 }
0378 else if ( containers.empty() ) {
0379 except("+++ Processor %s is defined, but no workload was assigned. Request FAILED.");
0380 }
0381 for(const auto& cont : containers) {
0382 m_processors[cont].push_back(action);
0383 }
0384 if ( std::find(m_actions.begin(), m_actions.end(), action) == m_actions.end() ) {
0385 m_actions.emplace_back(action);
0386 action->addRef();
0387 }
0388 }
0389
0390
0391 void DigiMultiContainerProcessor::execute(context_t& context) const {
0392 work_items_t items;
0393 auto& mask = m_input_masks;
0394 auto& event = *context.event;
0395 auto& input = event.get_segment(m_input_segment);
0396
0397 items.reserve(input.size());
0398 for( auto& i : input ) {
0399 Key key(i.first);
0400 key.set_mask(0);
0401 bool use = mask.empty() || std::find(mask.begin(), mask.end(), key.mask()) != mask.end();
0402 if ( use ) {
0403 use = m_work_items.empty() || m_work_items.find(key) != m_work_items.end();
0404 if ( use ) {
0405 items.push_back({ &input, i.first, &i.second});
0406 }
0407 }
0408 }
0409 if ( !items.empty() ) {
0410 auto& output = event.get_segment(m_output_segment);
0411 output_t out { m_output_mask, output };
0412 env_t env { context, properties(), out };
0413 work_t arg { env, items, *this };
0414 m_kernel.submit(context, m_workers.get_group(), m_workers.size(), &arg, m_parallel);
0415 }
0416 }
0417
0418
0419 template <> void DigiParallelWorker<DigiContainerProcessor,
0420 DigiMultiContainerProcessor::work_t,
0421 std::size_t,
0422 DigiMultiContainerProcessor&>::execute(void* data) const {
0423 calldata_t* arg = reinterpret_cast<calldata_t*>(data);
0424 const auto& par = arg->parent;
0425 const auto& keys = par.worker_keys(this->options);
0426 const auto& masks = par.input_masks();
0427 for( const auto& item : arg->items ) {
0428 Key key(item.key);
0429 key.set_mask(0);
0430 const char* tag = "";
0431 if ( masks.empty() || std::find(masks.begin(), masks.end(), key.mask()) != masks.end() ) {
0432 tag = "mask accepted";
0433 if ( keys.empty() ) {
0434 DigiContainerProcessor::work_t work { arg->environ, item };
0435 action->execute(work.environ.context, work, predicate.m_worker_predicate);
0436 continue;
0437 }
0438 else if ( std::find(keys.begin(), keys.end(), key) != keys.end() ) {
0439 DigiContainerProcessor::work_t work { arg->environ, item };
0440 action->execute(work.environ.context, work, predicate.m_worker_predicate);
0441 continue;
0442 }
0443 tag = "no keys matching";
0444 }
0445 if ( tag ) {}
0446 #if 0
0447 par.info("%s+++ Ignore container: %016lX --> %04X %08X %s [%s]",
0448 arg->context.event->id(), key.value(), key.mask(), key.item(), Key::key_name(key).c_str(), tag);
0449 #endif
0450 }
0451 }