File indexing completed on 2025-01-18 10:17:36
0001
0002
0003
0004
0005
0006 #include "JTopologyBuilder.h"
0007
0008 #include "JEventSourceArrow.h"
0009 #include "JEventMapArrow.h"
0010 #include "JEventTapArrow.h"
0011 #include "JUnfoldArrow.h"
0012 #include "JFoldArrow.h"
0013 #include <JANA/JEventProcessor.h>
0014 #include <JANA/Utils/JTablePrinter.h>
0015
0016
0017 JTopologyBuilder::JTopologyBuilder() {
0018 SetPrefix("jana");
0019 }
0020
0021 JTopologyBuilder::~JTopologyBuilder() {
0022 for (auto arrow : arrows) {
0023 delete arrow;
0024 }
0025 for (auto queue : queues) {
0026 delete queue;
0027 }
0028 for (auto pool : pools) {
0029 delete pool;
0030 }
0031 }
0032
0033 std::string JTopologyBuilder::print_topology() {
0034 JTablePrinter t;
0035 t.AddColumn("Arrow", JTablePrinter::Justify::Left, 0);
0036 t.AddColumn("Parallel", JTablePrinter::Justify::Center, 0);
0037 t.AddColumn("Direction", JTablePrinter::Justify::Left, 0);
0038 t.AddColumn("Place", JTablePrinter::Justify::Left, 0);
0039 t.AddColumn("ID", JTablePrinter::Justify::Left, 0);
0040
0041
0042 int i = 0;
0043 std::map<void*, int> lookup;
0044 for (JEventQueue* queue : queues) {
0045 lookup[queue] = i;
0046 i += 1;
0047 }
0048
0049 for (JEventPool* pool : pools) {
0050 lookup[pool] = i;
0051 i += 1;
0052 }
0053
0054
0055 bool show_row = true;
0056
0057 for (JArrow* arrow : arrows) {
0058
0059 show_row = true;
0060 for (JArrow::Port& port : arrow->m_ports) {
0061 if (show_row) {
0062 t | arrow->get_name();
0063 t | arrow->is_parallel();
0064 show_row = false;
0065 }
0066 else {
0067 t | "" | "" ;
0068 }
0069 auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool];
0070
0071 t | ((port.is_input) ? "Input ": "Output");
0072 t | ((port.queue != nullptr) ? "Queue ": "Pool");
0073 t | place_index;
0074 }
0075 }
0076 return t.Render();
0077 }
0078
0079
0080
0081
0082
0083
0084 void JTopologyBuilder::set_configure_fn(std::function<void(JTopologyBuilder&)> configure_fn) {
0085 m_configure_topology = std::move(configure_fn);
0086 }
0087
0088 void JTopologyBuilder::create_topology() {
0089 mapping.initialize(static_cast<JProcessorMapping::AffinityStrategy>(m_affinity),
0090 static_cast<JProcessorMapping::LocalityStrategy>(m_locality));
0091
0092 if (m_configure_topology) {
0093 m_configure_topology(*this);
0094 LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END;
0095 }
0096 else {
0097 attach_level(JEventLevel::Run, nullptr, nullptr);
0098 LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END;
0099 }
0100 for (auto* arrow : arrows) {
0101 arrow->set_logger(GetLogger());
0102 }
0103 }
0104
0105
0106 void JTopologyBuilder::acquire_services(JServiceLocator *sl) {
0107
0108 m_components = sl->get<JComponentManager>();
0109
0110
0111
0112 if (m_params->Exists("nthreads")) {
0113 if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
0114 m_max_inflight_events = JCpuInfo::GetNumCpus();
0115 } else {
0116 m_max_inflight_events = m_params->GetParameterValue<int>("nthreads");
0117 }
0118 }
0119
0120 m_params->SetDefaultParameter("jana:max_inflight_events", m_max_inflight_events,
0121 "The number of events which may be in-flight at once. Should be at least `nthreads` to prevent starvation; more gives better load balancing.")
0122 ->SetIsAdvanced(true);
0123
0124
0125
0126
0127
0128
0129 m_params->SetDefaultParameter("jana:affinity", m_affinity,
0130 "Constrain worker thread CPU affinity. 0=Let the OS decide. 1=Avoid extra memory movement at the expense of using hyperthreads. 2=Avoid hyperthreads at the expense of extra memory movement")
0131 ->SetIsAdvanced(true);
0132 m_params->SetDefaultParameter("jana:locality", m_locality,
0133 "Constrain memory locality. 0=No constraint. 1=Events stay on the same socket. 2=Events stay on the same NUMA domain. 3=Events stay on same core. 4=Events stay on same cpu/hyperthread.")
0134 ->SetIsAdvanced(true);
0135 };
0136
0137
0138 void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downstream, size_t down_index) {
0139
0140 auto queue = new JEventQueue(m_max_inflight_events, mapping.get_loc_count());
0141 queues.push_back(queue);
0142
0143 size_t i = 0;
0144 for (JArrow::Port& port : upstream->m_ports) {
0145 if (!port.is_input) {
0146 if (i++ == up_index) {
0147
0148 port.queue = queue;
0149 port.pool = nullptr;
0150 }
0151 }
0152 }
0153 i = 0;
0154 for (JArrow::Port& port : downstream->m_ports) {
0155 if (port.is_input) {
0156 if (i++ == down_index) {
0157
0158 port.queue = queue;
0159 port.pool = nullptr;
0160 }
0161 }
0162 }
0163 }
0164
0165
0166 void JTopologyBuilder::connect_to_first_available(JArrow* upstream, std::vector<JArrow*> downstreams) {
0167 for (JArrow* downstream : downstreams) {
0168 if (downstream != nullptr) {
0169
0170 connect(upstream, 0, downstream, 0);
0171 return;
0172 }
0173 }
0174 }
0175
0176
0177 void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) {
0178 std::stringstream ss;
0179 ss << current_level;
0180 auto level_str = ss.str();
0181
0182
0183 std::vector<JEventSource*> sources_at_level;
0184 for (JEventSource* source : m_components->get_evt_srces()) {
0185 if (source->GetLevel() == current_level) {
0186 sources_at_level.push_back(source);
0187 }
0188 }
0189
0190
0191 std::vector<JEventUnfolder*> unfolders_at_level;
0192 for (JEventUnfolder* unfolder : m_components->get_unfolders()) {
0193 if (unfolder->GetLevel() == current_level) {
0194 unfolders_at_level.push_back(unfolder);
0195 }
0196 }
0197
0198
0199 std::vector<JEventProcessor*> mappable_procs_at_level;
0200 std::vector<JEventProcessor*> tappable_procs_at_level;
0201
0202 for (JEventProcessor* proc : m_components->get_evt_procs()) {
0203 if (proc->GetLevel() == current_level) {
0204 mappable_procs_at_level.push_back(proc);
0205 if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
0206 tappable_procs_at_level.push_back(proc);
0207 }
0208 }
0209 }
0210
0211
0212 bool is_top_level = (parent_unfolder == nullptr);
0213 if (is_top_level && sources_at_level.size() == 0) {
0214
0215 LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END;
0216 JEventLevel next = next_level(current_level);
0217 if (next == JEventLevel::None) {
0218 LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END;
0219 return;
0220 }
0221 return attach_level(next, nullptr, nullptr);
0222 }
0223
0224
0225 if (!is_top_level && !sources_at_level.empty()) {
0226 throw JException("Topology forbids event sources at lower event levels in the topology");
0227 }
0228 if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) {
0229 throw JException("Topology requires matching unfolder/folder arrow pairs");
0230 }
0231 if (unfolders_at_level.size() > 1) {
0232 throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str());
0233 }
0234
0235
0236
0237
0238
0239
0240
0241
0242 JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events, m_location_count, current_level);
0243 pools.push_back(pool_at_level);
0244 LOG_INFO(GetLogger()) << "Created event pool with level=" << current_level << " and size=" << m_max_inflight_events;
0245
0246
0247
0248
0249 JEventSourceArrow* src_arrow = nullptr;
0250 bool need_source = !sources_at_level.empty();
0251 if (need_source) {
0252 src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
0253 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_IN);
0254 src_arrow->attach(pool_at_level, JEventSourceArrow::EVENT_OUT);
0255 arrows.push_back(src_arrow);
0256 }
0257
0258
0259
0260
0261 bool have_parallel_sources = false;
0262 for (JEventSource* source: sources_at_level) {
0263 have_parallel_sources |= source->IsPreprocessEnabled();
0264 }
0265 bool have_unfolder = !unfolders_at_level.empty();
0266 JEventMapArrow* map1_arrow = nullptr;
0267 bool need_map1 = (have_parallel_sources || have_unfolder);
0268
0269 if (need_map1) {
0270 map1_arrow = new JEventMapArrow(level_str+"Map1");
0271 for (JEventSource* source: sources_at_level) {
0272 if (source->IsPreprocessEnabled()) {
0273 map1_arrow->add_source(source);
0274 }
0275 }
0276 for (JEventUnfolder* unf: unfolders_at_level) {
0277 map1_arrow->add_unfolder(unf);
0278 }
0279 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0280 map1_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0281 arrows.push_back(map1_arrow);
0282 }
0283
0284
0285
0286
0287 JUnfoldArrow* unfold_arrow = nullptr;
0288 bool need_unfold = have_unfolder;
0289 if (need_unfold) {
0290 unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]);
0291 arrows.push_back(unfold_arrow);
0292 }
0293
0294
0295
0296
0297 JFoldArrow* fold_arrow = nullptr;
0298 bool need_fold = have_unfolder;
0299 if(need_fold) {
0300 fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel());
0301 arrows.push_back(fold_arrow);
0302 fold_arrow->attach(pool_at_level, JFoldArrow::PARENT_OUT);
0303 }
0304
0305
0306
0307
0308 JEventMapArrow* map2_arrow = nullptr;
0309 bool need_map2 = !mappable_procs_at_level.empty();
0310 if (need_map2) {
0311 map2_arrow = new JEventMapArrow(level_str+"Map2");
0312 for (JEventProcessor* proc : mappable_procs_at_level) {
0313 map2_arrow->add_processor(proc);
0314 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_IN);
0315 map2_arrow->attach(pool_at_level, JEventMapArrow::EVENT_OUT);
0316 }
0317 arrows.push_back(map2_arrow);
0318 }
0319
0320
0321
0322
0323 JEventTapArrow* tap_arrow = nullptr;
0324 bool need_tap = !tappable_procs_at_level.empty();
0325 if (need_tap) {
0326 tap_arrow = new JEventTapArrow(level_str+"Tap");
0327 for (JEventProcessor* proc : tappable_procs_at_level) {
0328 tap_arrow->add_processor(proc);
0329 tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_IN);
0330 tap_arrow->attach(pool_at_level, JEventTapArrow::EVENT_OUT);
0331 }
0332 arrows.push_back(tap_arrow);
0333 }
0334
0335
0336
0337
0338
0339
0340 if (parent_unfolder != nullptr) {
0341 parent_unfolder->attach(pool_at_level, JUnfoldArrow::CHILD_IN);
0342 connect_to_first_available(parent_unfolder, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0343 }
0344 if (src_arrow != nullptr) {
0345 connect_to_first_available(src_arrow, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0346 }
0347 if (map1_arrow != nullptr) {
0348 connect_to_first_available(map1_arrow, {unfold_arrow, map2_arrow, tap_arrow, parent_folder});
0349 }
0350 if (fold_arrow != nullptr) {
0351 connect_to_first_available(fold_arrow, {map2_arrow, tap_arrow, parent_folder});
0352 }
0353 if (map2_arrow != nullptr) {
0354 connect_to_first_available(map2_arrow, {tap_arrow, parent_folder});
0355 }
0356 if (tap_arrow != nullptr) {
0357 connect_to_first_available(tap_arrow, {parent_folder});
0358 }
0359 if (parent_folder != nullptr) {
0360 parent_folder->attach(pool_at_level, JFoldArrow::CHILD_OUT);
0361 }
0362
0363
0364 if (need_unfold) {
0365 auto next_level = unfolders_at_level[0]->GetChildLevel();
0366 attach_level(next_level, unfold_arrow, fold_arrow);
0367 }
0368 else {
0369
0370
0371 if (tap_arrow != nullptr) {
0372 tap_arrow->set_is_sink(true);
0373 }
0374 else if (map2_arrow != nullptr) {
0375 map2_arrow->set_is_sink(true);
0376 }
0377 else if (map1_arrow != nullptr) {
0378 map1_arrow->set_is_sink(true);
0379 }
0380 else if (src_arrow != nullptr) {
0381 src_arrow->set_is_sink(true);
0382 }
0383 }
0384 }
0385
0386
0387