File indexing completed on 2026-04-06 08:34:04
0001
0002
0003
0004
0005
0006 #include "JTopologyBuilder.h"
0007
0008 #include "JEventSourceArrow.h"
0009 #include "JEventMapArrow.h"
0010 #include "JEventTapArrow.h"
0011 #include "JUnfoldArrow.h"
0012 #include "JFoldArrow.h"
0013 #include <JANA/JEventProcessor.h>
0014 #include <JANA/Utils/JTablePrinter.h>
0015 #include <string>
0016 #include <vector>
0017
0018
0019 JTopologyBuilder::JTopologyBuilder() {
0020 SetPrefix("jana");
0021 }
0022
0023 JTopologyBuilder::~JTopologyBuilder() {
0024 for (auto arrow : arrows) {
0025 delete arrow;
0026 }
0027 for (auto queue : queues) {
0028 delete queue;
0029 }
0030 for (auto pool : pools) {
0031 delete pool;
0032 }
0033 }
0034
0035 std::string JTopologyBuilder::print_topology() {
0036 JTablePrinter t;
0037 t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0038 t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0039 t.AddColumn("Direction", JTablePrinter::Justify::Left, 0);
0040 t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0041 t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0042
0043
0044 int i = 0;
0045 std::map<void*, int> lookup;
0046 for (JEventQueue* queue : queues) {
0047 lookup[queue] = i;
0048 i += 1;
0049 }
0050
0051 for (JEventPool* pool : pools) {
0052 lookup[pool] = i;
0053 i += 1;
0054 }
0055
0056
0057 bool show_row = true;
0058
0059 for (JArrow* arrow : arrows) {
0060
0061 show_row = true;
0062 for (JArrow::Port& port : arrow->m_ports) {
0063 if (show_row) {
0064 t | arrow->get_name();
0065 t | arrow->is_parallel();
0066 show_row = false;
0067 }
0068 else {
0069 t | "" | "" ;
0070 }
0071 auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool];
0072
0073 t | ((port.is_input) ? "Input ": "Output");
0074 t | ((port.queue != nullptr) ? "Queue ": "Pool");
0075 t | place_index;
0076 }
0077 }
0078 return t.Render();
0079 }
0080
0081
0082
0083
0084
0085
0086 void JTopologyBuilder::set_configure_fn(std::function<void(JTopologyBuilder&)> configure_fn) {
0087 m_configure_topology = std::move(configure_fn);
0088 }
0089
0090 void JTopologyBuilder::create_topology() {
0091 mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0092 static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0093
0094 if (m_configure_topology) {
0095 m_configure_topology(*this);
0096 LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END;
0097 }
0098 else {
0099 attach_level(JEventLevel::Run, nullptr, nullptr);
0100 LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END;
0101 }
0102 for (auto* arrow : arrows) {
0103 arrow->set_logger(GetLogger());
0104 }
0105
0106
0107
0108
0109 bool need_ordering = false;
0110 for (auto* queue : queues) {
0111 if (queue->GetEnforcesOrdering()) {
0112 need_ordering = true;
0113 }
0114 }
0115 if (!need_ordering) {
0116 for (auto* queue : queues) {
0117 queue->SetEstablishesOrdering(false);
0118 }
0119 }
0120 size_t i = 0;
0121 for (auto* queue: queues) {
0122 LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering();
0123 }
0124 }
0125
0126
0127 void JTopologyBuilder::Init() {
0128
0129 m_components = GetApplication()->GetService<JComponentManager>();
0130
0131
0132
0133 if (m_params->Exists("nthreads")) {
0134 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0135 m_max_inflight_events = JCpuInfo::GetNumCpus();
0136 } else {
0137 m_max_inflight_events = m_params->GetParameterValue<int>("nthreads");
0138 }
0139 }
0140
0141 m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events,
0142 "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.")
0143 ->SetIsAdvanced(true);
0144
0145
0146
0147
0148
0149
0150 m_params->SetDefaultParameter("jana:affinity", m_affinity,
0151 "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0152 ->SetIsAdvanced(true);
0153 m_params->SetDefaultParameter("jana:locality", m_locality,
0154 "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0155 ->SetIsAdvanced(true);
0156 };
0157
0158
0159 void JTopologyBuilder::connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id) {
0160
0161 JEventQueue* queue = nullptr;
0162
0163 JArrow::Port& downstream_port = downstream->m_ports.at(downstream_port_id);
0164 if (downstream_port.queue != nullptr) {
0165
0166 queue = downstream_port.queue;
0167 }
0168 else {
0169
0170 queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count());
0171 downstream_port.queue = queue;
0172 queues.push_back(queue);
0173 }
0174 downstream_port.pool = nullptr;
0175 if (downstream_port.enforces_ordering) {
0176 queue->SetEnforcesOrdering();
0177 }
0178
0179 JArrow::Port& upstream_port = upstream->m_ports.at(upstream_port_id);
0180 upstream_port.queue = queue;
0181 if (upstream_port.establishes_ordering) {
0182 queue->SetEstablishesOrdering(true);
0183 }
0184 upstream_port.pool = nullptr;
0185 }
0186
0187
0188 void JTopologyBuilder::connect_to_first_available(JArrow* upstream, size_t upstream_port, std::vector<std::pair<JArrow*, size_t>> downstreams) {
0189 for (auto& [downstream, downstream_port_id] : downstreams) {
0190 if (downstream != nullptr) {
0191 connect(upstream, upstream_port, downstream, downstream_port_id);
0192 return;
0193 }
0194 }
0195 }
0196
0197 std::pair<JEventTapArrow*, JEventTapArrow*> JTopologyBuilder::create_tap_chain(std::vector<JEventProcessor*>& procs, std::string level) {
0198
0199 JEventTapArrow* first = nullptr;
0200 JEventTapArrow* last = nullptr;
0201
0202 int i=1;
0203 std::string arrow_name = level + "Tap";
0204 for (JEventProcessor* proc : procs) {
0205 if (procs.size() > 1) {
0206 arrow_name += std::to_string(i++);
0207 }
0208 JEventTapArrow* current = new JEventTapArrow(arrow_name);
0209 current->add_processor(proc);
0210 arrows.push_back(current);
0211 if (first == nullptr) {
0212 first = current;
0213 }
0214 if (last != nullptr) {
0215 connect(last, JEventTapArrow::EVENT_OUT, current, JEventTapArrow::EVENT_IN);
0216 }
0217 last = current;
0218 }
0219 return {first, last};
0220 }
0221
0222
0223 void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0224 std::stringstream ss;
0225 ss << current_level;
0226 auto level_str = ss.str();
0227
0228
0229 std::vector<JEventSource*> sources_at_level;
0230 for (JEventSource* source : m_components->get_evt_srces()) {
0231 if (source->GetLevel() == current_level && source->IsEnabled()) {
0232 sources_at_level.push_back(source);
0233 }
0234 }
0235
0236
0237 std::vector<JEventUnfolder*> unfolders_at_level;
0238 for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0239 if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) {
0240 unfolders_at_level.push_back(unfolder);
0241 }
0242 }
0243
0244
0245 std::vector<JEventProcessor*> mappable_procs_at_level;
0246 std::vector<JEventProcessor*> tappable_procs_at_level;
0247
0248 for (JEventProcessor* proc : m_components->get_evt_procs()) {
0249
0250 if (proc->GetLevel() == current_level && proc->IsEnabled()) {
0251
0252
0253
0254
0255
0256 if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) {
0257 throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str());
0258 }
0259
0260 mappable_procs_at_level.push_back(proc);
0261 if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0262 tappable_procs_at_level.push_back(proc);
0263 }
0264 }
0265 }
0266
0267
0268 bool is_top_level = (parent_unfolder == nullptr);
0269 if (is_top_level && sources_at_level.size() == 0) {
0270
0271 LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0272 JEventLevel next = next_level(current_level);
0273 if (next == JEventLevel::None) {
0274 LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0275 return;
0276 }
0277 return attach_level(next, nullptr, nullptr);
0278 }
0279
0280
0281 if (!is_top_level && !sources_at_level.empty()) {
0282 throw JException("Topology forbids event sources at lower event levels in the topology");
0283 }
0284 if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0285 throw JException("Topology requires matching unfolder/folder arrow pairs");
0286 }
0287 if (unfolders_at_level.size() > 1) {
0288 throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0289 }
0290
0291
0292
0293
0294
0295
0296
0297
0298 LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0299 JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level);
0300 pools.push_back(pool_at_level);
0301 LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0302
0303
0304
0305
0306 JEventSourceArrow* src_arrow = nullptr;
0307 bool need_source = !sources_at_level.empty();
0308 if (need_source) {
0309 src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
0310 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN);
0311 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT);
0312 arrows.push_back(src_arrow);
0313 }
0314
0315
0316
0317
0318 bool have_parallel_sources = false;
0319 for (JEventSource* source: sources_at_level) {
0320 have_parallel_sources |= source->IsProcessParallelEnabled();
0321 }
0322 bool have_unfolder = !unfolders_at_level.empty();
0323 JEventMapArrow* map1_arrow = nullptr;
0324 bool need_map1 = (have_parallel_sources || have_unfolder);
0325
0326 if (need_map1) {
0327 map1_arrow = new JEventMapArrow(level_str+"Map1");
0328 for (JEventSource* source: sources_at_level) {
0329 if (source->IsProcessParallelEnabled()) {
0330 map1_arrow->add_source(source);
0331 }
0332 }
0333 for (JEventUnfolder* unf: unfolders_at_level) {
0334 map1_arrow->add_unfolder(unf);
0335 }
0336 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0337 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0338 arrows.push_back(map1_arrow);
0339 }
0340
0341
0342
0343
0344 JUnfoldArrow* unfold_arrow = nullptr;
0345 bool need_unfold = have_unfolder;
0346 if (need_unfold) {
0347 unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0348 unfold_arrow->attach(pool_at_level, JUnfoldArrow::REJECTED_PARENT_OUT);
0349 arrows.push_back(unfold_arrow);
0350 }
0351
0352
0353
0354
0355 JFoldArrow* fold_arrow = nullptr;
0356 bool need_fold = have_unfolder;
0357 if(need_fold) {
0358 fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0359 arrows.push_back(fold_arrow);
0360 fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT);
0361 }
0362
0363
0364
0365
0366 JEventMapArrow* map2_arrow = nullptr;
0367 bool need_map2 = !mappable_procs_at_level.empty();
0368 if (need_map2) {
0369 map2_arrow = new JEventMapArrow(level_str+"Map2");
0370 for (JEventProcessor* proc : mappable_procs_at_level) {
0371 map2_arrow->add_processor(proc);
0372 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0373 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0374 }
0375 arrows.push_back(map2_arrow);
0376 }
0377
0378
0379
0380
0381 JEventTapArrow* first_tap_arrow = nullptr;
0382 JEventTapArrow* last_tap_arrow = nullptr;
0383 bool need_tap = !tappable_procs_at_level.empty();
0384 if (need_tap) {
0385 std::tie(first_tap_arrow, last_tap_arrow) = create_tap_chain(tappable_procs_at_level, level_str);
0386 first_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN);
0387 last_tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT);
0388 }
0389
0390
0391
0392
0393
0394
0395 if (parent_unfolder != nullptr) {
0396 parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN);
0397 connect_to_first_available(parent_unfolder, JUnfoldArrow::CHILD_OUT,
0398 {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0399 }
0400 if (src_arrow != nullptr) {
0401 connect_to_first_available(src_arrow, JEventSourceArrow::EVENT_OUT,
0402 {{map1_arrow, JEventMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0403 }
0404 if (map1_arrow != nullptr) {
0405 connect_to_first_available(map1_arrow, JEventMapArrow::EVENT_OUT,
0406 {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0407 }
0408 if (unfold_arrow != nullptr) {
0409 connect_to_first_available(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT,
0410 {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0411 }
0412 if (fold_arrow != nullptr) {
0413 connect_to_first_available(fold_arrow, JFoldArrow::CHILD_OUT,
0414 {{map2_arrow, JEventMapArrow::EVENT_IN}, {first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0415 }
0416 if (map2_arrow != nullptr) {
0417 connect_to_first_available(map2_arrow, JEventMapArrow::EVENT_OUT,
0418 {{first_tap_arrow, JEventTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}});
0419 }
0420 if (last_tap_arrow != nullptr) {
0421 connect_to_first_available(last_tap_arrow, JEventTapArrow::EVENT_OUT,
0422 {{parent_folder, JFoldArrow::CHILD_IN}});
0423 }
0424 if (parent_folder != nullptr) {
0425 parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT);
0426 }
0427
0428
0429 if (need_unfold) {
0430 auto next_level = unfolders_at_level[0]->GetChildLevel();
0431 attach_level(next_level, unfold_arrow, fold_arrow);
0432 }
0433 else {
0434
0435
0436 if (last_tap_arrow != nullptr) {
0437 last_tap_arrow->set_is_sink(true);
0438 }
0439 else if (map2_arrow != nullptr) {
0440 map2_arrow->set_is_sink(true);
0441 }
0442 else if (map1_arrow != nullptr) {
0443 map1_arrow->set_is_sink(true);
0444 }
0445 else if (src_arrow != nullptr) {
0446 src_arrow->set_is_sink(true);
0447 }
0448 }
0449 }
0450
0451
0452