File indexing completed on 2025-10-30 09:01:24
0001
0002
0003
0004
0005
0006 #include "JTopologyBuilder.h"
0007
0008 #include "JEventSourceArrow.h"
0009 #include "JEventMapArrow.h"
0010 #include "JEventTapArrow.h"
0011 #include "JUnfoldArrow.h"
0012 #include "JFoldArrow.h"
0013 #include <JANA/JEventProcessor.h>
0014 #include <JANA/Utils/JTablePrinter.h>
0015 #include <string>
0016 #include <vector>
0017
0018
0019 JTopologyBuilder::JTopologyBuilder() {
0020 SetPrefix("jana");
0021 }
0022
0023 JTopologyBuilder::~JTopologyBuilder() {
0024 for (auto arrow : arrows) {
0025 delete arrow;
0026 }
0027 for (auto queue : queues) {
0028 delete queue;
0029 }
0030 for (auto pool : pools) {
0031 delete pool;
0032 }
0033 }
0034
0035 std::string JTopologyBuilder::print_topology() {
0036 JTablePrinter t;
0037 t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0038 t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0039 t.AddColumn("Direction", JTablePrinter::Justify::Left, 0);
0040 t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0041 t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0042
0043
0044 int i = 0;
0045 std::map<void*, int> lookup;
0046 for (JEventQueue* queue : queues) {
0047 lookup[queue] = i;
0048 i += 1;
0049 }
0050
0051 for (JEventPool* pool : pools) {
0052 lookup[pool] = i;
0053 i += 1;
0054 }
0055
0056
0057 bool show_row = true;
0058
0059 for (JArrow* arrow : arrows) {
0060
0061 show_row = true;
0062 for (JArrow::Port& port : arrow->m_ports) {
0063 if (show_row) {
0064 t | arrow->get_name();
0065 t | arrow->is_parallel();
0066 show_row = false;
0067 }
0068 else {
0069 t | "" | "" ;
0070 }
0071 auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool];
0072
0073 t | ((port.is_input) ? "Input ": "Output");
0074 t | ((port.queue != nullptr) ? "Queue ": "Pool");
0075 t | place_index;
0076 }
0077 }
0078 return t.Render();
0079 }
0080
0081
0082
0083
0084
0085
0086 void JTopologyBuilder::set_configure_fn(std::function<void(JTopologyBuilder&)> configure_fn) {
0087 m_configure_topology = std::move(configure_fn);
0088 }
0089
0090 void JTopologyBuilder::create_topology() {
0091 mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0092 static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0093
0094 if (m_configure_topology) {
0095 m_configure_topology(*this);
0096 LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END;
0097 }
0098 else {
0099 attach_level(JEventLevel::Run, nullptr, nullptr);
0100 LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END;
0101 }
0102 for (auto* arrow : arrows) {
0103 arrow->set_logger(GetLogger());
0104 }
0105
0106
0107
0108
0109 bool need_ordering = false;
0110 for (auto* queue : queues) {
0111 if (queue->GetEnforcesOrdering()) {
0112 need_ordering = true;
0113 }
0114 }
0115 if (!need_ordering) {
0116 for (auto* queue : queues) {
0117 queue->SetEstablishesOrdering(false);
0118 }
0119 }
0120 }
0121
0122
0123 void JTopologyBuilder::Init() {
0124
0125 m_components = GetApplication()->GetService<JComponentManager>();
0126
0127
0128
0129 if (m_params->Exists("nthreads")) {
0130 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0131 m_max_inflight_events = JCpuInfo::GetNumCpus();
0132 } else {
0133 m_max_inflight_events = m_params->GetParameterValue<int>("nthreads");
0134 }
0135 }
0136
0137 m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events,
0138 "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.")
0139 ->SetIsAdvanced(true);
0140
0141
0142
0143
0144
0145
0146 m_params->SetDefaultParameter("jana:affinity", m_affinity,
0147 "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0148 ->SetIsAdvanced(true);
0149 m_params->SetDefaultParameter("jana:locality", m_locality,
0150 "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0151 ->SetIsAdvanced(true);
0152 };
0153
0154
0155 void JTopologyBuilder::connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0156
0157 JEventQueue* queue = nullptr;
0158
0159 JArrow::Port& downstream_port = downstream->m_ports.at(downstream_port_id);
0160 if (downstream_port.queue != nullptr) {
0161
0162 queue = downstream_port.queue;
0163 }
0164 else {
0165
0166 queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count());
0167 downstream_port.queue = queue;
0168 queues.push_back(queue);
0169 }
0170 downstream_port.pool = nullptr;
0171 if (downstream_port.enforces_ordering) {
0172 queue->SetEnforcesOrdering();
0173 }
0174
0175 JArrow::Port& upstream_port = upstream->m_ports.at(upstream_port_id);
0176 upstream_port.queue = queue;
0177 if (upstream_port.establishes_ordering) {
0178 queue->SetEstablishesOrdering(true);
0179 }
0180 upstream_port.pool = nullptr;
0181 }
0182
0183
0184 void JTopologyBuilder::connect_to_first_available(JArrow* upstream, size_t upstream_port, std::vector<std::pair<JArrow*, size_t>> downstreams) {
0185 for (auto& [downstream, downstream_port_id] : downstreams) {
0186 if (downstream != nullptr) {
0187 connect(upstream, upstream_port, downstream, downstream_port_id);
0188 return;
0189 }
0190 }
0191 }
0192
0193 std::pair<JEventTapArrow*, JEventTapArrow*> JTopologyBuilder::create_tap_chain(std::vector<JEventProcessor*>& procs, std::string level) {
0194
0195 JEventTapArrow* first = nullptr;
0196 JEventTapArrow* last = nullptr;
0197
0198 int i=1;
0199 std::string arrow_name = level + "Tap";
0200 for (JEventProcessor* proc : procs) {
0201 if (procs.size() > 1) {
0202 arrow_name += std::to_string(i++);
0203 }
0204 JEventTapArrow* current = new JEventTapArrow(arrow_name);
0205 current->add_processor(proc);
0206 arrows.push_back(current);
0207 if (first == nullptr) {
0208 first = current;
0209 }
0210 if (last != nullptr) {
0211 connect(last, JEventTapArrow::EVENT_OUT, current, JEventTapArrow::EVENT_IN);
0212 }
0213 last = current;
0214 }
0215 return {first, last};
0216 }
0217
0218
0219 void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0220 std::stringstream ss;
0221 ss << current_level;
0222 auto level_str = ss.str();
0223
0224
0225 std::vector<JEventSource*> sources_at_level;
0226 for (JEventSource* source : m_components->get_evt_srces()) {
0227 if (source->GetLevel() == current_level && source->IsEnabled()) {
0228 sources_at_level.push_back(source);
0229 }
0230 }
0231
0232
0233 std::vector<JEventUnfolder*> unfolders_at_level;
0234 for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0235 if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) {
0236 unfolders_at_level.push_back(unfolder);
0237 }
0238 }
0239
0240
0241 std::vector<JEventProcessor*> mappable_procs_at_level;
0242 std::vector<JEventProcessor*> tappable_procs_at_level;
0243
0244 for (JEventProcessor* proc : m_components->get_evt_procs()) {
0245
0246 if (proc->GetLevel() == current_level && proc->IsEnabled()) {
0247
0248
0249
0250
0251
0252 if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0253 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0254 }
0255
0256 mappable_procs_at_level.push_back(proc);
0257 if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0258 tappable_procs_at_level.push_back(proc);
0259 }
0260 }
0261 }
0262
0263
0264 bool is_top_level = (parent_unfolder == nullptr);
0265 if (is_top_level && sources_at_level.size() == 0) {
0266
0267 LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0268 JEventLevel next = next_level(current_level);
0269 if (next == JEventLevel::None) {
0270 LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0271 return;
0272 }
0273 return attach_level(next, nullptr, nullptr);
0274 }
0275
0276
0277 if (!is_top_level && !sources_at_level.empty()) {
0278 throw JException("Topology forbids event sources at lower event levels in the topology");
0279 }
0280 if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0281 throw JException("Topology requires matching unfolder/folder arrow pairs");
0282 }
0283 if (unfolders_at_level.size() > 1) {
0284 throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0285 }
0286
0287
0288
0289
0290
0291
0292
0293
0294 LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0295 JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level);
0296 pools.push_back(pool_at_level);
0297 LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0298
0299
0300
0301
0302 JEventSourceArrow* src_arrow = nullptr;
0303 bool need_source = !sources_at_level.empty();
0304 if (need_source) {
0305 src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
0306 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN);
0307 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT);
0308 arrows.push_back(src_arrow);
0309 }
0310
0311
0312
0313
0314 bool have_parallel_sources = false;
0315 for (JEventSource* source: sources_at_level) {
0316 have_parallel_sources |= source->IsProcessParallelEnabled();
0317 }
0318 bool have_unfolder = !unfolders_at_level.empty();
0319 JEventMapArrow* map1_arrow = nullptr;
0320 bool need_map1 = (have_parallel_sources || have_unfolder);
0321
0322 if (need_map1) {
0323 map1_arrow = new JEventMapArrow(level_str+"Map1");
0324 for (JEventSource* source: sources_at_level) {
0325 if (source->IsProcessParallelEnabled()) {
0326 map1_arrow->add_source(source);
0327 }
0328 }
0329 for (JEventUnfolder* unf: unfolders_at_level) {
0330 map1_arrow->add_unfolder(unf);
0331 }
0332 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0333 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0334 arrows.push_back(map1_arrow);
0335 }
0336
0337
0338
0339
0340 JUnfoldArrow* unfold_arrow = nullptr;
0341 bool need_unfold = have_unfolder;
0342 if (need_unfold) {
0343 unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0344 unfold_arrow->attach(pool_at_level, JUnfoldArrow::REJECTED_PARENT_OUT);
0345 arrows.push_back(unfold_arrow);
0346 }
0347
0348
0349
0350
0351 JFoldArrow* fold_arrow = nullptr;
0352 bool need_fold = have_unfolder;
0353 if(need_fold) {
0354 fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0355 arrows.push_back(fold_arrow);
0356 fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT);
0357 }
0358
0359
0360
0361
0362 JEventMapArrow* map2_arrow = nullptr;
0363 bool need_map2 = !mappable_procs_at_level.empty();
0364 if (need_map2) {
0365 map2_arrow = new JEventMapArrow(level_str+"Map2");
0366 for (JEventProcessor* proc : mappable_procs_at_level) {
0367 map2_arrow->add_processor(proc);
0368 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0369 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0370 }
0371 arrows.push_back(map2_arrow);
0372 }
0373
0374
0375
0376
0377 JEventTapArrow* first_tap_arrow = nullptr;
0378 JEventTapArrow* last_tap_arrow = nullptr;
0379 bool need_tap = !tappable_procs_at_level.empty();
0380 if (need_tap) {
0381 std::tie(first_tap_arrow, last_tap_arrow) = create_tap_chain(tappable_procs_at_level, level_str);
0382 first_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN);
0383 last_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT);
0384 }
0385
0386
0387
0388
0389
0390
0391 if (parent_unfolder != nullptr) {
0392 parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN);
0393 connect_to_first_available(parent_unfolder, JUnfoldArrow::CHILD_OUT,
0394 {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0395 }
0396 if (src_arrow != nullptr) {
0397 connect_to_first_available(src_arrow, JEventSourceArrow::EVENT_OUT,
0398 {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0399 }
0400 if (map1_arrow != nullptr) {
0401 connect_to_first_available(map1_arrow, JEventMapArrow::EVENT_OUT,
0402 {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0403 }
0404 if (unfold_arrow != nullptr) {
0405 connect_to_first_available(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT,
0406 {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0407 }
0408 if (fold_arrow != nullptr) {
0409 connect_to_first_available(fold_arrow, JFoldArrow::CHILD_OUT,
0410 {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0411 }
0412 if (map2_arrow != nullptr) {
0413 connect_to_first_available(map2_arrow, JEventMapArrow::EVENT_OUT,
0414 {{first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0415 }
0416 if (last_tap_arrow != nullptr) {
0417 connect_to_first_available(last_tap_arrow, JEventTapArrow::EVENT_OUT,
0418 {{parent_folder, JFoldArrow::CHILD_IN}});
0419 }
0420 if (parent_folder != nullptr) {
0421 parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT);
0422 }
0423
0424
0425 if (need_unfold) {
0426 auto next_level = unfolders_at_level[0]->GetChildLevel();
0427 attach_level(next_level, unfold_arrow, fold_arrow);
0428 }
0429 else {
0430
0431
0432 if (last_tap_arrow != nullptr) {
0433 last_tap_arrow->set_is_sink(true);
0434 }
0435 else if (map2_arrow != nullptr) {
0436 map2_arrow->set_is_sink(true);
0437 }
0438 else if (map1_arrow != nullptr) {
0439 map1_arrow->set_is_sink(true);
0440 }
0441 else if (src_arrow != nullptr) {
0442 src_arrow->set_is_sink(true);
0443 }
0444 }
0445 }
0446
0447
0448