Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /DD4hep/DDDigi/src/DigiContainerProcessor.cpp was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 //==========================================================================
0002 //  AIDA Detector description implementation 
0003 //--------------------------------------------------------------------------
0004 // Copyright (C) Organisation europeenne pour la Recherche nucleaire (CERN)
0005 // All rights reserved.
0006 //
0007 // For the licensing terms see $DD4hepINSTALL/LICENSE.
0008 // For the list of contributors see $DD4hepINSTALL/doc/CREDITS.
0009 //
0010 // Author     : M.Frank
0011 //
0012 //==========================================================================
0013 
0014 // Framework include files
0015 #include <DD4hep/InstanceCount.h>
0016 #include <DDDigi/DigiData.h>
0017 #include <DDDigi/DigiKernel.h>
0018 #include <DDDigi/DigiContext.h>
0019 #include <DDDigi/DigiContainerProcessor.h>
0020 #include <DDDigi/DigiSegmentSplitter.h>
0021 
0022 /// C/C++ include files
0023 #include <sstream>
0024 
0025 using namespace dd4hep::digi;
0026 
0027 /// Namespace for the AIDA detector description toolkit
0028 namespace dd4hep {
0029 
0030   /// Namespace for the Digitization part of the AIDA detector description toolkit
0031   namespace digi {
0032 
0033     template <typename T> T* DigiContainerProcessor::work_t::get_input(bool exc)   {
0034       if ( input.data->has_value() )  {
0035         T* object = std::any_cast<T>(input.data);
0036         if ( object )   {
0037           return object;
0038         }
0039       }
0040       if ( exc )   {
0041         dd4hep::except("DigiContainerProcessor",
0042                        "+++ Cannot access input %s. Invalid data handle of type: %s",
0043                        Key::key_name(input.key).c_str(), input_type_name().c_str());
0044       }
0045       return nullptr;
0046     }
0047 
0048     template <typename T> const T* DigiContainerProcessor::work_t::get_input(bool exc)  const  {
0049       if ( input.data->has_value() )  {
0050         const T* object = std::any_cast<T>(input.data);
0051         if ( object )   {
0052           return object;
0053         }
0054       }
0055       if ( exc )   {
0056         dd4hep::except("DigiContainerProcessor",
0057                        "+++ Cannot access input %s. Invalid data handle of type: %s",
0058                        Key::key_name(input.key).c_str(), input_type_name().c_str());
0059       }
0060       return nullptr;
0061     }
0062   }    // End namespace digi
0063 }      // End namespace dd4hep
0064 
0065 template       DepositVector*    DigiContainerProcessor::work_t::get_input(bool exc);
0066 template const DepositVector*    DigiContainerProcessor::work_t::get_input(bool exc)  const;
0067 template       DepositMapping*   DigiContainerProcessor::work_t::get_input(bool exc);
0068 template const DepositMapping*   DigiContainerProcessor::work_t::get_input(bool exc)  const;
0069 template       ParticleMapping*  DigiContainerProcessor::work_t::get_input(bool exc);
0070 template const ParticleMapping*  DigiContainerProcessor::work_t::get_input(bool exc)  const;
0071 template       DetectorHistory*  DigiContainerProcessor::work_t::get_input(bool exc);
0072 template const DetectorHistory*  DigiContainerProcessor::work_t::get_input(bool exc)  const;
0073 template       DetectorResponse* DigiContainerProcessor::work_t::get_input(bool exc);
0074 template const DetectorResponse* DigiContainerProcessor::work_t::get_input(bool exc)  const;
0075 
0076 /// input data type
0077 const std::type_info& DigiContainerProcessor::work_t::input_type()  const   {
0078   return input.data->type();
0079 }
0080 
0081 /// String form of the input data type
0082 std::string DigiContainerProcessor::work_t::input_type_name()  const   {
0083   return typeName(input.data->type());
0084 }
0085 
0086 /// Access to default callback 
0087 const DigiContainerProcessor::predicate_t& DigiContainerProcessor::accept_all()  {
0088   static predicate_t s_pred { std::bind(predicate_t::always_true, std::placeholders::_1), 0, nullptr };
0089   return s_pred;
0090 }
0091 
0092 /// Access to default callback 
0093 const DigiContainerProcessor::predicate_t& DigiContainerProcessor::accept_not_killed()  {
0094   static predicate_t s_pred { std::bind(predicate_t::not_killed, std::placeholders::_1), 0, nullptr };
0095   return s_pred;
0096 }
0097 
0098 /// Standard constructor
0099 DigiContainerProcessor::DigiContainerProcessor(const kernel_t& kernel, const std::string& name)   
0100   : DigiAction(kernel, name)
0101 {
0102   InstanceCount::increment(this);
0103 }
0104 
0105 /// Default destructor
0106 DigiContainerProcessor::~DigiContainerProcessor() {
0107   InstanceCount::decrement(this);
0108 }
0109 
0110 /// Adopt monitoring action
0111 void DigiContainerProcessor::adopt_monitor(DigiDepositMonitor* monitor)    {
0112   if ( monitor ) monitor->addRef();
0113   if ( m_monitor ) m_monitor->release();
0114   m_monitor = monitor;
0115 }
0116 
0117 /// Main functional callback if specific work is known
0118 void DigiContainerProcessor::execute(context_t&         /* context   */,
0119                                      work_t&            /* work      */,
0120                                      const predicate_t& /* predicate */)  const   {
0121 }
0122 
0123 /// Main functional callback adapter
0124 void DigiDepositsProcessor::execute(context_t& context, work_t& work, const predicate_t& predicate)  const   {
0125   if ( auto* vector_data = work.get_input<DepositVector>() )
0126     m_handleVector(context,  *vector_data, work, predicate);
0127   else if ( auto* mapped_data = work.get_input<DepositMapping>() )
0128     m_handleMapping(context, *mapped_data, work, predicate);
0129   else
0130     except("Request to handle unknown data type: %s", work.input_type_name().c_str());
0131 }
0132 
0133 /// Standard constructor
0134 DigiContainerSequence::DigiContainerSequence(const kernel_t& krnl, const std::string& nam)
0135   : DigiContainerProcessor(krnl, nam)
0136 {
0137   declareProperty("parallel", m_parallel = false);
0138   InstanceCount::increment(this);
0139 }
0140 
0141 /// Default destructor
0142 DigiContainerSequence::~DigiContainerSequence() {
0143   InstanceCount::decrement(this);
0144 }
0145 
0146 /// Set the default predicate
0147 void DigiContainerSequence::set_predicate(const predicate_t& predicate)   {
0148   m_worker_predicate = predicate;
0149 }
0150 
0151 /// Adopt new parallel worker
0152 void DigiContainerSequence::adopt_processor(DigiContainerProcessor* action)   {
0153   if ( !action )  {
0154     except("+++ FAILED: attempt to add invalid processor!");
0155     return;
0156   }
0157   m_workers.insert(new worker_t(action, m_workers.size(), *this));
0158 }
0159 
0160 /// Main functional callback if specific work is known
0161 void DigiContainerSequence::execute(context_t& context, work_t& work, const predicate_t& /* predicate */)  const   {
0162   auto group = m_workers.get_group();
0163   m_kernel.submit(context, group, m_workers.size(), &work, m_parallel);
0164 }
0165 
0166 /// Worker adaptor for caller DigiContainerSequence
0167 template <> void DigiParallelWorker<DigiContainerProcessor,
0168                                     DigiContainerSequence::work_t,
0169                                     std::size_t,
0170                                     DigiContainerSequence&>::execute(void* data) const  {
0171   calldata_t* arg  = reinterpret_cast<calldata_t*>(data);
0172   action->execute(arg->environ.context, *arg, predicate.m_worker_predicate);
0173 }
0174 
0175 /// Standard constructor
0176 DigiContainerSequenceAction::DigiContainerSequenceAction(const kernel_t& krnl, const std::string& nam)
0177   : DigiEventAction(krnl, nam)
0178 {
0179   declareProperty("input_mask",     m_input_mask);
0180   declareProperty("input_segment",  m_input_segment);
0181   declareProperty("output_mask",    m_output_mask);
0182   declareProperty("output_segment", m_output_segment);
0183   m_kernel.register_initialize(std::bind(&DigiContainerSequenceAction::initialize,this));
0184   m_kernel.register_terminate(std::bind(&DigiContainerSequenceAction::finalize,this));
0185   InstanceCount::increment(this);
0186 }
0187 
0188 /// Default destructor
0189 DigiContainerSequenceAction::~DigiContainerSequenceAction() {
0190   InstanceCount::decrement(this);
0191 }
0192 
0193 /// Initialization callback
0194 void dd4hep::digi::DigiContainerSequenceAction::initialize()   {
0195   for( auto& ent : m_registered_processors )   {
0196     worker_t* w = new worker_t(ent.second, m_registered_workers.size(), *this);
0197     m_registered_workers.emplace(ent.first, w);
0198     m_workers.insert(w);
0199     ent.second->release();
0200   }
0201 }
0202 
0203 /// Finalization callback
0204 void dd4hep::digi::DigiContainerSequenceAction::finalize()    {
0205   m_workers.clear();
0206 }
0207 
0208 /// Set the default predicate
0209 void DigiContainerSequenceAction::set_predicate(const predicate_t& predicate)   {
0210   m_worker_predicate = predicate;
0211 }
0212 
0213 /// Adopt new parallel worker
0214 void DigiContainerSequenceAction::adopt_processor(DigiContainerProcessor* action,
0215                                                   const std::string& container)
0216 {
0217   Key key(container.c_str(), 0x0);
0218   auto it = m_registered_processors.find(key);
0219   if ( it != m_registered_processors.end() )   {
0220     if ( action != it->second )   {
0221       except("+++ The action %s was already registered to mask:%04X container:%s!",
0222              action->c_name(), m_input_mask, container.c_str());
0223     }
0224     else   {
0225       warning("+++ The action %s was already registered to mask:%04X container:%s!",
0226               action->c_name(), m_input_mask, container.c_str());
0227     }
0228     return;
0229   }
0230   action->addRef();
0231   m_registered_processors.emplace(key, action);
0232   info("+++ Adding processor: %s for container: [%08X] %s",
0233        action->c_name(), key.item(), ('"'+container+'"').c_str());
0234 }
0235 
0236 /// Adopt new parallel worker acting on multiple containers
0237 void DigiContainerSequenceAction::adopt_processor(DigiContainerProcessor* action,
0238                                                   const std::vector<std::string>& containers)
0239 {
0240   info("+++ Adding bulk processor: %s for %ld container", action->c_name(), containers.size());
0241   for( const auto& cont : containers )   {
0242     adopt_processor(action, cont);
0243   }
0244 }
0245 
0246 /// Get hold of the registered processor for a given container
0247 DigiContainerSequenceAction::worker_t*
0248 DigiContainerSequenceAction::need_registered_worker(Key key, bool exc)   const  {
0249   Key item_key;
0250   item_key.set_item(key.item());
0251   auto it = m_registered_workers.find(item_key);
0252   if ( it != m_registered_workers.end() )  {
0253     return it->second;
0254   }
0255   if ( exc )   {
0256     except("No worker registered for input: %08X", item_key.item());
0257   }
0258   return nullptr;
0259 }
0260 
0261 /// Main functional callback if specific work is known
0262 void DigiContainerSequenceAction::execute(context_t& context)  const   {
0263   std::vector<ParallelWorker*> event_workers;
0264   work_items_t items;
0265   auto& event  = *context.event;
0266   auto& input  = event.get_segment(m_input_segment);
0267   auto& output = event.get_segment(m_output_segment);
0268   output_t    out { m_output_mask, output };
0269   env_t       env { context, m_properties, out };
0270   work_item_t itm { nullptr, { }, nullptr };
0271   work_t      arg { env, items, *this };
0272 
0273   arg.input_items.resize(m_workers.size(), itm);
0274   event_workers.reserve(input.size());
0275   for( auto& i : input )   {
0276     Key key(i.first);
0277     if ( key.mask() == m_input_mask )   {
0278       if ( worker_t* w = need_registered_worker(key, false) )  {
0279         event_workers.emplace_back(w);
0280         arg.input_items[w->options] = { &input, std::move(key), &i.second };
0281       }
0282     }
0283   }
0284   if ( !event_workers.empty() )   {
0285     m_kernel.submit(context, &event_workers.at(0), event_workers.size(), &arg, m_parallel);
0286   }
0287 }
0288 
0289 /// Worker adaptor for caller DigiContainerSequenceAction
0290 template <> void DigiParallelWorker<DigiContainerProcessor,
0291                                     DigiContainerSequenceAction::work_t,
0292                                     std::size_t,
0293                                     DigiContainerSequenceAction&>::execute(void* data) const  {
0294   auto* args = reinterpret_cast<calldata_t*>(data);
0295   auto& item = args->input_items[this->options];
0296   DigiContainerProcessor::work_t work { args->environ, item };
0297   action->execute(args->environ.context, work, predicate.m_worker_predicate);
0298 }
0299 
0300 /// Standard constructor
0301 DigiMultiContainerProcessor::DigiMultiContainerProcessor(const kernel_t& krnl, const std::string& nam)
0302   : DigiEventAction(krnl, nam)
0303 {
0304   declareProperty("input_masks",    m_input_masks);
0305   declareProperty("input_segment",  m_input_segment);
0306   declareProperty("output_mask",    m_output_mask);
0307   declareProperty("output_segment", m_output_segment);
0308   m_kernel.register_initialize(std::bind(&DigiMultiContainerProcessor::initialize,this));
0309   InstanceCount::increment(this);
0310 }
0311 
0312 /// Default destructor
0313 DigiMultiContainerProcessor::~DigiMultiContainerProcessor() {
0314   InstanceCount::decrement(this);
0315 }
0316 
0317 /// Initialize action object
0318 void DigiMultiContainerProcessor::initialize()   {
0319   std::map<processor_t*,worker_t*> action_workers;
0320   std::map<processor_t*,std::vector<Key> > keys;
0321   std::map<processor_t*,std::vector<std::string> > action_containers;
0322 
0323   m_worker_keys.clear();
0324   m_worker_map.clear();
0325   m_work_items.clear();
0326   m_workers.clear();
0327   for( auto& proc : m_processors )    {
0328     Key key(proc.first, 0x0);
0329     m_work_items.insert(key);
0330     for ( auto* action : proc.second )   {
0331       keys[action].push_back(key);
0332       action_containers[action].push_back(proc.first);
0333     }
0334   }
0335   /// We need to preserve the order the actions were submitted
0336   /// and remove later added duplicates (for different containers)
0337   for( auto* action : m_actions )   {
0338     auto worker_keys = keys[action];
0339     worker_t* w = nullptr;
0340     auto iw = action_workers.find(action);
0341     if ( iw != action_workers.end() )   {
0342       w = iw->second;
0343     }
0344     else   {
0345       w = new worker_t(action, m_workers.size(), *this);
0346       m_worker_keys.emplace_back(worker_keys);
0347       m_workers.insert(w);
0348       action_workers[action] = w;
0349     }
0350     for( const auto& key : worker_keys )
0351       m_worker_map[key.item()].push_back(w);
0352   }
0353   /// Add some printout about the configuration
0354   for(auto* action : m_actions )   {
0355     auto conts = action_containers[action];
0356     std::stringstream str;
0357     str << "[ ";
0358     for( const auto& cont : conts )
0359       str << cont << " ";
0360     str << "]";
0361     info("+++ Use processor: %-32s for processing: %s", action->c_name(), str.str().c_str());
0362   }
0363   /// All done: release reference count aquired in adopt_processor
0364   for(auto* action : m_actions )
0365     action->release();
0366 }
0367 
0368 /// Set the default predicate
0369 void DigiMultiContainerProcessor::set_predicate(const predicate_t& predicate)   {
0370   m_worker_predicate = predicate;
0371 }
0372 
0373 /// Adopt new parallel worker
0374 void DigiMultiContainerProcessor::adopt_processor(DigiContainerProcessor* action, const std::vector<std::string>& containers)    {
0375   if ( !action )   {
0376     except("+++ Attempt to use invalid processor. Request FAILED.");
0377   }
0378   else if ( containers.empty() )   {
0379     except("+++ Processor %s is defined, but no workload was assigned. Request FAILED.");
0380   }
0381   for(const auto& cont : containers)    {
0382     m_processors[cont].push_back(action);
0383   }
0384   if ( std::find(m_actions.begin(), m_actions.end(), action) == m_actions.end() )   {
0385     m_actions.emplace_back(action);
0386     action->addRef();
0387   }
0388 }
0389 
0390 /// Main functional callback
0391 void DigiMultiContainerProcessor::execute(context_t& context)  const  {
0392   work_items_t items;
0393   auto& mask  = m_input_masks;
0394   auto& event = *context.event;
0395   auto& input = event.get_segment(m_input_segment);
0396 
0397   items.reserve(input.size());
0398   for( auto& i : input )   {
0399     Key key(i.first);
0400     key.set_mask(0);
0401     bool use = mask.empty() || std::find(mask.begin(), mask.end(), key.mask()) != mask.end();
0402     if ( use )   {
0403       use = m_work_items.empty() || m_work_items.find(key) != m_work_items.end();
0404       if ( use )   {
0405         items.push_back({ &input, i.first, &i.second});
0406       }
0407     }
0408   }
0409   if ( !items.empty() )   {
0410     auto& output = event.get_segment(m_output_segment);
0411     output_t out { m_output_mask, output };
0412     env_t    env { context, properties(), out };
0413     work_t   arg { env, items, *this };
0414     m_kernel.submit(context, m_workers.get_group(), m_workers.size(), &arg, m_parallel);
0415   }
0416 }
0417 
0418 /// Worker adaptor for caller DigiMultiContainerProcessor
0419 template <> void DigiParallelWorker<DigiContainerProcessor,
0420                                     DigiMultiContainerProcessor::work_t,
0421                                     std::size_t,
0422                                     DigiMultiContainerProcessor&>::execute(void* data) const  {
0423   calldata_t* arg   = reinterpret_cast<calldata_t*>(data);
0424   const auto& par   = arg->parent;
0425   const auto& keys  = par.worker_keys(this->options);
0426   const auto& masks = par.input_masks();
0427   for( const auto& item : arg->items )  {
0428     Key key(item.key);
0429     key.set_mask(0);
0430     const char* tag = "";
0431     if ( masks.empty() || std::find(masks.begin(), masks.end(), key.mask()) != masks.end() )  {
0432       tag = "mask accepted";
0433       if ( keys.empty() )  {
0434         DigiContainerProcessor::work_t  work { arg->environ, item };
0435         action->execute(work.environ.context, work, predicate.m_worker_predicate);
0436         continue;
0437       }
0438       else if ( std::find(keys.begin(), keys.end(), key) != keys.end() )    {
0439         DigiContainerProcessor::work_t work { arg->environ, item };
0440         action->execute(work.environ.context, work, predicate.m_worker_predicate);
0441         continue;
0442       }
0443       tag = "no keys matching";
0444     }
0445     if ( tag )  {}
0446 #if 0
0447     par.info("%s+++ Ignore container: %016lX  --> %04X %08X %s [%s]",
0448              arg->context.event->id(), key.value(), key.mask(), key.item(), Key::key_name(key).c_str(), tag);
0449 #endif
0450   }
0451 }