File indexing completed on 2026-05-04 08:56:34
0001
0002
0003
0004
0005 #include "JTopologyBuilder.h"
0006
0007 #include <string>
0008 #include <vector>
0009
0010 #include "JANA/Utils/JEventLevel.h"
0011 #include "JSourceArrow.h"
0012 #include "JMapArrow.h"
0013 #include "JTapArrow.h"
0014 #include "JUnfoldArrow.h"
0015 #include "JFoldArrow.h"
0016 #include <JANA/JEventProcessor.h>
0017 #include <JANA/Services/JComponentManager.h>
0018 #include <JANA/Utils/JTablePrinter.h>
0019
0020
0021 JTopologyBuilder::JTopologyBuilder() {
0022 SetPrefix("jana");
0023 }
0024
0025 JTopologyBuilder::~JTopologyBuilder() {
0026 for (auto arrow : arrows) {
0027 delete arrow;
0028 }
0029 for (auto queue : queues) {
0030 delete queue;
0031 }
0032 for (auto pool : pools) {
0033 delete pool;
0034 }
0035 }
0036
0037 void JTopologyBuilder::AddArrow(JArrow* arrow) {
0038 arrows.push_back(arrow);
0039 auto it = arrow_lookup.find(arrow->GetName());
0040 if (it != arrow_lookup.end()) {
0041 throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str());
0042 }
0043 arrow_lookup[arrow->GetName()] = arrow;
0044 }
0045
0046
0047 JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) {
0048 auto pool_it = pool_lookup.find(level);
0049 if (pool_it != pool_lookup.end()) {
0050 return pool_it->second;
0051 }
0052 else {
0053 auto* pool = new JEventPool(m_components, m_max_inflight_events[level], m_location_count, level);
0054 pools.push_back(pool);
0055 pool_lookup[level] = pool;
0056 return pool;
0057 }
0058 }
0059
0060 void JTopologyBuilder::ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level) {
0061 auto& arrow = *arrow_lookup.at(arrow_name);
0062 auto port_index = arrow.GetPortIndex(port_name);
0063 auto& port = arrow.GetPort(port_index);
0064 auto* pool = GetOrCreatePool(level);
0065 port.Attach(pool);
0066 }
0067
0068 void JTopologyBuilder::ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level) {
0069 auto* upstream_pool = GetOrCreatePool(upstream_level);
0070 auto* downstream_pool = GetOrCreatePool(downstream_level);
0071 upstream_pool->AttachForwardingPool(downstream_pool);
0072 }
0073
0074 void JTopologyBuilder::ConnectQueue(std::string upstream_arrow_name,
0075 std::string upstream_port_name,
0076 std::string downstream_arrow_name,
0077 std::string downstream_port_name) {
0078
0079 auto& upstream_arrow = *arrow_lookup.at(upstream_arrow_name);
0080 auto upstream_port_id = upstream_arrow.GetPortIndex(upstream_port_name);
0081 auto& downstream_arrow = *arrow_lookup.at(downstream_arrow_name);
0082 auto downstream_port_id = downstream_arrow.GetPortIndex(downstream_port_name);
0083
0084 Connect(&upstream_arrow, upstream_port_id,
0085 &downstream_arrow, downstream_port_id);
0086 }
0087
0088 std::string JTopologyBuilder::PrintTopology() {
0089 JTablePrinter t;
0090 t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0091 t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0092 t.AddColumn("Port", JTablePrinter::Justify::Left, 0);
0093 t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0094 t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0095
0096
0097 int i = 0;
0098 std::map<void*, int> lookup;
0099 for (JEventQueue* queue : queues) {
0100 lookup[queue] = i;
0101 i += 1;
0102 }
0103
0104 for (JEventPool* pool : pools) {
0105 lookup[pool] = i;
0106 i += 1;
0107 }
0108
0109
0110 bool show_row = true;
0111
0112 for (JArrow* arrow : arrows) {
0113
0114 show_row = true;
0115 for (auto& port : arrow->m_ports) {
0116 if (show_row) {
0117 t | arrow->GetName();
0118 t | arrow->IsParallel();
0119 show_row = false;
0120 }
0121 else {
0122 t | "" | "" ;
0123 }
0124 auto place_index = lookup[(port->GetQueue()!=nullptr) ? (void*) port->GetQueue() : (void*) port->GetPool()];
0125
0126 t | port->GetName();
0127 t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool");
0128 t | place_index;
0129 }
0130 }
0131 return t.Render();
0132 }
0133
0134
0135
0136
0137
0138
0139 void JTopologyBuilder::SetConfigureFn(std::function<void(JTopologyBuilder&, JComponentManager&)> configure_fn) {
0140 m_configure_topology = std::move(configure_fn);
0141 }
0142
0143 void JTopologyBuilder::CreateTopology() {
0144 mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0145 static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0146
0147 if (m_configure_topology) {
0148 m_configure_topology(*this, *m_components);
0149 LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END;
0150 }
0151 else {
0152 AttachLevel(JEventLevel::Run, nullptr, nullptr);
0153 LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END;
0154 }
0155 for (auto* arrow : arrows) {
0156 arrow->SetLogger(GetLogger());
0157 }
0158
0159
0160
0161
0162 bool need_ordering = false;
0163 for (auto* queue : queues) {
0164 if (queue->GetEnforcesOrdering()) {
0165 need_ordering = true;
0166 }
0167 }
0168 if (!need_ordering) {
0169 for (auto* queue : queues) {
0170 queue->SetEstablishesOrdering(false);
0171 }
0172 }
0173 size_t i = 0;
0174 for (auto* queue: queues) {
0175 LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering();
0176 }
0177 }
0178
0179
0180 void JTopologyBuilder::Init() {
0181
0182 m_components = GetApplication()->GetService<JComponentManager>();
0183
0184
0185
0186 size_t nthreads = 1;
0187 if (m_params->Exists("nthreads")) {
0188 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0189 nthreads = JCpuInfo::GetNumCpus();
0190 } else {
0191 nthreads = m_params->GetParameterValue<int>("nthreads");
0192 }
0193 }
0194
0195 m_max_inflight_events[JEventLevel::Run] = m_params->RegisterParameter("jana:max_inflight_runs", nthreads,
0196 "The number of runs which may be in-flight at once.");
0197
0198 m_max_inflight_events[JEventLevel::Subrun] = m_params->RegisterParameter("jana:max_inflight_subruns", nthreads,
0199 "The number of subruns which may be in-flight at once.");
0200
0201 m_max_inflight_events[JEventLevel::Timeslice] = m_params->RegisterParameter("jana:max_inflight_timeslices", nthreads,
0202 "The number of timeslices which may be in-flight at once.");
0203
0204 m_max_inflight_events[JEventLevel::Block] = m_params->RegisterParameter("jana:max_inflight_blocks", nthreads,
0205 "The number of blocks which may be in-flight at once.");
0206
0207 m_max_inflight_events[JEventLevel::SlowControls] = m_params->RegisterParameter("jana:max_inflight_slowcontrols", nthreads,
0208 "The number of slow control events which may be in-flight at once.");
0209
0210 m_max_inflight_events[JEventLevel::PhysicsEvent] = m_params->RegisterParameter("jana:max_inflight_events", nthreads,
0211 "The number of physics events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.");
0212
0213 m_max_inflight_events[JEventLevel::Subevent] = m_params->RegisterParameter("jana:max_inflight_subevents", 4*nthreads,
0214 "The number of subevents which may be in-flight at once.");
0215
0216 m_max_inflight_events[JEventLevel::Task] = m_params->RegisterParameter("jana:max_inflight_tasks", 8*nthreads,
0217 "The number of tasks which may be in-flight at once.");
0218
0219
0220
0221
0222
0223
0224 m_params->SetDefaultParameter("jana:affinity", m_affinity,
0225 "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0226 ->SetIsAdvanced(true);
0227 m_params->SetDefaultParameter("jana:locality", m_locality,
0228 "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0229 ->SetIsAdvanced(true);
0230 };
0231
0232
0233 void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0234
0235 JArrow::Port& upstream_port = upstream->GetPort(upstream_port_id);
0236 JArrow::Port& downstream_port = downstream->GetPort(downstream_port_id);
0237
0238 LOG_DEBUG(GetLogger()) << "Connecting arrows: " << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0239
0240
0241 if (upstream_port.GetQueue() != nullptr) {
0242 throw JException("Upstream port '%s' on arrow '%s' already has a queue", upstream_port.GetName().c_str(), upstream->GetName().c_str());
0243 }
0244
0245
0246 for (auto level: upstream_port.GetLevels()) {
0247 bool level_found = false;
0248 for (auto downstream_level : downstream_port.GetLevels()) {
0249 if (downstream_level == level) {
0250 level_found = true;
0251 break;
0252 }
0253 }
0254 if (!level_found) {
0255 LOG_FATAL(GetLogger()) << "Level " << toString(level) << " produced upstream but not accepted downstream: "
0256 << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0257 throw JException("Level produced upstream but not accepted downstream");
0258 }
0259 }
0260
0261 JEventQueue* queue = nullptr;
0262 if (downstream_port.GetQueue() != nullptr) {
0263 queue = downstream_port.GetQueue();
0264 }
0265 else {
0266
0267 size_t queue_capacity = 0;
0268 for (auto level : downstream_port.GetLevels()) {
0269 queue_capacity += m_max_inflight_events[level];
0270 }
0271 queue = new JEventQueue(queue_capacity, mapping.get_loc_count());
0272 queues.push_back(queue);
0273 downstream_port.Attach(queue);
0274 }
0275
0276
0277
0278 upstream_port.Attach(queue);
0279
0280 if (downstream_port.GetEnforcesOrdering()) {
0281 queue->SetEnforcesOrdering();
0282 }
0283 if (upstream_port.GetEstablishesOrdering()) {
0284 queue->SetEstablishesOrdering(true);
0285 }
0286 }
0287
0288
0289 void JTopologyBuilder::ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port, std::vector<std::pair<JArrow*, size_t>> downstreams) {
0290 for (auto& [downstream, downstream_port_id] : downstreams) {
0291 if (downstream != nullptr) {
0292 Connect(upstream, upstream_port, downstream, downstream_port_id);
0293 return;
0294 }
0295 }
0296 }
0297
0298 std::pair<JTapArrow*, JTapArrow*> JTopologyBuilder::CreateTapChain(std::vector<JEventProcessor*>& procs, std::string level) {
0299
0300 JTapArrow* first = nullptr;
0301 JTapArrow* last = nullptr;
0302
0303 int i=1;
0304 std::string arrow_name = level + "Tap";
0305 for (JEventProcessor* proc : procs) {
0306 if (procs.size() > 1) {
0307 arrow_name += std::to_string(i++);
0308 }
0309 JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel());
0310 current->AddProcessor(proc);
0311 arrows.push_back(current);
0312 if (first == nullptr) {
0313 first = current;
0314 }
0315 if (last != nullptr) {
0316 Connect(last, JTapArrow::EVENT_OUT, current, JTapArrow::EVENT_IN);
0317 }
0318 last = current;
0319 }
0320 return {first, last};
0321 }
0322
0323
0324 void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0325 std::stringstream ss;
0326 ss << current_level;
0327 auto level_str = ss.str();
0328
0329
0330 std::vector<JEventSource*> sources_at_level;
0331 for (JEventSource* source : m_components->get_evt_srces()) {
0332 if (source->GetLevel() == current_level && source->IsEnabled()) {
0333 sources_at_level.push_back(source);
0334 }
0335 }
0336
0337
0338 std::vector<JEventUnfolder*> unfolders_at_level;
0339 for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0340 if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) {
0341 unfolders_at_level.push_back(unfolder);
0342 }
0343 }
0344
0345
0346 std::vector<JEventProcessor*> mappable_procs_at_level;
0347 std::vector<JEventProcessor*> tappable_procs_at_level;
0348
0349 for (JEventProcessor* proc : m_components->get_evt_procs()) {
0350
0351 if (proc->GetLevel() == current_level && proc->IsEnabled()) {
0352
0353
0354
0355
0356
0357 if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0358 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0359 }
0360
0361 mappable_procs_at_level.push_back(proc);
0362 if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0363 tappable_procs_at_level.push_back(proc);
0364 }
0365 }
0366 }
0367
0368
0369 bool is_top_level = (parent_unfolder == nullptr);
0370 if (is_top_level && sources_at_level.size() == 0) {
0371
0372 LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0373 JEventLevel next = next_level(current_level);
0374 if (next == JEventLevel::None) {
0375 LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0376 return;
0377 }
0378 return AttachLevel(next, nullptr, nullptr);
0379 }
0380
0381
0382 if (!is_top_level && !sources_at_level.empty()) {
0383 throw JException("Topology forbids event sources at lower event levels in the topology");
0384 }
0385 if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0386 throw JException("Topology requires matching unfolder/folder arrow pairs");
0387 }
0388 if (unfolders_at_level.size() > 1) {
0389 throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0390 }
0391
0392
0393
0394
0395
0396
0397
0398
0399 LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level];
0400 JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level);
0401 pools.push_back(pool_at_level);
0402 LOG_INFO(GetLogger()) << "Finished creating event pool";
0403
0404
0405
0406
0407 JSourceArrow* src_arrow = nullptr;
0408 bool need_source = !sources_at_level.empty();
0409 if (need_source) {
0410 src_arrow = new JSourceArrow(level_str+"Source", current_level, sources_at_level);
0411 src_arrow->GetPort(JSourceArrow::EVENT_IN).Attach(pool_at_level);
0412 src_arrow->GetPort(JSourceArrow::EVENT_OUT).Attach(pool_at_level);
0413 arrows.push_back(src_arrow);
0414 }
0415
0416
0417
0418
0419 bool have_parallel_sources = false;
0420 for (JEventSource* source: sources_at_level) {
0421 have_parallel_sources |= source->IsProcessParallelEnabled();
0422 }
0423 bool have_unfolder = !unfolders_at_level.empty();
0424 JMapArrow* map1_arrow = nullptr;
0425 bool need_map1 = (have_parallel_sources || have_unfolder);
0426
0427 if (need_map1) {
0428 map1_arrow = new JMapArrow(level_str+"Map1", current_level);
0429 for (JEventSource* source: sources_at_level) {
0430 if (source->IsProcessParallelEnabled()) {
0431 map1_arrow->AddSource(source);
0432 }
0433 }
0434 for (JEventUnfolder* unf: unfolders_at_level) {
0435 map1_arrow->AddUnfolder(unf);
0436 }
0437 map1_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level);
0438 map1_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level);
0439 arrows.push_back(map1_arrow);
0440 }
0441
0442
0443
0444
0445 JUnfoldArrow* unfold_arrow = nullptr;
0446 bool need_unfold = have_unfolder;
0447 if (need_unfold) {
0448 unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0449 unfold_arrow->GetPort(JUnfoldArrow::REJECTED_PARENT_OUT).Attach(pool_at_level);
0450 arrows.push_back(unfold_arrow);
0451 }
0452
0453
0454
0455
0456 JFoldArrow* fold_arrow = nullptr;
0457 bool need_fold = have_unfolder;
0458 if(need_fold) {
0459 fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0460 arrows.push_back(fold_arrow);
0461 fold_arrow->GetPort(JFoldArrow::PARENT_OUT).Attach(pool_at_level);
0462 }
0463
0464
0465
0466
0467 JMapArrow* map2_arrow = nullptr;
0468 bool need_map2 = !mappable_procs_at_level.empty();
0469 if (need_map2) {
0470 map2_arrow = new JMapArrow(level_str+"Map2", current_level);
0471 for (JEventProcessor* proc : mappable_procs_at_level) {
0472 map2_arrow->AddProcessor(proc);
0473 map2_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level);
0474 map2_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level);
0475 }
0476 arrows.push_back(map2_arrow);
0477 }
0478
0479
0480
0481
0482 JTapArrow* first_tap_arrow = nullptr;
0483 JTapArrow* last_tap_arrow = nullptr;
0484 bool need_tap = !tappable_procs_at_level.empty();
0485 if (need_tap) {
0486 std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(tappable_procs_at_level, level_str);
0487 first_tap_arrow->GetPort(JTapArrow::EVENT_IN).Attach(pool_at_level);
0488 last_tap_arrow->GetPort(JTapArrow::EVENT_OUT).Attach(pool_at_level);
0489 }
0490
0491
0492
0493
0494
0495
0496 if (parent_unfolder != nullptr) {
0497 parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level);
0498 ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT,
0499 {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0500 }
0501 if (src_arrow != nullptr) {
0502 ConnectToFirstAvailable(src_arrow, JSourceArrow::EVENT_OUT,
0503 {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0504 }
0505 if (map1_arrow != nullptr) {
0506 ConnectToFirstAvailable(map1_arrow, JMapArrow::EVENT_OUT,
0507 {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0508 }
0509 if (unfold_arrow != nullptr) {
0510 ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT,
0511 {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0512 }
0513 if (fold_arrow != nullptr) {
0514 ConnectToFirstAvailable(fold_arrow, JFoldArrow::PARENT_OUT,
0515 {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0516 }
0517 if (map2_arrow != nullptr) {
0518 ConnectToFirstAvailable(map2_arrow, JMapArrow::EVENT_OUT,
0519 {{first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0520 }
0521 if (last_tap_arrow != nullptr) {
0522 ConnectToFirstAvailable(last_tap_arrow, JTapArrow::EVENT_OUT,
0523 {{parent_folder, JFoldArrow::CHILD_IN}});
0524 }
0525 if (parent_folder != nullptr) {
0526 parent_folder->GetPort(JFoldArrow::CHILD_OUT).Attach(pool_at_level);
0527 }
0528
0529
0530 if (need_unfold) {
0531 auto next_level = unfolders_at_level[0]->GetChildLevel();
0532 AttachLevel(next_level, unfold_arrow, fold_arrow);
0533 }
0534 else {
0535
0536
0537 if (last_tap_arrow != nullptr) {
0538 last_tap_arrow->SetIsSink(true);
0539 }
0540 else if (map2_arrow != nullptr) {
0541 map2_arrow->SetIsSink(true);
0542 }
0543 else if (map1_arrow != nullptr) {
0544 map1_arrow->SetIsSink(true);
0545 }
0546 else if (src_arrow != nullptr) {
0547 src_arrow->SetIsSink(true);
0548 }
0549 }
0550 }
0551
0552
0553