Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-05-04 08:56:34

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #include "JTopologyBuilder.h"
0006 
0007 #include <string>
0008 #include <vector>
0009 
0010 #include "JANA/Utils/JEventLevel.h"
0011 #include "JSourceArrow.h"
0012 #include "JMapArrow.h"
0013 #include "JTapArrow.h"
0014 #include "JUnfoldArrow.h"
0015 #include "JFoldArrow.h"
0016 #include <JANA/JEventProcessor.h>
0017 #include <JANA/Services/JComponentManager.h>
0018 #include <JANA/Utils/JTablePrinter.h>
0019 
0020 
0021 JTopologyBuilder::JTopologyBuilder() {
0022     SetPrefix("jana");
0023 }
0024 
0025 JTopologyBuilder::~JTopologyBuilder() {
0026     for (auto arrow : arrows) {
0027         delete arrow;
0028     }
0029     for (auto queue : queues) {
0030         delete queue;
0031     }
0032     for (auto pool : pools) {
0033         delete pool;
0034     }
0035 }
0036 
0037 void JTopologyBuilder::AddArrow(JArrow* arrow) {
0038     arrows.push_back(arrow);
0039     auto it = arrow_lookup.find(arrow->GetName());
0040     if (it != arrow_lookup.end()) {
0041         throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str());
0042     }
0043     arrow_lookup[arrow->GetName()] = arrow;
0044 }
0045 
0046 
0047 JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) {
0048     auto pool_it = pool_lookup.find(level);
0049     if (pool_it != pool_lookup.end()) {
0050         return pool_it->second;
0051     }
0052     else {
0053         auto* pool = new JEventPool(m_components, m_max_inflight_events[level], m_location_count, level);
0054         pools.push_back(pool);
0055         pool_lookup[level] = pool;
0056         return pool;
0057     }
0058 }
0059 
0060 void JTopologyBuilder::ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level) {
0061     auto& arrow = *arrow_lookup.at(arrow_name);
0062     auto port_index = arrow.GetPortIndex(port_name);
0063     auto& port = arrow.GetPort(port_index);
0064     auto* pool = GetOrCreatePool(level);
0065     port.Attach(pool);
0066 }
0067 
0068 void JTopologyBuilder::ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level) {
0069     auto* upstream_pool = GetOrCreatePool(upstream_level);
0070     auto* downstream_pool = GetOrCreatePool(downstream_level);
0071     upstream_pool->AttachForwardingPool(downstream_pool);
0072 }
0073 
0074 void JTopologyBuilder::ConnectQueue(std::string upstream_arrow_name, 
0075                                     std::string upstream_port_name,
0076                                     std::string downstream_arrow_name, 
0077                                     std::string downstream_port_name) {
0078 
0079     auto& upstream_arrow = *arrow_lookup.at(upstream_arrow_name);
0080     auto upstream_port_id = upstream_arrow.GetPortIndex(upstream_port_name);
0081     auto& downstream_arrow = *arrow_lookup.at(downstream_arrow_name);
0082     auto downstream_port_id = downstream_arrow.GetPortIndex(downstream_port_name);
0083 
0084     Connect(&upstream_arrow, upstream_port_id,
0085             &downstream_arrow, downstream_port_id);
0086 }
0087 
0088 std::string JTopologyBuilder::PrintTopology() {
0089     JTablePrinter t;
0090     t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0091     t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0092     t.AddColumn("Port", JTablePrinter::Justify::Left, 0);
0093     t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0094     t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0095 
0096     // Build index lookup for queues
0097     int i = 0;
0098     std::map<void*, int> lookup;
0099     for (JEventQueue* queue : queues) {
0100         lookup[queue] = i;
0101         i += 1;
0102     }
0103     // Build index lookup for pools
0104     for (JEventPool* pool : pools) {
0105         lookup[pool] = i;
0106         i += 1;
0107     }
0108     // Build table
0109 
0110     bool show_row = true;
0111     
0112     for (JArrow* arrow : arrows) {
0113 
0114         show_row = true;
0115         for (auto& port : arrow->m_ports) {
0116             if (show_row) {
0117                 t | arrow->GetName();
0118                 t | arrow->IsParallel();
0119                 show_row = false;
0120             }
0121             else {
0122                 t | "" | "" ;
0123             }
0124             auto place_index = lookup[(port->GetQueue()!=nullptr) ? (void*) port->GetQueue() : (void*) port->GetPool()];
0125 
0126             t | port->GetName();
0127             t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool");
0128             t | place_index;
0129         }
0130     }
0131     return t.Render();
0132 }
0133 
0134 
0135 /// set_cofigure_fn lets the user provide a lambda that sets up a topology after all components have been loaded.
0136 /// It provides an 'empty' JArrowTopology which has been furnished with a pointer to the JComponentManager, the JEventPool,
0137 /// and the JProcessorMapping (in case you care about NUMA details). However, it does not contain any queues or arrows.
0138 /// You have to furnish those yourself.
0139 void JTopologyBuilder::SetConfigureFn(std::function<void(JTopologyBuilder&, JComponentManager&)> configure_fn) {
0140     m_configure_topology = std::move(configure_fn);
0141 }
0142 
0143 void JTopologyBuilder::CreateTopology() {
0144     mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0145                        static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0146 
0147     if (m_configure_topology) {
0148         m_configure_topology(*this, *m_components);
0149         LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END;
0150     }
0151     else {
0152         AttachLevel(JEventLevel::Run, nullptr, nullptr);
0153         LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END;
0154     }
0155     for (auto* arrow : arrows) {
0156         arrow->SetLogger(GetLogger());
0157     }
0158 
0159     // _Don't_ establish ordering if nobody needs it!
0160     // This hopefully prevents a small hit to NUMA performance
0161     // because we don't need to update next_event_index across NUMA nodes
0162     bool need_ordering = false;
0163     for (auto* queue : queues) {
0164         if (queue->GetEnforcesOrdering()) {
0165             need_ordering = true;
0166         }
0167     }
0168     if (!need_ordering) {
0169         for (auto* queue : queues) {
0170             queue->SetEstablishesOrdering(false);
0171         }
0172     }
0173     size_t i = 0;
0174     for (auto* queue: queues) {
0175         LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering();
0176     }
0177 }
0178 
0179 
0180 void JTopologyBuilder::Init() {
0181 
0182     m_components = GetApplication()->GetService<JComponentManager>();
0183 
0184     // We default event pool size to be equal to nthreads
0185     // We parse the 'nthreads' parameter two different ways for backwards compatibility.
0186     size_t nthreads = 1;
0187     if (m_params->Exists("nthreads")) {
0188         if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0189             nthreads = JCpuInfo::GetNumCpus();
0190         } else {
0191             nthreads = m_params->GetParameterValue<int>("nthreads");
0192         }
0193     }
0194 
0195     m_max_inflight_events[JEventLevel::Run] = m_params->RegisterParameter("jana:max_inflight_runs", nthreads,
0196                                 "The number of runs which may be in-flight at once.");
0197 
0198     m_max_inflight_events[JEventLevel::Subrun] = m_params->RegisterParameter("jana:max_inflight_subruns", nthreads,
0199                                 "The number of subruns which may be in-flight at once.");
0200 
0201     m_max_inflight_events[JEventLevel::Timeslice] = m_params->RegisterParameter("jana:max_inflight_timeslices", nthreads,
0202                                 "The number of timeslices which may be in-flight at once.");
0203 
0204     m_max_inflight_events[JEventLevel::Block] = m_params->RegisterParameter("jana:max_inflight_blocks", nthreads,
0205                                 "The number of blocks which may be in-flight at once.");
0206 
0207     m_max_inflight_events[JEventLevel::SlowControls] = m_params->RegisterParameter("jana:max_inflight_slowcontrols", nthreads,
0208                                 "The number of slow control events which may be in-flight at once.");
0209 
0210     m_max_inflight_events[JEventLevel::PhysicsEvent] = m_params->RegisterParameter("jana:max_inflight_events", nthreads,
0211                                 "The number of physics events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.");
0212 
0213     m_max_inflight_events[JEventLevel::Subevent] = m_params->RegisterParameter("jana:max_inflight_subevents", 4*nthreads,
0214                                 "The number of subevents which may be in-flight at once.");
0215 
0216     m_max_inflight_events[JEventLevel::Task] = m_params->RegisterParameter("jana:max_inflight_tasks", 8*nthreads,
0217                                 "The number of tasks which may be in-flight at once.");
0218 
0219     /*
0220     m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing,
0221                                     "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.")
0222             ->SetIsAdvanced(true);
0223     */
0224     m_params->SetDefaultParameter("jana:affinity", m_affinity,
0225                                     "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0226             ->SetIsAdvanced(true);
0227     m_params->SetDefaultParameter("jana:locality", m_locality,
0228                                     "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0229             ->SetIsAdvanced(true);
0230 };
0231 
0232 
0233 void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0234 
0235     JArrow::Port& upstream_port = upstream->GetPort(upstream_port_id);
0236     JArrow::Port& downstream_port = downstream->GetPort(downstream_port_id);
0237 
0238     LOG_DEBUG(GetLogger()) << "Connecting arrows: " << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0239 
0240     // Enforce that multiple upstreams can share a downstream, but not vice versa
0241     if (upstream_port.GetQueue() != nullptr) {
0242         throw JException("Upstream port '%s' on arrow '%s' already has a queue", upstream_port.GetName().c_str(), upstream->GetName().c_str());
0243     }
0244 
0245     // Enforce that any event levels that are produced upstream must be accepted downstream
0246     for (auto level: upstream_port.GetLevels()) {
0247         bool level_found = false;
0248         for (auto downstream_level : downstream_port.GetLevels()) {
0249             if (downstream_level == level) {
0250                 level_found = true;
0251                 break;
0252             }
0253         }
0254         if (!level_found) {
0255             LOG_FATAL(GetLogger()) << "Level " << toString(level) << " produced upstream but not accepted downstream: "
0256                 << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0257             throw JException("Level produced upstream but not accepted downstream");
0258         }
0259     }
0260 
0261     JEventQueue* queue = nullptr;
0262     if (downstream_port.GetQueue() != nullptr) {
0263         queue = downstream_port.GetQueue();
0264     }
0265     else {
0266         // Create new queue
0267         size_t queue_capacity = 0;
0268         for (auto level : downstream_port.GetLevels()) {
0269             queue_capacity += m_max_inflight_events[level];
0270         }
0271         queue = new JEventQueue(queue_capacity, mapping.get_loc_count());
0272         queues.push_back(queue);
0273         downstream_port.Attach(queue);
0274     }
0275 
0276     
0277 
0278     upstream_port.Attach(queue);
0279 
0280     if (downstream_port.GetEnforcesOrdering()) {
0281         queue->SetEnforcesOrdering();
0282     }
0283     if (upstream_port.GetEstablishesOrdering()) {
0284         queue->SetEstablishesOrdering(true);
0285     }
0286 }
0287 
0288 
0289 void JTopologyBuilder::ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port, std::vector<std::pair<JArrow*, size_t>> downstreams) {
0290     for (auto& [downstream, downstream_port_id] : downstreams) {
0291         if (downstream != nullptr) {
0292             Connect(upstream, upstream_port, downstream, downstream_port_id);
0293             return;
0294         }
0295     }
0296 }
0297 
0298 std::pair<JTapArrow*, JTapArrow*> JTopologyBuilder::CreateTapChain(std::vector<JEventProcessor*>& procs, std::string level) {
0299 
0300     JTapArrow* first = nullptr;
0301     JTapArrow* last = nullptr;
0302 
0303     int i=1;
0304     std::string arrow_name = level + "Tap";
0305     for (JEventProcessor* proc : procs) {
0306         if (procs.size() > 1) {
0307             arrow_name += std::to_string(i++);
0308         }
0309         JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel());
0310         current->AddProcessor(proc);
0311         arrows.push_back(current);
0312         if (first == nullptr) {
0313             first = current;
0314         }
0315         if (last != nullptr) {
0316             Connect(last, JTapArrow::EVENT_OUT, current, JTapArrow::EVENT_IN);
0317         }
0318         last = current;
0319     }
0320     return {first, last};
0321 }
0322 
0323 
0324 void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0325     std::stringstream ss;
0326     ss << current_level;
0327     auto level_str = ss.str();
0328 
0329     // Find all event sources at this level
0330     std::vector<JEventSource*> sources_at_level;
0331     for (JEventSource* source : m_components->get_evt_srces()) {
0332         if (source->GetLevel() == current_level && source->IsEnabled()) {
0333             sources_at_level.push_back(source);
0334         }
0335     }
0336     
0337     // Find all unfolders at this level
0338     std::vector<JEventUnfolder*> unfolders_at_level;
0339     for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0340         if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) {
0341             unfolders_at_level.push_back(unfolder);
0342         }
0343     }
0344     
0345     // Find all processors at this level
0346     std::vector<JEventProcessor*> mappable_procs_at_level;
0347     std::vector<JEventProcessor*> tappable_procs_at_level;
0348 
0349     for (JEventProcessor* proc : m_components->get_evt_procs()) {
0350 
0351         if (proc->GetLevel() == current_level && proc->IsEnabled()) {
0352 
0353             // This may be a weird place to do it, but let's quickly validate that users aren't
0354             // trying to enable ordering on a legacy event processor. We don't do this in the constructor
0355             // because we don't want to put constraints on the order in which setters can be called, apart from "before Init()"
0356 
0357             if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0358                 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0359             }
0360 
0361             mappable_procs_at_level.push_back(proc);
0362             if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0363                 tappable_procs_at_level.push_back(proc);
0364             }
0365         }
0366     }
0367 
0368 
0369     bool is_top_level = (parent_unfolder == nullptr);
0370     if (is_top_level && sources_at_level.size() == 0) {
0371         // Skip level entirely when no source is present.
0372         LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0373         JEventLevel next = next_level(current_level);
0374         if (next == JEventLevel::None) {
0375             LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0376             return;
0377         }
0378         return AttachLevel(next, nullptr, nullptr);
0379     }
0380 
0381     // Enforce constraints on what our builder will accept (at least for now)
0382     if (!is_top_level && !sources_at_level.empty()) {
0383         throw JException("Topology forbids event sources at lower event levels in the topology");
0384     }
0385     if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0386         throw JException("Topology requires matching unfolder/folder arrow pairs");
0387     }
0388     if (unfolders_at_level.size() > 1) {
0389         throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0390     }
0391     // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by
0392     // the level-skipping logic above
0393 
0394 
0395     // Fill out arrow grid from components at this event level
0396     // --------------------------
0397     // 0. Pool
0398     // --------------------------
0399     LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level];
0400     JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level);
0401     pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology
0402     LOG_INFO(GetLogger()) << "Finished creating event pool";
0403 
0404     // --------------------------
0405     // 1. Source
0406     // --------------------------
0407     JSourceArrow* src_arrow = nullptr;
0408     bool need_source = !sources_at_level.empty();
0409     if (need_source) {
0410         src_arrow = new JSourceArrow(level_str+"Source", current_level, sources_at_level);
0411         src_arrow->GetPort(JSourceArrow::EVENT_IN).Attach(pool_at_level);
0412         src_arrow->GetPort(JSourceArrow::EVENT_OUT).Attach(pool_at_level);
0413         arrows.push_back(src_arrow);
0414     }
0415 
0416     // --------------------------
0417     // 2. Map1
0418     // --------------------------
0419     bool have_parallel_sources = false;
0420     for (JEventSource* source: sources_at_level) {
0421         have_parallel_sources |= source->IsProcessParallelEnabled();
0422     }
0423     bool have_unfolder = !unfolders_at_level.empty();
0424     JMapArrow* map1_arrow = nullptr;
0425     bool need_map1 = (have_parallel_sources || have_unfolder);
0426 
0427     if (need_map1) {
0428         map1_arrow = new JMapArrow(level_str+"Map1", current_level);
0429         for (JEventSource* source: sources_at_level) {
0430             if (source->IsProcessParallelEnabled()) {
0431                 map1_arrow->AddSource(source);
0432             }
0433         }
0434         for (JEventUnfolder* unf: unfolders_at_level) {
0435             map1_arrow->AddUnfolder(unf);
0436         }
0437         map1_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level);
0438         map1_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level);
0439         arrows.push_back(map1_arrow);
0440     }
0441 
0442     // --------------------------
0443     // 3. Unfold
0444     // --------------------------
0445     JUnfoldArrow* unfold_arrow = nullptr;
0446     bool need_unfold = have_unfolder;
0447     if (need_unfold) {
0448         unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0449         unfold_arrow->GetPort(JUnfoldArrow::REJECTED_PARENT_OUT).Attach(pool_at_level);
0450         arrows.push_back(unfold_arrow);
0451     }
0452 
0453     // --------------------------
0454     // 4. Fold
0455     // --------------------------
0456     JFoldArrow* fold_arrow = nullptr;
0457     bool need_fold = have_unfolder;
0458     if(need_fold) {
0459         fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0460         arrows.push_back(fold_arrow);
0461         fold_arrow->GetPort(JFoldArrow::PARENT_OUT).Attach(pool_at_level);
0462     }
0463 
0464     // --------------------------
0465     // 5. Map2
0466     // --------------------------
0467     JMapArrow* map2_arrow = nullptr;
0468     bool need_map2 = !mappable_procs_at_level.empty();
0469     if (need_map2) {
0470         map2_arrow = new JMapArrow(level_str+"Map2", current_level);
0471         for (JEventProcessor* proc : mappable_procs_at_level) {
0472             map2_arrow->AddProcessor(proc);
0473             map2_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level);
0474             map2_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level);
0475         }
0476         arrows.push_back(map2_arrow);
0477     }
0478 
0479     // --------------------------
0480     // 6. Tap
0481     // --------------------------
0482     JTapArrow* first_tap_arrow = nullptr;
0483     JTapArrow* last_tap_arrow = nullptr;
0484     bool need_tap = !tappable_procs_at_level.empty();
0485     if (need_tap) {
0486         std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(tappable_procs_at_level, level_str);
0487         first_tap_arrow->GetPort(JTapArrow::EVENT_IN).Attach(pool_at_level);
0488         last_tap_arrow->GetPort(JTapArrow::EVENT_OUT).Attach(pool_at_level);
0489     }
0490 
0491 
0492     // Now that we've set up our component grid, we can do wiring!
0493     // --------------------------
0494     // 1. Source
0495     // --------------------------
0496     if (parent_unfolder != nullptr) {
0497         parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level);
0498         ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT,
0499                                    {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0500     }
0501     if (src_arrow != nullptr) {
0502         ConnectToFirstAvailable(src_arrow, JSourceArrow::EVENT_OUT,
0503                                    {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0504     }
0505     if (map1_arrow != nullptr) {
0506         ConnectToFirstAvailable(map1_arrow, JMapArrow::EVENT_OUT,
0507                                    {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0508     }
0509     if (unfold_arrow != nullptr) {
0510         ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT,
0511                                    {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0512     }
0513     if (fold_arrow != nullptr) {
0514         ConnectToFirstAvailable(fold_arrow, JFoldArrow::PARENT_OUT,
0515                                    {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0516     }
0517     if (map2_arrow != nullptr) {
0518         ConnectToFirstAvailable(map2_arrow, JMapArrow::EVENT_OUT,
0519                                    {{first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0520     }
0521     if (last_tap_arrow != nullptr) {
0522         ConnectToFirstAvailable(last_tap_arrow, JTapArrow::EVENT_OUT,
0523                                    {{parent_folder, JFoldArrow::CHILD_IN}});
0524     }
0525     if (parent_folder != nullptr) {
0526         parent_folder->GetPort(JFoldArrow::CHILD_OUT).Attach(pool_at_level);
0527     }
0528 
0529     // Finally, we recur over lower levels!
0530     if (need_unfold) {
0531         auto next_level = unfolders_at_level[0]->GetChildLevel();
0532         AttachLevel(next_level, unfold_arrow, fold_arrow);
0533     }
0534     else {
0535         // This is the lowest level
0536         // TODO: Improve logic for determining event counts for multilevel topologies
0537         if (last_tap_arrow != nullptr) {
0538             last_tap_arrow->SetIsSink(true);
0539         }
0540         else if (map2_arrow != nullptr) {
0541             map2_arrow->SetIsSink(true);
0542         }
0543         else if (map1_arrow != nullptr) {
0544             map1_arrow->SetIsSink(true);
0545         }
0546         else if (src_arrow != nullptr) {
0547             src_arrow->SetIsSink(true);
0548         }
0549     }
0550 }
0551 
0552 
0553