Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:36

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #include "JTopologyBuilder.h"
0007 
0008 #include "JEventSourceArrow.h"
0009 #include "JEventMapArrow.h"
0010 #include "JEventTapArrow.h"
0011 #include "JUnfoldArrow.h"
0012 #include "JFoldArrow.h"
0013 #include <JANA/JEventProcessor.h>
0014 #include <JANA/Utils/JTablePrinter.h>
0015 
0016 
0017 JTopologyBuilder::JTopologyBuilder() {
0018     SetPrefix("jana");
0019 }
0020 
0021 JTopologyBuilder::~JTopologyBuilder() {
0022     for (auto arrow : arrows) {
0023         delete arrow;
0024     }
0025     for (auto queue : queues) {
0026         delete queue;
0027     }
0028     for (auto pool : pools) {
0029         delete pool;
0030     }
0031 }
0032 
0033 std::string JTopologyBuilder::print_topology() {
0034     JTablePrinter t;
0035     t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0036     t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0037     t.AddColumn("Direction", JTablePrinter::Justify::Left, 0);
0038     t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0039     t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0040 
0041     // Build index lookup for queues
0042     int i = 0;
0043     std::map<void*, int> lookup;
0044     for (JEventQueue* queue : queues) {
0045         lookup[queue] = i;
0046         i += 1;
0047     }
0048     // Build index lookup for pools
0049     for (JEventPool* pool : pools) {
0050         lookup[pool] = i;
0051         i += 1;
0052     }
0053     // Build table
0054 
0055     bool show_row = true;
0056     
0057     for (JArrow* arrow : arrows) {
0058 
0059         show_row = true;
0060         for (JArrow::Port& port : arrow->m_ports) {
0061             if (show_row) {
0062                 t | arrow->get_name();
0063                 t | arrow->is_parallel();
0064                 show_row = false;
0065             }
0066             else {
0067                 t | "" | "" ;
0068             }
0069             auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool];
0070 
0071             t | ((port.is_input) ? "Input ": "Output");
0072             t | ((port.queue != nullptr) ? "Queue ": "Pool");
0073             t | place_index;
0074         }
0075     }
0076     return t.Render();
0077 }
0078 
0079 
0080 /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded.
0081 /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool,
0082 /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows.
0083 /// You have to furnish those yourself.
0084 void JTopologyBuilder::set_configure_fn(std::function<void(JTopologyBuilder&)> configure_fn) {
0085     m_configure_topology = std::move(configure_fn);
0086 }
0087 
0088 void JTopologyBuilder::create_topology() {
0089     mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0090                        static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0091 
0092     if (m_configure_topology) {
0093         m_configure_topology(*this);
0094         LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END;
0095     }
0096     else {
0097         attach_level(JEventLevel::Run, nullptr, nullptr);
0098         LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END;
0099     }
0100     for (auto* arrow : arrows) {
0101         arrow->set_logger(GetLogger());
0102     }
0103 }
0104 
0105 
0106 void JTopologyBuilder::acquire_services(JServiceLocator *sl) {
0107 
0108     m_components = sl->get<JComponentManager>();
0109 
0110     // We default event pool size to be equal to nthreads
0111     // We parse the 'nthreads' parameter two different ways for backwards compatibility.
0112     if (m_params->Exists("nthreads")) {
0113         if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0114             m_max_inflight_events = JCpuInfo::GetNumCpus();
0115         } else {
0116             m_max_inflight_events = m_params->GetParameterValue<int>("nthreads");
0117         }
0118     }
0119 
0120     m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events,
0121                                     "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.")
0122             ->SetIsAdvanced(true);
0123 
0124     /*
0125     m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing,
0126                                     "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.")
0127             ->SetIsAdvanced(true);
0128     */
0129     m_params->SetDefaultParameter("jana:affinity", m_affinity,
0130                                     "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0131             ->SetIsAdvanced(true);
0132     m_params->SetDefaultParameter("jana:locality", m_locality,
0133                                     "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0134             ->SetIsAdvanced(true);
0135 };
0136 
0137 
0138 void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downstream, size_t down_index) {
0139 
0140     auto queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count());
0141     queues.push_back(queue);
0142 
0143     size_t i = 0;
0144     for (JArrow::Port& port : upstream->m_ports) {
0145         if (!port.is_input) {
0146             if (i++ == up_index) {
0147                 // Found the correct output
0148                 port.queue = queue;
0149                 port.pool = nullptr;
0150             }
0151         }
0152     }
0153     i = 0;
0154     for (JArrow::Port& port : downstream->m_ports) {
0155         if (port.is_input) {
0156             if (i++ == down_index) {
0157                 // Found the correct input
0158                 port.queue = queue;
0159                 port.pool = nullptr;
0160             }
0161         }
0162     }
0163 }
0164 
0165 
0166 void JTopologyBuilder::connect_to_first_available(JArrow* upstream, std::vector<JArrow*> downstreams) {
0167     for (JArrow* downstream : downstreams) {
0168         if (downstream != nullptr) {
0169             // Arrows at the same level all connect at index 0 (even the input for the parent JFoldArrow)
0170             connect(upstream, 0, downstream, 0);
0171             return;
0172         }
0173     }
0174 }
0175 
0176 
0177 void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0178     std::stringstream ss;
0179     ss << current_level;
0180     auto level_str = ss.str();
0181 
0182     // Find all event sources at this level
0183     std::vector<JEventSource*> sources_at_level;
0184     for (JEventSource* source : m_components->get_evt_srces()) {
0185         if (source->GetLevel() == current_level) {
0186             sources_at_level.push_back(source);
0187         }
0188     }
0189     
0190     // Find all unfolders at this level
0191     std::vector<JEventUnfolder*> unfolders_at_level;
0192     for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0193         if (unfolder->GetLevel() == current_level) {
0194             unfolders_at_level.push_back(unfolder);
0195         }
0196     }
0197     
0198     // Find all processors at this level
0199     std::vector<JEventProcessor*> mappable_procs_at_level;
0200     std::vector<JEventProcessor*> tappable_procs_at_level;
0201 
0202     for (JEventProcessor* proc : m_components->get_evt_procs()) {
0203         if (proc->GetLevel() == current_level) {
0204             mappable_procs_at_level.push_back(proc);
0205             if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0206                 tappable_procs_at_level.push_back(proc);
0207             }
0208         }
0209     }
0210 
0211 
0212     bool is_top_level = (parent_unfolder == nullptr);
0213     if (is_top_level && sources_at_level.size() == 0) {
0214         // Skip level entirely when no source is present.
0215         LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0216         JEventLevel next = next_level(current_level);
0217         if (next == JEventLevel::None) {
0218             LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0219             return;
0220         }
0221         return attach_level(next, nullptr, nullptr);
0222     }
0223 
0224     // Enforce constraints on what our builder will accept (at least for now)
0225     if (!is_top_level && !sources_at_level.empty()) {
0226         throw JException("Topology forbids event sources at lower event levels in the topology");
0227     }
0228     if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0229         throw JException("Topology requires matching unfolder/folder arrow pairs");
0230     }
0231     if (unfolders_at_level.size() > 1) {
0232         throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0233     }
0234     // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by
0235     // the level-skipping logic above
0236 
0237 
0238     // Fill out arrow grid from components at this event level
0239     // --------------------------
0240     // 0. Pool
0241     // --------------------------
0242     JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level);
0243     pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology
0244     LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0245 
0246     // --------------------------
0247     // 1. Source
0248     // --------------------------
0249     JEventSourceArrow* src_arrow = nullptr;
0250     bool need_source = !sources_at_level.empty();
0251     if (need_source) {
0252         src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
0253         src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN);
0254         src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT);
0255         arrows.push_back(src_arrow);
0256     }
0257 
0258     // --------------------------
0259     // 2. Map1
0260     // --------------------------
0261     bool have_parallel_sources = false;
0262     for (JEventSource* source: sources_at_level) {
0263         have_parallel_sources |= source->IsPreprocessEnabled();
0264     }
0265     bool have_unfolder = !unfolders_at_level.empty();
0266     JEventMapArrow* map1_arrow = nullptr;
0267     bool need_map1 = (have_parallel_sources || have_unfolder);
0268 
0269     if (need_map1) {
0270         map1_arrow = new JEventMapArrow(level_str+"Map1");
0271         for (JEventSource* source: sources_at_level) {
0272             if (source->IsPreprocessEnabled()) {
0273                 map1_arrow->add_source(source);
0274             }
0275         }
0276         for (JEventUnfolder* unf: unfolders_at_level) {
0277             map1_arrow->add_unfolder(unf);
0278         }
0279         map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0280         map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0281         arrows.push_back(map1_arrow);
0282     }
0283 
0284     // --------------------------
0285     // 3. Unfold
0286     // --------------------------
0287     JUnfoldArrow* unfold_arrow = nullptr;
0288     bool need_unfold = have_unfolder;
0289     if (need_unfold) {
0290         unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0291         arrows.push_back(unfold_arrow);
0292     }
0293 
0294     // --------------------------
0295     // 4. Fold
0296     // --------------------------
0297     JFoldArrow* fold_arrow = nullptr;
0298     bool need_fold = have_unfolder;
0299     if(need_fold) {
0300         fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0301         arrows.push_back(fold_arrow);
0302         fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT); 
0303     }
0304 
0305     // --------------------------
0306     // 5. Map2
0307     // --------------------------
0308     JEventMapArrow* map2_arrow = nullptr;
0309     bool need_map2 = !mappable_procs_at_level.empty();
0310     if (need_map2) {
0311         map2_arrow = new JEventMapArrow(level_str+"Map2");
0312         for (JEventProcessor* proc : mappable_procs_at_level) {
0313             map2_arrow->add_processor(proc);
0314             map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0315             map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0316         }
0317         arrows.push_back(map2_arrow);
0318     }
0319 
0320     // --------------------------
0321     // 6. Tap
0322     // --------------------------
0323     JEventTapArrow* tap_arrow = nullptr;
0324     bool need_tap = !tappable_procs_at_level.empty();
0325     if (need_tap) {
0326         tap_arrow = new JEventTapArrow(level_str+"Tap");
0327         for (JEventProcessor* proc : tappable_procs_at_level) {
0328             tap_arrow->add_processor(proc);
0329             tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN);
0330             tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT);
0331         }
0332         arrows.push_back(tap_arrow);
0333     }
0334 
0335 
0336     // Now that we've set up our component grid, we can do wiring!
0337     // --------------------------
0338     // 1. Source
0339     // --------------------------
0340     if (parent_unfolder != nullptr) {
0341         parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN);
0342         connect_to_first_available(parent_unfolder, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0343     }
0344     if (src_arrow != nullptr) {
0345         connect_to_first_available(src_arrow, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0346     }
0347     if (map1_arrow != nullptr) {
0348         connect_to_first_available(map1_arrow, {unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0349     }
0350     if (fold_arrow != nullptr) {
0351         connect_to_first_available(fold_arrow, {map2_arrow, tap_arrow, parent_folder});
0352     }
0353     if (map2_arrow != nullptr) {
0354         connect_to_first_available(map2_arrow, {tap_arrow, parent_folder});
0355     }
0356     if (tap_arrow != nullptr) {
0357         connect_to_first_available(tap_arrow, {parent_folder});
0358     }
0359     if (parent_folder != nullptr) {
0360         parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT);
0361     }
0362 
0363     // Finally, we recur over lower levels!
0364     if (need_unfold) {
0365         auto next_level = unfolders_at_level[0]->GetChildLevel();
0366         attach_level(next_level, unfold_arrow, fold_arrow);
0367     }
0368     else {
0369         // This is the lowest level
0370         // TODO: Improve logic for determining event counts for multilevel topologies
0371         if (tap_arrow != nullptr) {
0372             tap_arrow->set_is_sink(true);
0373         }
0374         else if (map2_arrow != nullptr) {
0375             map2_arrow->set_is_sink(true);
0376         }
0377         else if (map1_arrow != nullptr) {
0378             map1_arrow->set_is_sink(true);
0379         }
0380         else if (src_arrow != nullptr) {
0381             src_arrow->set_is_sink(true);
0382         }
0383     }
0384 }
0385 
0386 
0387