File indexing completed on 2026-06-01 08:43:34
0001
0002
0003
0004
0005 #include "JTopologyBuilder.h"
0006
0007 #include <string>
0008 #include <vector>
0009
0010 #include "JANA/Topology/JArrow.h"
0011 #include "JANA/Utils/JEventLevel.h"
0012 #include "JSourceArrow.h"
0013 #include "JMultilevelSourceArrow.h"
0014 #include "JMapArrow.h"
0015 #include "JTapArrow.h"
0016 #include "JUnfoldArrow.h"
0017 #include "JFoldArrow.h"
0018 #include <JANA/JEventProcessor.h>
0019 #include <JANA/Services/JComponentManager.h>
0020 #include <JANA/Utils/JTablePrinter.h>
0021
0022
0023 JTopologyBuilder::JTopologyBuilder() {
0024 SetPrefix("jana");
0025 }
0026
0027 JTopologyBuilder::~JTopologyBuilder() {
0028 for (auto arrow : arrows) {
0029 delete arrow;
0030 }
0031 for (auto queue : queues) {
0032 delete queue;
0033 }
0034 for (auto pool : pools) {
0035 delete pool;
0036 }
0037 }
0038
0039 void JTopologyBuilder::AddArrow(JArrow* arrow) {
0040 arrows.push_back(arrow);
0041 arrow->SetId(arrows.size()-1);
0042 auto it = arrow_lookup.find(arrow->GetName());
0043 if (it != arrow_lookup.end()) {
0044 throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str());
0045 }
0046 arrow_lookup[arrow->GetName()] = arrow;
0047 }
0048
0049 JArrow* JTopologyBuilder::GetArrow(const std::string& arrow_name) {
0050 auto it = arrow_lookup.find(arrow_name);
0051 if (it == arrow_lookup.end()) {
0052 return nullptr;
0053 }
0054 return it->second;
0055 }
0056
0057 JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) {
0058 auto pool_it = pool_lookup.find(level);
0059 if (pool_it != pool_lookup.end()) {
0060 return pool_it->second;
0061 }
0062 else {
0063 auto* pool = new JEventPool(m_components, m_max_inflight_events[level], m_location_count, level);
0064 pools.push_back(pool);
0065 pool_lookup[level] = pool;
0066 return pool;
0067 }
0068 }
0069
0070 void JTopologyBuilder::ConnectPool(std::string arrow_name, std::string port_name, JEventLevel level) {
0071 auto& arrow = *arrow_lookup.at(arrow_name);
0072 auto port_index = arrow.GetPortIndex(port_name);
0073 auto& port = arrow.GetPort(port_index);
0074 auto* pool = GetOrCreatePool(level);
0075 port.Attach(pool);
0076 }
0077
0078 void JTopologyBuilder::ConnectPool(JEventLevel upstream_level, JEventLevel downstream_level) {
0079 auto* upstream_pool = GetOrCreatePool(upstream_level);
0080 auto* downstream_pool = GetOrCreatePool(downstream_level);
0081 upstream_pool->AttachForwardingPool(downstream_pool);
0082 }
0083
0084 void JTopologyBuilder::ConnectQueue(std::string upstream_arrow_name,
0085 std::string upstream_port_name,
0086 std::string downstream_arrow_name,
0087 std::string downstream_port_name) {
0088
0089 auto& upstream_arrow = *arrow_lookup.at(upstream_arrow_name);
0090 auto upstream_port_id = upstream_arrow.GetPortIndex(upstream_port_name);
0091 auto& downstream_arrow = *arrow_lookup.at(downstream_arrow_name);
0092 auto downstream_port_id = downstream_arrow.GetPortIndex(downstream_port_name);
0093
0094 Connect(&upstream_arrow, upstream_port_id,
0095 &downstream_arrow, downstream_port_id);
0096 }
0097
0098 std::string JTopologyBuilder::PrintTopology() {
0099 JTablePrinter t;
0100 t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0101 t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0102 t.AddColumn("Port", JTablePrinter::Justify::Left, 0);
0103 t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0104 t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0105 t.AddColumn("Order", JTablePrinter::Justify::Left, 0);
0106
0107
0108 int i = 0;
0109 std::map<void*, int> lookup;
0110
0111 for (JEventPool* pool : pools) {
0112 lookup[pool] = i;
0113 i += 1;
0114 }
0115 for (JEventQueue* queue : queues) {
0116 lookup[queue] = i;
0117 i += 1;
0118 }
0119
0120
0121 bool show_row = true;
0122
0123 for (JArrow* arrow : arrows) {
0124
0125 show_row = true;
0126 for (auto& port : arrow->m_ports) {
0127 if (show_row) {
0128 t | arrow->GetName();
0129 t | arrow->IsParallel();
0130 show_row = false;
0131 }
0132 else {
0133 t | "" | "" ;
0134 }
0135 auto place_index = lookup[(port->GetQueue()!=nullptr) ? (void*) port->GetQueue() : (void*) port->GetPool()];
0136
0137 t | port->GetName();
0138 t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool");
0139 t | place_index;
0140 if (port->GetEnforcesOrdering() && port->GetEstablishesOrdering()) {
0141 t | "Both";
0142 }
0143 else if (port->GetEnforcesOrdering()) {
0144 t | "Enf";
0145 }
0146 else if (port->GetEstablishesOrdering()) {
0147 t | "Est";
0148 }
0149 else {
0150 t | "";
0151 }
0152 }
0153 }
0154 return t.Render();
0155 }
0156
0157
0158
0159
0160
0161
0162 void JTopologyBuilder::SetConfigureFn(std::function<void(JTopologyBuilder&, JComponentManager&)> configure_fn) {
0163 m_configure_topology = std::move(configure_fn);
0164 }
0165
0166 void JTopologyBuilder::CreateTopology() {
0167 mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0168 static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0169
0170 if (m_configure_topology) {
0171 m_configure_topology(*this, *m_components);
0172 LOG_INFO(GetLogger()) << "Using custom topology configuration function" << LOG_END;
0173 }
0174 else {
0175 CreateTopologyFromScratch();
0176 }
0177 for (auto* arrow : arrows) {
0178 arrow->SetLogger(GetLogger());
0179 }
0180
0181
0182
0183
0184 bool need_ordering = false;
0185 for (auto* queue : queues) {
0186 if (queue->GetEnforcesOrdering()) {
0187 need_ordering = true;
0188 }
0189 }
0190 if (!need_ordering) {
0191 for (auto* queue : queues) {
0192 queue->SetEstablishesOrdering(false);
0193 }
0194 }
0195 LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END;
0196 }
0197
0198 void JTopologyBuilder::CreateTopologyFromScratch() {
0199
0200 enum class Column { Source, UnfoldAbove, BatchBefore, UnfoldBelow, FoldBelow, BatchAfter, Tap, FoldAbove};
0201 std::vector<Column> columns = { Column::Source, Column::UnfoldAbove, Column::BatchBefore, Column::UnfoldBelow, Column::FoldBelow, Column::BatchAfter, Column::Tap, Column::FoldAbove};
0202 struct Cell {
0203 JArrow* start = nullptr;
0204 JArrow* end = nullptr;
0205 };
0206
0207 std::map<std::pair<JEventLevel, Column>, Cell> grid;
0208
0209
0210
0211
0212
0213 int map_counter = 1;
0214 std::set<JEventLevel> levels_present;
0215
0216
0217
0218 std::map<JEventLevel, std::vector<JEventSource*>> sources;
0219 for (JEventSource* source : m_components->GetSources()) {
0220 if (source->IsEnabled()) {
0221 sources[source->GetLevel()].push_back(source);
0222 levels_present.insert(source->GetLevel());
0223 }
0224 }
0225 for (auto& it : sources) {
0226 auto level = it.first;
0227 auto level_str = toString(level);
0228 bool need_map = false;
0229 bool need_multi_arrow = false;
0230 for (auto* source : it.second) {
0231 need_map |= source->IsProcessParallelEnabled();
0232 need_multi_arrow |= (source->GetParentLevels().size() > 0);
0233 }
0234
0235 if (need_multi_arrow && sources.size() > 1) {
0236 throw JException("Multiple multilevel JEventSources not supported yet");
0237 }
0238
0239 JArrow* src_arrow;
0240 if (need_multi_arrow) {
0241 src_arrow = new JMultilevelSourceArrow(level_str+"MultiSource", it.second.at(0));
0242
0243 for (auto parent_level: it.second.at(0)->GetParentLevels()) {
0244 levels_present.insert(parent_level);
0245 grid[{parent_level, Column::Source}] = {src_arrow, src_arrow};
0246 }
0247 }
0248 else {
0249 src_arrow = new JSourceArrow(level_str+"Source", level, it.second);
0250 }
0251 AddArrow(src_arrow);
0252
0253 if (need_map) {
0254 auto* map_arrow = new JMapArrow(toString(level)+"Map"+std::to_string(map_counter++), level);
0255 map_arrow->SetParallelSource(true);
0256 AddArrow(map_arrow);
0257 Connect(src_arrow, 1, map_arrow, 0);
0258 grid[{level, Column::Source}] = {src_arrow, map_arrow};
0259 }
0260 else {
0261 grid[{level, Column::Source}] = {src_arrow, src_arrow};
0262 }
0263 }
0264
0265
0266
0267 for (auto* unfolder: m_components->GetUnfolders()) {
0268
0269 if (!unfolder->IsEnabled()) continue;
0270
0271
0272
0273 auto parent_level = unfolder->GetLevel();
0274 auto child_level = unfolder->GetChildLevel();
0275 levels_present.insert(parent_level);
0276 levels_present.insert(child_level);
0277
0278 auto* map_arrow = new JMapArrow(toString(parent_level)+"Map"+std::to_string(map_counter++), parent_level);
0279 auto* unfold_arrow = new JUnfoldArrow(toString(child_level)+"Unfold", unfolder);
0280 map_arrow->AddUnfolder(unfolder);
0281 AddArrow(map_arrow);
0282 AddArrow(unfold_arrow);
0283 Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN);
0284
0285 if (grid.find({parent_level, Column::UnfoldBelow}) != grid.end()) {
0286 throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level).c_str());
0287 }
0288 if (grid.find({child_level, Column::UnfoldAbove}) != grid.end()) {
0289 throw JException("Only one unfolder allowed for child level=%s", toString(child_level).c_str());
0290 }
0291 grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow};
0292 grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow};
0293 }
0294
0295
0296
0297 for (auto* folder: m_components->GetFolders()) {
0298
0299 if (!folder->IsEnabled()) continue;
0300
0301
0302
0303 auto parent_level = folder->GetLevel();
0304 auto child_level = folder->GetChildLevel();
0305 levels_present.insert(parent_level);
0306 levels_present.insert(child_level);
0307
0308 auto* map_arrow = new JMapArrow(toString(child_level)+"Map"+std::to_string(map_counter++), parent_level);
0309 auto* fold_arrow = new JFoldArrow(toString(parent_level)+"Fold", parent_level, child_level);
0310 fold_arrow->SetFolder(folder);
0311 map_arrow->AddFolder(folder);
0312 AddArrow(map_arrow);
0313 AddArrow(fold_arrow);
0314 Connect(map_arrow, map_arrow->EVENT_OUT, fold_arrow, fold_arrow->CHILD_IN);
0315
0316 if (grid.find({parent_level, Column::FoldBelow}) != grid.end()) {
0317 throw JException("Only one folder allowed for parent level=%s", toString(parent_level).c_str());
0318 }
0319 if (grid.find({child_level, Column::FoldAbove}) != grid.end()) {
0320 throw JException("Only one folder allowed for child level=%s", toString(child_level).c_str());
0321 }
0322 grid[{parent_level, Column::FoldBelow}] = {fold_arrow, fold_arrow};
0323 grid[{child_level, Column::FoldAbove}] = {map_arrow, fold_arrow};
0324 }
0325
0326
0327
0328 std::map<JEventLevel, std::vector<JEventProcessor*>> mappable_processors;
0329 std::map<JEventLevel, std::vector<JEventProcessor*>> tappable_processors;
0330 for (auto* proc : m_components->GetProcessors()) {
0331 if (proc->IsEnabled()) {
0332 levels_present.insert(proc->GetLevel());
0333 if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0334 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0335 }
0336 mappable_processors[proc->GetLevel()].push_back(proc);
0337 if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0338 tappable_processors[proc->GetLevel()].push_back(proc);
0339 }
0340 }
0341 }
0342 for (auto it : mappable_processors) {
0343 auto level = it.first;
0344 auto level_str = toString(level);
0345 auto* map_arrow = new JMapArrow(level_str+"Map"+std::to_string(map_counter++), level);
0346 for (JEventProcessor* proc : it.second) {
0347 map_arrow->AddProcessor(proc);
0348 }
0349 AddArrow(map_arrow);
0350
0351 auto tappable_procs_it = tappable_processors.find(level);
0352 if (tappable_procs_it != tappable_processors.end()) {
0353 JArrow* first_tap_arrow = nullptr;
0354 JArrow* last_tap_arrow = nullptr;
0355 std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(it.second, level_str);
0356 Connect(map_arrow, map_arrow->EVENT_OUT, first_tap_arrow, JTapArrow::EVENT_IN);
0357 grid[{level, Column::Tap}] = {map_arrow, last_tap_arrow};
0358 }
0359 else {
0360
0361 grid[{level, Column::Tap}] = {map_arrow, map_arrow};
0362 }
0363 }
0364
0365
0366
0367
0368
0369 for (JEventLevel level : levels_present) {
0370
0371 auto* pool = GetOrCreatePool(level);
0372 JArrow* last_arrow = nullptr;
0373 for (auto column : columns) {
0374 auto it = grid.find({level, column});
0375 if (it == grid.end()) { continue; }
0376
0377 JArrow* current_arrow = it->second.start;
0378 if (last_arrow == nullptr) {
0379
0380 auto port_index = current_arrow->GetPortIndex(level, JArrow::PortDirection::In);
0381 current_arrow->GetPort(port_index).Attach(pool);
0382 }
0383 else {
0384 Connect(last_arrow,
0385 last_arrow->GetPortIndex(level, JArrow::PortDirection::Out),
0386 current_arrow,
0387 current_arrow->GetPortIndex(level, JArrow::PortDirection::In));
0388 }
0389 last_arrow = it->second.end;
0390
0391 }
0392
0393 auto port_index = last_arrow->GetPortIndex(level, JArrow::PortDirection::Out);
0394 if (level == JEventLevel::PhysicsEvent) {
0395 last_arrow->SetIsSink(true);
0396 }
0397 last_arrow->GetPort(port_index).Attach(pool);
0398 }
0399
0400
0401
0402
0403
0404
0405 for (auto outer_level : levels_present) {
0406 for (auto inner_level : levels_present) {
0407 if (outer_level != inner_level) {
0408 ConnectPool(outer_level, inner_level);
0409 }
0410 }
0411 }
0412 }
0413
0414
0415 void JTopologyBuilder::Init() {
0416
0417 m_components = GetApplication()->GetService<JComponentManager>();
0418
0419
0420
0421 size_t nthreads = 1;
0422 if (m_params->Exists("nthreads")) {
0423 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0424 nthreads = JCpuInfo::GetNumCpus();
0425 } else {
0426 nthreads = m_params->GetParameterValue<int>("nthreads");
0427 }
0428 }
0429
0430 m_max_inflight_events[JEventLevel::Run] = m_params->RegisterParameter("jana:max_inflight_runs", nthreads,
0431 "The number of runs which may be in-flight at once.");
0432
0433 m_max_inflight_events[JEventLevel::Subrun] = m_params->RegisterParameter("jana:max_inflight_subruns", nthreads,
0434 "The number of subruns which may be in-flight at once.");
0435
0436 m_max_inflight_events[JEventLevel::Timeslice] = m_params->RegisterParameter("jana:max_inflight_timeslices", nthreads,
0437 "The number of timeslices which may be in-flight at once.");
0438
0439 m_max_inflight_events[JEventLevel::Block] = m_params->RegisterParameter("jana:max_inflight_blocks", nthreads,
0440 "The number of blocks which may be in-flight at once.");
0441
0442 m_max_inflight_events[JEventLevel::SlowControls] = m_params->RegisterParameter("jana:max_inflight_slowcontrols", nthreads,
0443 "The number of slow control events which may be in-flight at once.");
0444
0445 m_max_inflight_events[JEventLevel::PhysicsEvent] = m_params->RegisterParameter("jana:max_inflight_events", nthreads,
0446 "The number of physics events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.");
0447
0448 m_max_inflight_events[JEventLevel::Subevent] = m_params->RegisterParameter("jana:max_inflight_subevents", 4*nthreads,
0449 "The number of subevents which may be in-flight at once.");
0450
0451 m_max_inflight_events[JEventLevel::Task] = m_params->RegisterParameter("jana:max_inflight_tasks", 8*nthreads,
0452 "The number of tasks which may be in-flight at once.");
0453
0454
0455
0456
0457
0458
0459 m_params->SetDefaultParameter("jana:affinity", m_affinity,
0460 "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0461 ->SetIsAdvanced(true);
0462 m_params->SetDefaultParameter("jana:locality", m_locality,
0463 "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0464 ->SetIsAdvanced(true);
0465 };
0466
0467
0468 void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0469
0470 JArrow::Port& upstream_port = upstream->GetPort(upstream_port_id);
0471 JArrow::Port& downstream_port = downstream->GetPort(downstream_port_id);
0472
0473 LOG_DEBUG(GetLogger()) << "Connecting arrows: " << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0474
0475
0476 if (upstream_port.GetQueue() != nullptr) {
0477 throw JException("Upstream port '%s' on arrow '%s' already has a queue", upstream_port.GetName().c_str(), upstream->GetName().c_str());
0478 }
0479
0480
0481 for (auto level: upstream_port.GetLevels()) {
0482 bool level_found = false;
0483 for (auto downstream_level : downstream_port.GetLevels()) {
0484 if (downstream_level == level) {
0485 level_found = true;
0486 break;
0487 }
0488 }
0489 if (!level_found) {
0490 LOG_FATAL(GetLogger()) << "Level " << toString(level) << " produced upstream but not accepted downstream: "
0491 << upstream->GetName() << ":" << upstream_port.GetName() << " --> " << downstream->GetName() << ":" << downstream_port.GetName();
0492 throw JException("Level produced upstream but not accepted downstream");
0493 }
0494 }
0495
0496 JEventQueue* queue = nullptr;
0497 if (downstream_port.GetQueue() != nullptr) {
0498 queue = downstream_port.GetQueue();
0499 }
0500 else {
0501
0502 size_t queue_capacity = 0;
0503 for (auto level : downstream_port.GetLevels()) {
0504 queue_capacity += m_max_inflight_events[level];
0505 }
0506 queue = new JEventQueue(queue_capacity, mapping.get_loc_count());
0507 queues.push_back(queue);
0508 downstream_port.Attach(queue);
0509 }
0510
0511 upstream_port.Attach(queue);
0512
0513 if (downstream_port.GetEnforcesOrdering()) {
0514 queue->SetEnforcesOrdering();
0515 }
0516 if (upstream_port.GetEstablishesOrdering()) {
0517 queue->SetEstablishesOrdering(true);
0518 }
0519 }
0520
0521
0522 std::pair<JTapArrow*, JTapArrow*> JTopologyBuilder::CreateTapChain(std::vector<JEventProcessor*>& procs, std::string level) {
0523
0524 JTapArrow* first = nullptr;
0525 JTapArrow* last = nullptr;
0526
0527 int i=1;
0528 for (JEventProcessor* proc : procs) {
0529 std::string arrow_name = level + "Tap";
0530 if (procs.size() > 1) {
0531 arrow_name += std::to_string(i++);
0532 }
0533 JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel());
0534 current->AddProcessor(proc);
0535 AddArrow(current);
0536 if (first == nullptr) {
0537 first = current;
0538 }
0539 if (last != nullptr) {
0540 Connect(last, JTapArrow::EVENT_OUT, current, JTapArrow::EVENT_IN);
0541 }
0542 last = current;
0543 }
0544 return {first, last};
0545 }
0546
0547