Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-01 08:43:34

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include "JTopologyBuilder.h"
0006 
0007 #include <string>
0008 #include <vector>
0009 
0010 #include "JANA/Topology/JArrow.h"
0011 #include "JANA/Utils/JEventLevel.h"
0012 #include "JSourceArrow.h"
0013 #include "JMultilevelSourceArrow.h"
0014 #include "JMapArrow.h"
0015 #include "JTapArrow.h"
0016 #include "JUnfoldArrow.h"
0017 #include "JFoldArrow.h"
0018 #include <JANA/JEventProcessor.h>
0019 #include <JANA/Services/JComponentManager.h>
0020 #include <JANA/Utils/JTablePrinter.h>
0021 
0022 
0023 JTopologyBuilder::JTopologyBuilder() {
0024     SetPrefix("jana");
0025 }
0026 
0027 JTopologyBuilder::~JTopologyBuilder() {
0028     for (auto arrow : arrows) {
0029         delete arrow;
0030     }
0031     for (auto queue : queues) {
0032         delete queue;
0033     }
0034     for (auto pool : pools) {
0035         delete pool;
0036     }
0037 }
0038 
0039 void JTopologyBuilder::AddArrow(JArrow* arrow) {
0040     arrows.push_back(arrow);
0041     arrow->SetId(arrows.size()-1);
0042     auto it = arrow_lookup.find(arrow->GetName());
0043     if (it != arrow_lookup.end()) {
0044         throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str());
0045     }
0046     arrow_lookup[arrow->GetName()] = arrow;
0047 }
0048 
0049 JArrow* JTopologyBuilder::GetArrow(const std::string& arrow_name) {
0050     auto it = arrow_lookup.find(arrow_name);
0051     if (it == arrow_lookup.end()) {
0052         return nullptr;
0053     }
0054     return it->second;
0055 }
0056 
0057 JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) {
0058     auto pool_it = pool_lookup.find(level);
0059     if (pool_it != pool_lookup.end()) {
0060         return pool_it->second;
0061     }
0062     else {
0063         auto* pool = new JEventPool(m_components, m_max_inflight_events[level], m_location_count, level);
0064         pools.push_back(pool);
0065         pool_lookup[level] = pool;
0066         return pool;
0067     }
0068 }
0069 
0070 void JTopologyBuilder::ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level) {
0071     auto& arrow = *arrow_lookup.at(arrow_name);
0072     auto port_index = arrow.GetPortIndex(port_name);
0073     auto& port = arrow.GetPort(port_index);
0074     auto* pool = GetOrCreatePool(level);
0075     port.Attach(pool);
0076 }
0077 
0078 void JTopologyBuilder::ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level) {
0079     auto* upstream_pool = GetOrCreatePool(upstream_level);
0080     auto* downstream_pool = GetOrCreatePool(downstream_level);
0081     upstream_pool->AttachForwardingPool(downstream_pool);
0082 }
0083 
0084 void JTopologyBuilder::ConnectQueue(std::string upstream_arrow_name, 
0085                                     std::string upstream_port_name,
0086                                     std::string downstream_arrow_name, 
0087                                     std::string downstream_port_name) {
0088 
0089     auto& upstream_arrow = *arrow_lookup.at(upstream_arrow_name);
0090     auto upstream_port_id = upstream_arrow.GetPortIndex(upstream_port_name);
0091     auto& downstream_arrow = *arrow_lookup.at(downstream_arrow_name);
0092     auto downstream_port_id = downstream_arrow.GetPortIndex(downstream_port_name);
0093 
0094     Connect(&upstream_arrow, upstream_port_id,
0095             &downstream_arrow, downstream_port_id);
0096 }
0097 
0098 std::string JTopologyBuilder::PrintTopology() {
0099     JTablePrinter t;
0100     t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0101     t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0102     t.AddColumn("Port", JTablePrinter::Justify::Left, 0);
0103     t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0104     t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0105     t.AddColumn("Order", JTablePrinter::Justify::Left, 0);
0106 
0107     // Build index lookup for queues
0108     int i = 0;
0109     std::map<void*, int> lookup;
0110     // Build index lookup for pools
0111     for (JEventPool* pool : pools) {
0112         lookup[pool] = i;
0113         i += 1;
0114     }
0115     for (JEventQueue* queue : queues) {
0116         lookup[queue] = i;
0117         i += 1;
0118     }
0119     // Build table
0120 
0121     bool show_row = true;
0122     
0123     for (JArrow* arrow : arrows) {
0124 
0125         show_row = true;
0126         for (auto& port : arrow->m_ports) {
0127             if (show_row) {
0128                 t | arrow->GetName();
0129                 t | arrow->IsParallel();
0130                 show_row = false;
0131             }
0132             else {
0133                 t | "" | "" ;
0134             }
0135             auto place_index = lookup[(port->GetQueue()!=nullptr) ? (void*) port->GetQueue() : (void*) port->GetPool()];
0136 
0137             t | port->GetName();
0138             t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool");
0139             t | place_index;
0140             if (port->GetEnforcesOrdering() && port->GetEstablishesOrdering()) {
0141                 t | "Both";
0142             }
0143             else if (port->GetEnforcesOrdering()) {
0144                 t | "Enf";
0145             }
0146             else if (port->GetEstablishesOrdering()) {
0147                 t | "Est";
0148             }
0149             else {
0150                 t | "";
0151             }
0152         }
0153     }
0154     return t.Render();
0155 }
0156 
0157 
0158 /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded.
0159 /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool,
0160 /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows.
0161 /// You have to furnish those yourself.
0162 void JTopologyBuilder::SetConfigureFn(std::function<void(JTopologyBuilder&, JComponentManager&)> configure_fn) {
0163     m_configure_topology = std::move(configure_fn);
0164 }
0165 
0166 void JTopologyBuilder::CreateTopology() {
0167     mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0168                        static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0169 
0170     if (m_configure_topology) {
0171         m_configure_topology(*this, *m_components);
0172         LOG_INFO(GetLogger()) << "Using custom topology configuration function" << LOG_END;
0173     }
0174     else {
0175         CreateTopologyFromScratch();
0176     }
0177     for (auto* arrow : arrows) {
0178         arrow->SetLogger(GetLogger());
0179     }
0180 
0181     // _Don't_ establish ordering if nobody needs it!
0182     // This hopefully prevents a small hit to NUMA performance
0183     // because we don't need to update next_event_index across NUMA nodes
0184     bool need_ordering = false;
0185     for (auto* queue : queues) {
0186         if (queue->GetEnforcesOrdering()) {
0187             need_ordering = true;
0188         }
0189     }
0190     if (!need_ordering) {
0191         for (auto* queue : queues) {
0192             queue->SetEstablishesOrdering(false);
0193         }
0194     }
0195     LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END;
0196 }
0197 
0198 void JTopologyBuilder::CreateTopologyFromScratch() {
0199 
0200     enum class Column { Source, UnfoldAbove, BatchBefore, UnfoldBelow, FoldBelow, BatchAfter, Tap, FoldAbove};
0201     std::vector<Column> columns = { Column::Source, Column::UnfoldAbove, Column::BatchBefore, Column::UnfoldBelow, Column::FoldBelow, Column::BatchAfter, Column::Tap, Column::FoldAbove};
0202     struct Cell {
0203         JArrow* start = nullptr;
0204         JArrow* end = nullptr;
0205     };
0206 
0207     std::map<std::pair<JEventLevel, Column>, Cell> grid;
0208 
0209     // -----------------------------
0210     // Phase 1: Iterate over all components, adding the corresponding arrows to the grid
0211     // -----------------------------
0212 
0213     int map_counter = 1;
0214     std::set<JEventLevel> levels_present;
0215 
0216     // Place all sources on grid
0217     // -----------------------------
0218     std::map<JEventLevel, std::vector<JEventSource*>> sources;
0219     for (JEventSource* source : m_components->GetSources()) {
0220         if (source->IsEnabled()) {
0221             sources[source->GetLevel()].push_back(source);
0222             levels_present.insert(source->GetLevel());
0223         }
0224     }
0225     for (auto& it : sources) {
0226         auto level = it.first;
0227         auto level_str = toString(level);
0228         bool need_map = false;
0229         bool need_multi_arrow = false;
0230         for (auto* source : it.second) {
0231             need_map |= source->IsProcessParallelEnabled();
0232             need_multi_arrow |= (source->GetParentLevels().size() > 0);
0233         }
0234 
0235         if (need_multi_arrow && sources.size() > 1) {
0236             throw JException("Multiple multilevel JEventSources not supported yet");
0237         }
0238 
0239         JArrow* src_arrow;
0240         if (need_multi_arrow) {
0241             src_arrow = new JMultilevelSourceArrow(level_str+"MultiSource", it.second.at(0));
0242             // Add parent levels now. Child level is added further below.
0243             for (auto parent_level: it.second.at(0)->GetParentLevels()) {
0244                 levels_present.insert(parent_level);
0245                 grid[{parent_level, Column::Source}] = {src_arrow, src_arrow};
0246             }
0247         }
0248         else {
0249             src_arrow = new JSourceArrow(level_str+"Source", level, it.second);
0250         }
0251         AddArrow(src_arrow);
0252 
0253         if (need_map) {
0254             auto* map_arrow = new JMapArrow(toString(level)+"Map"+std::to_string(map_counter++), level);
0255             map_arrow->SetParallelSource(true);
0256             AddArrow(map_arrow);
0257             Connect(src_arrow, 1, map_arrow, 0);
0258             grid[{level, Column::Source}] = {src_arrow, map_arrow};
0259         }
0260         else {
0261             grid[{level, Column::Source}] = {src_arrow, src_arrow};
0262         }
0263     }
0264 
0265     // Place all unfolders on grid
0266     // -----------------------------
0267     for (auto* unfolder: m_components->GetUnfolders()) {
0268 
0269         if (!unfolder->IsEnabled()) continue;
0270 
0271         // Create unfold arrow
0272         // Publish at _each_ grid location
0273         auto parent_level = unfolder->GetLevel();
0274         auto child_level = unfolder->GetChildLevel();
0275         levels_present.insert(parent_level);
0276         levels_present.insert(child_level);
0277 
0278         auto* map_arrow = new JMapArrow(toString(parent_level)+"Map"+std::to_string(map_counter++), parent_level);
0279         auto* unfold_arrow = new JUnfoldArrow(toString(child_level)+"Unfold", unfolder);
0280         map_arrow->AddUnfolder(unfolder);
0281         AddArrow(map_arrow);
0282         AddArrow(unfold_arrow);
0283         Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN);
0284 
0285         if (grid.find({parent_level, Column::UnfoldBelow}) != grid.end()) {
0286             throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level).c_str());
0287         }
0288         if (grid.find({child_level, Column::UnfoldAbove}) != grid.end()) {
0289             throw JException("Only one unfolder allowed for child level=%s", toString(child_level).c_str());
0290         }
0291         grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow};
0292         grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow};
0293     }
0294 
0295     // Place all folders on grid
0296     // -----------------------------
0297     for (auto* folder: m_components->GetFolders()) {
0298 
0299         if (!folder->IsEnabled()) continue;
0300 
0301         // Create unfold arrow
0302         // Publish at _each_ grid location
0303         auto parent_level = folder->GetLevel();
0304         auto child_level = folder->GetChildLevel();
0305         levels_present.insert(parent_level);
0306         levels_present.insert(child_level);
0307 
0308         auto* map_arrow = new JMapArrow(toString(child_level)+"Map"+std::to_string(map_counter++), parent_level);
0309         auto* fold_arrow = new JFoldArrow(toString(parent_level)+"Fold", parent_level, child_level);
0310         fold_arrow->SetFolder(folder);
0311         map_arrow->AddFolder(folder);
0312         AddArrow(map_arrow);
0313         AddArrow(fold_arrow);
0314         Connect(map_arrow, map_arrow->EVENT_OUT, fold_arrow, fold_arrow->CHILD_IN);
0315 
0316         if (grid.find({parent_level, Column::FoldBelow}) != grid.end()) {
0317             throw JException("Only one folder allowed for parent level=%s", toString(parent_level).c_str());
0318         }
0319         if (grid.find({child_level, Column::FoldAbove}) != grid.end()) {
0320             throw JException("Only one folder allowed for child level=%s", toString(child_level).c_str());
0321         }
0322         grid[{parent_level, Column::FoldBelow}] = {fold_arrow, fold_arrow};
0323         grid[{child_level, Column::FoldAbove}] = {map_arrow, fold_arrow};
0324     }
0325 
0326     // Place all processors on grid
0327     // -----------------------------
0328     std::map<JEventLevel, std::vector<JEventProcessor*>> mappable_processors;
0329     std::map<JEventLevel, std::vector<JEventProcessor*>> tappable_processors;
0330     for (auto* proc : m_components->GetProcessors()) {
0331         if (proc->IsEnabled()) {
0332             levels_present.insert(proc->GetLevel());
0333             if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0334                 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0335             }
0336             mappable_processors[proc->GetLevel()].push_back(proc);
0337             if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0338                 tappable_processors[proc->GetLevel()].push_back(proc);
0339             }
0340         }
0341     }
0342     for (auto it : mappable_processors) {
0343         auto level = it.first;
0344         auto level_str = toString(level);
0345         auto* map_arrow = new JMapArrow(level_str+"Map"+std::to_string(map_counter++), level);
0346         for (JEventProcessor* proc : it.second) {
0347             map_arrow->AddProcessor(proc);
0348         }
0349         AddArrow(map_arrow);
0350 
0351         auto tappable_procs_it = tappable_processors.find(level);
0352         if (tappable_procs_it != tappable_processors.end()) {
0353             JArrow* first_tap_arrow = nullptr;
0354             JArrow* last_tap_arrow = nullptr;
0355             std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(it.second, level_str);
0356             Connect(map_arrow, map_arrow->EVENT_OUT, first_tap_arrow, JTapArrow::EVENT_IN);
0357             grid[{level, Column::Tap}] = {map_arrow, last_tap_arrow};
0358         }
0359         else {
0360             // ONLY legacy processors, no tap chain
0361             grid[{level, Column::Tap}] = {map_arrow, map_arrow};
0362         }
0363     }
0364 
0365     // -----------------------------
0366     // Phase 2: Iterate over all rows and all adjacent occupied column pairs, wiring horizontally
0367     // -----------------------------
0368 
0369     for (JEventLevel level : levels_present) {
0370 
0371         auto* pool = GetOrCreatePool(level);
0372         JArrow* last_arrow = nullptr;
0373         for (auto column : columns) {
0374             auto it = grid.find({level, column});
0375             if (it == grid.end()) { continue; }
0376 
0377             JArrow* current_arrow = it->second.start;
0378             if (last_arrow == nullptr) {
0379                 // This is the first arrow we've found, so connect the pool here
0380                 auto port_index = current_arrow->GetPortIndex(level, JArrow::PortDirection::In);
0381                 current_arrow->GetPort(port_index).Attach(pool);
0382             }
0383             else {
0384                 Connect(last_arrow,
0385                         last_arrow->GetPortIndex(level, JArrow::PortDirection::Out),
0386                         current_arrow, 
0387                         current_arrow->GetPortIndex(level, JArrow::PortDirection::In));
0388             }
0389             last_arrow = it->second.end;
0390 
0391         }
0392         // Connect last_arrow to pool
0393         auto port_index = last_arrow->GetPortIndex(level, JArrow::PortDirection::Out);
0394         if (level == JEventLevel::PhysicsEvent) {
0395             last_arrow->SetIsSink(true);
0396         }
0397         last_arrow->GetPort(port_index).Attach(pool);
0398     }
0399 
0400     // -----------------------------
0401     // Phase 3: Traverse event hierarchy and attach levels accordingly
0402     // -----------------------------
0403     // Because we haven't fully implemented the event hierarchy yet, let's just go with the fully connected graph
0404 
0405     for (auto outer_level : levels_present) {
0406         for (auto inner_level : levels_present) {
0407             if (outer_level != inner_level) {
0408                 ConnectPool(outer_level, inner_level);
0409             }
0410         }
0411     }
0412 }
0413 
0414 
0415 void JTopologyBuilder::Init() {
0416 
0417     m_components = GetApplication()->GetService<JComponentManager>();
0418 
0419     // We default event pool size to be equal to nthreads
0420     // We parse the 'nthreads' parameter two different ways for backwards compatibility.
0421     size_t nthreads = 1;
0422     if (m_params->Exists("nthreads")) {
0423         if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0424             nthreads = JCpuInfo::GetNumCpus();
0425         } else {
0426             nthreads = m_params->GetParameterValue<int>("nthreads");
0427         }
0428     }
0429 
0430     m_max_inflight_events[JEventLevel::Run] = m_params->RegisterParameter("jana:max_inflight_runs", nthreads,
0431                                 "The number of runs which may be in-flight at once.");
0432 
0433     m_max_inflight_events[JEventLevel::Subrun] = m_params->RegisterParameter("jana:max_inflight_subruns", nthreads,
0434                                 "The number of subruns which may be in-flight at once.");
0435 
0436     m_max_inflight_events[JEventLevel::Timeslice] = m_params->RegisterParameter("jana:max_inflight_timeslices", nthreads,
0437                                 "The number of timeslices which may be in-flight at once.");
0438 
0439     m_max_inflight_events[JEventLevel::Block] = m_params->RegisterParameter("jana:max_inflight_blocks", nthreads,
0440                                 "The number of blocks which may be in-flight at once.");
0441 
0442     m_max_inflight_events[JEventLevel::SlowControls] = m_params->RegisterParameter("jana:max_inflight_slowcontrols", nthreads,
0443                                 "The number of slow control events which may be in-flight at once.");
0444 
0445     m_max_inflight_events[JEventLevel::PhysicsEvent] = m_params->RegisterParameter("jana:max_inflight_events", nthreads,
0446                                 "The number of physics events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.");
0447 
0448     m_max_inflight_events[JEventLevel::Subevent] = m_params->RegisterParameter("jana:max_inflight_subevents", 4*nthreads,
0449                                 "The number of subevents which may be in-flight at once.");
0450 
0451     m_max_inflight_events[JEventLevel::Task] = m_params->RegisterParameter("jana:max_inflight_tasks", 8*nthreads,
0452                                 "The number of tasks which may be in-flight at once.");
0453 
0454     /*
0455     m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing,
0456                                     "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.")
0457             ->SetIsAdvanced(true);
0458     */
0459     m_params->SetDefaultParameter("jana:affinity", m_affinity,
0460                                     "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0461             ->SetIsAdvanced(true);
0462     m_params->SetDefaultParameter("jana:locality", m_locality,
0463                                     "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0464             ->SetIsAdvanced(true);
0465 };
0466 
0467 
0468 void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0469 
0470     JArrow::Port& upstream_port = upstream->GetPort(upstream_port_id);
0471     JArrow::Port& downstream_port = downstream->GetPort(downstream_port_id);
0472 
0473     LOG_DEBUG(GetLogger()) << "Connecting arrows: " << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0474 
0475     // Enforce that multiple upstreams can share a downstream, but not vice versa
0476     if (upstream_port.GetQueue() != nullptr) {
0477         throw JException("Upstream port '%s' on arrow '%s' already has a queue", upstream_port.GetName().c_str(), upstream->GetName().c_str());
0478     }
0479 
0480     // Enforce that any event levels that are produced upstream must be accepted downstream
0481     for (auto level: upstream_port.GetLevels()) {
0482         bool level_found = false;
0483         for (auto downstream_level : downstream_port.GetLevels()) {
0484             if (downstream_level == level) {
0485                 level_found = true;
0486                 break;
0487             }
0488         }
0489         if (!level_found) {
0490             LOG_FATAL(GetLogger()) << "Level " << toString(level) << " produced upstream but not accepted downstream: "
0491                 << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0492             throw JException("Level produced upstream but not accepted downstream");
0493         }
0494     }
0495 
0496     JEventQueue* queue = nullptr;
0497     if (downstream_port.GetQueue() != nullptr) {
0498         queue = downstream_port.GetQueue();
0499     }
0500     else {
0501         // Create new queue
0502         size_t queue_capacity = 0;
0503         for (auto level : downstream_port.GetLevels()) {
0504             queue_capacity += m_max_inflight_events[level];
0505         }
0506         queue = new JEventQueue(queue_capacity, mapping.get_loc_count());
0507         queues.push_back(queue);
0508         downstream_port.Attach(queue);
0509     }
0510 
0511     upstream_port.Attach(queue);
0512 
0513     if (downstream_port.GetEnforcesOrdering()) {
0514         queue->SetEnforcesOrdering();
0515     }
0516     if (upstream_port.GetEstablishesOrdering()) {
0517         queue->SetEstablishesOrdering(true);
0518     }
0519 }
0520 
0521 
0522 std::pair<JTapArrow*, JTapArrow*> JTopologyBuilder::CreateTapChain(std::vector<JEventProcessor*>& procs, std::string level) {
0523 
0524     JTapArrow* first = nullptr;
0525     JTapArrow* last = nullptr;
0526 
0527     int i=1;
0528     for (JEventProcessor* proc : procs) {
0529         std::string arrow_name = level + "Tap";
0530         if (procs.size() > 1) {
0531             arrow_name += std::to_string(i++);
0532         }
0533         JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel());
0534         current->AddProcessor(proc);
0535         AddArrow(current);
0536         if (first == nullptr) {
0537             first = current;
0538         }
0539         if (last != nullptr) {
0540             Connect(last, JTapArrow::EVENT_OUT, current, JTapArrow::EVENT_IN);
0541         }
0542         last = current;
0543     }
0544     return {first, last};
0545 }
0546 
0547