File indexing completed on 2025-01-18 09:37:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_GRAPH_PARALLEL_CC_PS_HPP
0011 #define BOOST_GRAPH_PARALLEL_CC_PS_HPP
0012
0013 #ifndef BOOST_GRAPH_USE_MPI
0014 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0015 #endif
0016
0017 #include <boost/assert.hpp>
0018 #include <boost/property_map/property_map.hpp>
0019 #include <boost/property_map/parallel/parallel_property_maps.hpp>
0020 #include <boost/graph/parallel/algorithm.hpp>
0021 #include <boost/pending/indirect_cmp.hpp>
0022 #include <boost/graph/graph_traits.hpp>
0023 #include <boost/graph/overloading.hpp>
0024 #include <boost/graph/distributed/concepts.hpp>
0025 #include <boost/graph/parallel/properties.hpp>
0026 #include <boost/graph/parallel/process_group.hpp>
0027 #include <boost/optional.hpp>
0028 #include <algorithm>
0029 #include <vector>
0030 #include <queue>
0031 #include <limits>
0032 #include <map>
0033 #include <boost/graph/parallel/container_traits.hpp>
0034 #include <boost/graph/iteration_macros.hpp>
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062 namespace boost { namespace graph { namespace distributed {
0063 namespace cc_ps_detail {
0064
0065
0066
0067
0068
/**
 * Hands out a sequence of component values for one process.
 *
 * The k-th call to allocate() (counting from zero) yields
 * `num + k * size`, where `num` and `size` are the two constructor
 * arguments.
 */
template<typename component_value_type>
class component_value_allocator {
public:
  /**
   * @param num   value returned by the first allocate() call
   * @param size  distance between two consecutive allocated values
   */
  component_value_allocator(int num, int size)
    : issued_(0), offset_(num), stride_(size)
  {
  }

  /// Returns the next value of the sequence and advances the counter.
  component_value_type allocate(void)
  {
    const component_value_type fresh = offset_ + (issued_ * stride_);
    ++issued_;
    return fresh;
  }

private:
  component_value_type issued_;  // number of values handed out so far
  int offset_;                   // first value of the sequence
  int stride_;                   // step between consecutive values
};
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104 template<typename component_value_type>
0105 class collision_map {
0106 public:
0107 collision_map() : num_unique(0)
0108 {
0109 }
0110
0111
0112
0113
0114 void add(const component_value_type &a)
0115 {
0116 BOOST_ASSERT(collisions.count(a) == 0);
0117 collisions[a] = a;
0118 }
0119
0120
0121
0122 void add(const component_value_type &a, const component_value_type &b)
0123 {
0124 component_value_type high, low, tmp;
0125 if (a > b) {
0126 high = a;
0127 low = b;
0128 } else {
0129 high = b;
0130 low = a;
0131 }
0132
0133 if (collisions.count(high) != 0 && collisions[high] != low) {
0134 tmp = collisions[high];
0135 if (tmp > low) {
0136 collisions[tmp] = low;
0137 collisions[high] = low;
0138 } else {
0139 collisions[low] = tmp;
0140 collisions[high] = tmp;
0141 }
0142 } else {
0143 collisions[high] = low;
0144 }
0145
0146 }
0147
0148
0149
0150 component_value_type update(component_value_type a)
0151 {
0152 BOOST_ASSERT(num_unique > 0);
0153 BOOST_ASSERT(collisions.count(a) != 0);
0154 return collisions[a];
0155 }
0156
0157
0158
0159 void uniqify(void)
0160 {
0161 typename std::map<component_value_type, component_value_type>::iterator i, end;
0162
0163 end = collisions.end();
0164 for (i = collisions.begin() ; i != end ; ++i) {
0165 if (i->first == i->second) {
0166 num_unique++;
0167 } else {
0168 i->second = collisions[i->second];
0169 }
0170 }
0171 }
0172
0173
0174
0175
0176
0177 int unique(void)
0178 {
0179 BOOST_ASSERT(num_unique > 0);
0180 return num_unique;
0181 }
0182
0183
0184 std::vector<component_value_type> serialize(void)
0185 {
0186 std::vector<component_value_type> ret;
0187 typename std::map<component_value_type, component_value_type>::iterator i, end;
0188
0189 end = collisions.end();
0190 for (i = collisions.begin() ; i != end ; ++i) {
0191 ret.push_back(i->first);
0192 ret.push_back(i->second);
0193 }
0194
0195 return ret;
0196 }
0197
0198 private:
0199 std::map<component_value_type, component_value_type> collisions;
0200 int num_unique;
0201 };
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211 template<typename ComponentMap, typename work_queue>
0212 struct update_reducer {
0213 BOOST_STATIC_CONSTANT(bool, non_default_resolver = false);
0214
0215 typedef typename property_traits<ComponentMap>::value_type component_value_type;
0216 typedef typename property_traits<ComponentMap>::key_type vertex_descriptor;
0217
0218 update_reducer(work_queue *q,
0219 cc_ps_detail::collision_map<component_value_type> *collisions,
0220 processor_id_type pg_id) :
0221 q(q), collisions(collisions), pg_id(pg_id)
0222 {
0223 }
0224
0225
0226
0227 template<typename K>
0228 component_value_type operator()(const K&) const
0229 {
0230 return component_value_type(0);
0231 }
0232
0233
0234
0235
0236
0237
0238
0239
0240
0241
0242 component_value_type operator()(const vertex_descriptor &v,
0243 const component_value_type& current,
0244 const component_value_type& update) const
0245 {
0246 const component_value_type max = (std::numeric_limits<component_value_type>::max)();
0247 component_value_type ret = current;
0248
0249 if (max == current) {
0250 q->push(v);
0251 ret = update;
0252 } else if (current != update) {
0253 collisions->add(current, update);
0254 }
0255
0256 return ret;
0257 }
0258
0259
0260
0261
0262
0263
0264
0265 template<typename K>
0266 component_value_type operator()(const K& v,
0267 const component_value_type& current,
0268 const component_value_type& update) const
0269 {
0270 return (*this)(vertex_descriptor(pg_id, v), current, update);
0271 }
0272
0273 private:
0274 work_queue *q;
0275 collision_map<component_value_type> *collisions;
0276 boost::processor_id_type pg_id;
0277 };
0278
0279 }
0280
0281
0282 template<typename Graph, typename ComponentMap>
0283 typename property_traits<ComponentMap>::value_type
0284 connected_components_ps(const Graph& g, ComponentMap c)
0285 {
0286 using boost::graph::parallel::process_group;
0287
0288 typedef typename property_traits<ComponentMap>::value_type component_value_type;
0289 typedef typename graph_traits<Graph>::vertex_iterator vertex_iterator;
0290 typedef typename graph_traits<Graph>::vertex_descriptor vertex_descriptor;
0291 typedef typename boost::graph::parallel::process_group_type<Graph>
0292 ::type process_group_type;
0293 typedef typename process_group_type::process_id_type process_id_type;
0294 typedef std::queue<vertex_descriptor> work_queue;
0295
0296 static const component_value_type max_component =
0297 (std::numeric_limits<component_value_type>::max)();
0298 typename property_map<Graph, vertex_owner_t>::const_type
0299 owner = get(vertex_owner, g);
0300
0301
0302 process_group_type pg = process_group(g);
0303 process_id_type id = process_id(pg);
0304
0305
0306 BGL_FORALL_VERTICES_T(v, g, Graph) put(c, v, max_component);
0307
0308 vertex_iterator current, end;
0309 boost::tie(current, end) = vertices(g);
0310
0311 cc_ps_detail::component_value_allocator<component_value_type> cva(process_id(pg), num_processes(pg));
0312 cc_ps_detail::collision_map<component_value_type> collisions;
0313 work_queue q;
0314 c.set_reduce(cc_ps_detail::update_reducer<ComponentMap, work_queue>(&q, &collisions, id));
0315
0316
0317 while (true) {
0318 bool useful_found = false;
0319 component_value_type val = cva.allocate();
0320 put(c, *current, val);
0321 collisions.add(val);
0322 q.push(*current);
0323 if (0 != out_degree(*current, g)) useful_found = true;
0324 ++current;
0325 if (useful_found) break;
0326 }
0327
0328
0329 bool global_done = false;
0330 while (!global_done) {
0331
0332
0333 while (!q.empty()) {
0334 vertex_descriptor v = q.front();
0335 q.pop();
0336
0337
0338
0339
0340
0341 BGL_FORALL_ADJ_T(v, peer, g, Graph) {
0342 component_value_type my_component = get(c, v);
0343
0344
0345
0346
0347
0348 if (id == get(owner, peer)) {
0349 if (max_component == get(c, peer)) {
0350 put(c, peer, my_component);
0351 q.push(peer);
0352 } else if (my_component != get(c, peer)) {
0353 collisions.add(my_component, get(c, peer));
0354 }
0355 } else {
0356 put(c, peer, my_component);
0357 }
0358 }
0359 }
0360
0361
0362 synchronize(pg);
0363 global_done = all_reduce(pg, (q.empty() && (current == end)), boost::parallel::minimum<bool>());
0364
0365
0366
0367
0368
0369
0370
0371 if (q.empty()) {
0372 bool useful_found = false;
0373 for ( ; current != end && !useful_found ; ++current) {
0374 if (max_component == get(c, *current)) {
0375 component_value_type val = cva.allocate();
0376 put(c, *current, val);
0377 collisions.add(val);
0378 q.push(*current);
0379 if (0 != out_degree(*current, g)) useful_found = true;
0380 }
0381 }
0382 }
0383 }
0384
0385
0386 std::vector<component_value_type> global;
0387 std::vector<component_value_type> mine = collisions.serialize();
0388 all_gather(pg, mine.begin(), mine.end(), global);
0389 for (size_t i = 0 ; i < global.size() ; i += 2) {
0390 collisions.add(global[i], global[i + 1]);
0391 }
0392 collisions.uniqify();
0393
0394
0395 BGL_FORALL_VERTICES_T(v, g, Graph) {
0396 put(c, v, collisions.update(get(c, v)));
0397 }
0398
0399 return collisions.unique();
0400 }
0401
0402 }
0403
0404 }
0405
0406 }
0407
0408 #endif