Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-06 08:34:04

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #include "JTopologyBuilder.h"
0007 
0008 #include "JEventSourceArrow.h"
0009 #include "JEventMapArrow.h"
0010 #include "JEventTapArrow.h"
0011 #include "JUnfoldArrow.h"
0012 #include "JFoldArrow.h"
0013 #include <JANA/JEventProcessor.h>
0014 #include <JANA/Utils/JTablePrinter.h>
0015 #include <string>
0016 #include <vector>
0017 
0018 
0019 JTopologyBuilder::JTopologyBuilder() {
0020     SetPrefix("jana");
0021 }
0022 
0023 JTopologyBuilder::~JTopologyBuilder() {
0024     for (auto arrow : arrows) {
0025         delete arrow;
0026     }
0027     for (auto queue : queues) {
0028         delete queue;
0029     }
0030     for (auto pool : pools) {
0031         delete pool;
0032     }
0033 }
0034 
0035 std::string JTopologyBuilder::print_topology() {
0036     JTablePrinter t;
0037     t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0038     t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0039     t.AddColumn("Direction", JTablePrinter::Justify::Left, 0);
0040     t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0041     t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0042 
0043     // Build index lookup for queues
0044     int i = 0;
0045     std::map<void*, int> lookup;
0046     for (JEventQueue* queue : queues) {
0047         lookup[queue] = i;
0048         i += 1;
0049     }
0050     // Build index lookup for pools
0051     for (JEventPool* pool : pools) {
0052         lookup[pool] = i;
0053         i += 1;
0054     }
0055     // Build table
0056 
0057     bool show_row = true;
0058     
0059     for (JArrow* arrow : arrows) {
0060 
0061         show_row = true;
0062         for (JArrow::Port& port : arrow->m_ports) {
0063             if (show_row) {
0064                 t | arrow->get_name();
0065                 t | arrow->is_parallel();
0066                 show_row = false;
0067             }
0068             else {
0069                 t | "" | "" ;
0070             }
0071             auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool];
0072 
0073             t | ((port.is_input) ? "Input ": "Output");
0074             t | ((port.queue != nullptr) ? "Queue ": "Pool");
0075             t | place_index;
0076         }
0077     }
0078     return t.Render();
0079 }
0080 
0081 
0082 /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded.
0083 /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool,
0084 /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows.
0085 /// You have to furnish those yourself.
0086 void JTopologyBuilder::set_configure_fn(std::function<void(JTopologyBuilder&)> configure_fn) {
0087     m_configure_topology = std::move(configure_fn);
0088 }
0089 
0090 void JTopologyBuilder::create_topology() {
0091     mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0092                        static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0093 
0094     if (m_configure_topology) {
0095         m_configure_topology(*this);
0096         LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END;
0097     }
0098     else {
0099         attach_level(JEventLevel::Run, nullptr, nullptr);
0100         LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END;
0101     }
0102     for (auto* arrow : arrows) {
0103         arrow->set_logger(GetLogger());
0104     }
0105 
0106     // _Don't_ establish ordering if nobody needs it!
0107     // This hopefully prevents a small hit to NUMA performance
0108     // because we don't need to update next_event_index across NUMA nodes
0109     bool need_ordering = false;
0110     for (auto* queue : queues) {
0111         if (queue->GetEnforcesOrdering()) {
0112             need_ordering = true;
0113         }
0114     }
0115     if (!need_ordering) {
0116         for (auto* queue : queues) {
0117             queue->SetEstablishesOrdering(false);
0118         }
0119     }
0120     size_t i = 0;
0121     for (auto* queue: queues) {
0122         LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering();
0123     }
0124 }
0125 
0126 
0127 void JTopologyBuilder::Init() {
0128 
0129     m_components = GetApplication()->GetService<JComponentManager>();
0130 
0131     // We default event pool size to be equal to nthreads
0132     // We parse the 'nthreads' parameter two different ways for backwards compatibility.
0133     if (m_params->Exists("nthreads")) {
0134         if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0135             m_max_inflight_events = JCpuInfo::GetNumCpus();
0136         } else {
0137             m_max_inflight_events = m_params->GetParameterValue<int>("nthreads");
0138         }
0139     }
0140 
0141     m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events,
0142                                     "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.")
0143             ->SetIsAdvanced(true);
0144 
0145     /*
0146     m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing,
0147                                     "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.")
0148             ->SetIsAdvanced(true);
0149     */
0150     m_params->SetDefaultParameter("jana:affinity", m_affinity,
0151                                     "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0152             ->SetIsAdvanced(true);
0153     m_params->SetDefaultParameter("jana:locality", m_locality,
0154                                     "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0155             ->SetIsAdvanced(true);
0156 };
0157 
0158 
0159 void JTopologyBuilder::connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0160 
0161     JEventQueue* queue = nullptr;
0162 
0163     JArrow::Port& downstream_port = downstream->m_ports.at(downstream_port_id);
0164     if (downstream_port.queue != nullptr) {
0165         // If the queue already exists, use that!
0166         queue = downstream_port.queue;
0167     }
0168     else {
0169         // Create a new queue
0170         queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count());
0171         downstream_port.queue = queue;
0172         queues.push_back(queue);
0173     }
0174     downstream_port.pool = nullptr;
0175     if (downstream_port.enforces_ordering) {
0176         queue->SetEnforcesOrdering();
0177     }
0178 
0179     JArrow::Port& upstream_port = upstream->m_ports.at(upstream_port_id);
0180     upstream_port.queue = queue;
0181     if (upstream_port.establishes_ordering) {
0182         queue->SetEstablishesOrdering(true);
0183     }
0184     upstream_port.pool = nullptr;
0185 }
0186 
0187 
0188 void JTopologyBuilder::connect_to_first_available(JArrow* upstream, size_t upstream_port, std::vector<std::pair<JArrow*, size_t>> downstreams) {
0189     for (auto& [downstream, downstream_port_id] : downstreams) {
0190         if (downstream != nullptr) {
0191             connect(upstream, upstream_port, downstream, downstream_port_id);
0192             return;
0193         }
0194     }
0195 }
0196 
0197 std::pair<JEventTapArrow*, JEventTapArrow*> JTopologyBuilder::create_tap_chain(std::vector<JEventProcessor*>& procs, std::string level) {
0198 
0199     JEventTapArrow* first = nullptr;
0200     JEventTapArrow* last = nullptr;
0201 
0202     int i=1;
0203     std::string arrow_name = level + "Tap";
0204     for (JEventProcessor* proc : procs) {
0205         if (procs.size() > 1) {
0206             arrow_name += std::to_string(i++);
0207         }
0208         JEventTapArrow* current = new JEventTapArrow(arrow_name);
0209         current->add_processor(proc);
0210         arrows.push_back(current);
0211         if (first == nullptr) {
0212             first = current;
0213         }
0214         if (last != nullptr) {
0215             connect(last, JEventTapArrow::EVENT_OUT, current, JEventTapArrow::EVENT_IN);
0216         }
0217         last = current;
0218     }
0219     return {first, last};
0220 }
0221 
0222 
0223 void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0224     std::stringstream ss;
0225     ss << current_level;
0226     auto level_str = ss.str();
0227 
0228     // Find all event sources at this level
0229     std::vector<JEventSource*> sources_at_level;
0230     for (JEventSource* source : m_components->get_evt_srces()) {
0231         if (source->GetLevel() == current_level && source->IsEnabled()) {
0232             sources_at_level.push_back(source);
0233         }
0234     }
0235     
0236     // Find all unfolders at this level
0237     std::vector<JEventUnfolder*> unfolders_at_level;
0238     for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0239         if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) {
0240             unfolders_at_level.push_back(unfolder);
0241         }
0242     }
0243     
0244     // Find all processors at this level
0245     std::vector<JEventProcessor*> mappable_procs_at_level;
0246     std::vector<JEventProcessor*> tappable_procs_at_level;
0247 
0248     for (JEventProcessor* proc : m_components->get_evt_procs()) {
0249 
0250         if (proc->GetLevel() == current_level && proc->IsEnabled()) {
0251 
0252             // This may be a weird place to do it, but let's quickly validate that users aren't
0253             // trying to enable ordering on a legacy event processor. We don't do this in the constructor
0254             // because we don't want to put constraints on the order in which setters can be called, apart from "before Init()"
0255 
0256             if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0257                 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0258             }
0259 
0260             mappable_procs_at_level.push_back(proc);
0261             if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0262                 tappable_procs_at_level.push_back(proc);
0263             }
0264         }
0265     }
0266 
0267 
0268     bool is_top_level = (parent_unfolder == nullptr);
0269     if (is_top_level && sources_at_level.size() == 0) {
0270         // Skip level entirely when no source is present.
0271         LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0272         JEventLevel next = next_level(current_level);
0273         if (next == JEventLevel::None) {
0274             LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0275             return;
0276         }
0277         return attach_level(next, nullptr, nullptr);
0278     }
0279 
0280     // Enforce constraints on what our builder will accept (at least for now)
0281     if (!is_top_level && !sources_at_level.empty()) {
0282         throw JException("Topology forbids event sources at lower event levels in the topology");
0283     }
0284     if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0285         throw JException("Topology requires matching unfolder/folder arrow pairs");
0286     }
0287     if (unfolders_at_level.size() > 1) {
0288         throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0289     }
0290     // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by
0291     // the level-skipping logic above
0292 
0293 
0294     // Fill out arrow grid from components at this event level
0295     // --------------------------
0296     // 0. Pool
0297     // --------------------------
0298     LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0299     JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level);
0300     pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology
0301     LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0302 
0303     // --------------------------
0304     // 1. Source
0305     // --------------------------
0306     JEventSourceArrow* src_arrow = nullptr;
0307     bool need_source = !sources_at_level.empty();
0308     if (need_source) {
0309         src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
0310         src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN);
0311         src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT);
0312         arrows.push_back(src_arrow);
0313     }
0314 
0315     // --------------------------
0316     // 2. Map1
0317     // --------------------------
0318     bool have_parallel_sources = false;
0319     for (JEventSource* source: sources_at_level) {
0320         have_parallel_sources |= source->IsProcessParallelEnabled();
0321     }
0322     bool have_unfolder = !unfolders_at_level.empty();
0323     JEventMapArrow* map1_arrow = nullptr;
0324     bool need_map1 = (have_parallel_sources || have_unfolder);
0325 
0326     if (need_map1) {
0327         map1_arrow = new JEventMapArrow(level_str+"Map1");
0328         for (JEventSource* source: sources_at_level) {
0329             if (source->IsProcessParallelEnabled()) {
0330                 map1_arrow->add_source(source);
0331             }
0332         }
0333         for (JEventUnfolder* unf: unfolders_at_level) {
0334             map1_arrow->add_unfolder(unf);
0335         }
0336         map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0337         map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0338         arrows.push_back(map1_arrow);
0339     }
0340 
0341     // --------------------------
0342     // 3. Unfold
0343     // --------------------------
0344     JUnfoldArrow* unfold_arrow = nullptr;
0345     bool need_unfold = have_unfolder;
0346     if (need_unfold) {
0347         unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0348         unfold_arrow->attach(pool_at_level, JUnfoldArrow::REJECTED_PARENT_OUT); 
0349         arrows.push_back(unfold_arrow);
0350     }
0351 
0352     // --------------------------
0353     // 4. Fold
0354     // --------------------------
0355     JFoldArrow* fold_arrow = nullptr;
0356     bool need_fold = have_unfolder;
0357     if(need_fold) {
0358         fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0359         arrows.push_back(fold_arrow);
0360         fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT); 
0361     }
0362 
0363     // --------------------------
0364     // 5. Map2
0365     // --------------------------
0366     JEventMapArrow* map2_arrow = nullptr;
0367     bool need_map2 = !mappable_procs_at_level.empty();
0368     if (need_map2) {
0369         map2_arrow = new JEventMapArrow(level_str+"Map2");
0370         for (JEventProcessor* proc : mappable_procs_at_level) {
0371             map2_arrow->add_processor(proc);
0372             map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0373             map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0374         }
0375         arrows.push_back(map2_arrow);
0376     }
0377 
0378     // --------------------------
0379     // 6. Tap
0380     // --------------------------
0381     JEventTapArrow* first_tap_arrow = nullptr;
0382     JEventTapArrow* last_tap_arrow = nullptr;
0383     bool need_tap = !tappable_procs_at_level.empty();
0384     if (need_tap) {
0385         std::tie(first_tap_arrow, last_tap_arrow) = create_tap_chain(tappable_procs_at_level, level_str);
0386         first_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN);
0387         last_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT);
0388     }
0389 
0390 
0391     // Now that we've set up our component grid, we can do wiring!
0392     // --------------------------
0393     // 1. Source
0394     // --------------------------
0395     if (parent_unfolder != nullptr) {
0396         parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN);
0397         connect_to_first_available(parent_unfolder, JUnfoldArrow::CHILD_OUT,
0398                                    {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0399     }
0400     if (src_arrow != nullptr) {
0401         connect_to_first_available(src_arrow, JEventSourceArrow::EVENT_OUT,
0402                                    {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0403     }
0404     if (map1_arrow != nullptr) {
0405         connect_to_first_available(map1_arrow, JEventMapArrow::EVENT_OUT,
0406                                    {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0407     }
0408     if (unfold_arrow != nullptr) {
0409         connect_to_first_available(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT,
0410                                    {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0411     }
0412     if (fold_arrow != nullptr) {
0413         connect_to_first_available(fold_arrow, JFoldArrow::CHILD_OUT,
0414                                    {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0415     }
0416     if (map2_arrow != nullptr) {
0417         connect_to_first_available(map2_arrow, JEventMapArrow::EVENT_OUT,
0418                                    {{first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0419     }
0420     if (last_tap_arrow != nullptr) {
0421         connect_to_first_available(last_tap_arrow, JEventTapArrow::EVENT_OUT,
0422                                    {{parent_folder, JFoldArrow::CHILD_IN}});
0423     }
0424     if (parent_folder != nullptr) {
0425         parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT);
0426     }
0427 
0428     // Finally, we recur over lower levels!
0429     if (need_unfold) {
0430         auto next_level = unfolders_at_level[0]->GetChildLevel();
0431         attach_level(next_level, unfold_arrow, fold_arrow);
0432     }
0433     else {
0434         // This is the lowest level
0435         // TODO: Improve logic for determining event counts for multilevel topologies
0436         if (last_tap_arrow != nullptr) {
0437             last_tap_arrow->set_is_sink(true);
0438         }
0439         else if (map2_arrow != nullptr) {
0440             map2_arrow->set_is_sink(true);
0441         }
0442         else if (map1_arrow != nullptr) {
0443             map1_arrow->set_is_sink(true);
0444         }
0445         else if (src_arrow != nullptr) {
0446             src_arrow->set_is_sink(true);
0447         }
0448     }
0449 }
0450 
0451 
0452