File indexing completed on 2025-01-18 09:50:15
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #include <boost/assert.hpp>
0011 #include <boost/property_map/parallel/distributed_property_map.hpp>
0012 #include <boost/property_map/parallel/detail/untracked_pair.hpp>
0013 #include <boost/type_traits/is_base_and_derived.hpp>
0014 #include <boost/property_map/parallel/simple_trigger.hpp>
0015
0016 namespace boost { namespace parallel {
0017
0018 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0019 template<typename Reduce>
0020 PBGL_DISTRIB_PMAP
0021 ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
0022 const StorageMap& pm, const Reduce& reduce)
0023 : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
0024 {
0025 typedef handle_message<Reduce> Handler;
0026
0027 data->ghost_cells.reset(new ghost_cells_type());
0028 data->reset = &data_t::template do_reset<Reduce>;
0029 data->process_group.replace_handler(Handler(data, reduce));
0030 data->process_group.template get_receiver<Handler>()
0031 ->setup_triggers(data->process_group);
0032 }
0033
0034 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0035 PBGL_DISTRIB_PMAP::~distributed_property_map() { }
0036
0037 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0038 template<typename Reduce>
0039 void
0040 PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
0041 {
0042 typedef handle_message<Reduce> Handler;
0043 data->process_group.replace_handler(Handler(data, reduce));
0044 Handler* handler = data->process_group.template get_receiver<Handler>();
0045 BOOST_ASSERT(handler);
0046 handler->setup_triggers(data->process_group);
0047 data->get_default_value = reduce;
0048 data->has_default_resolver = Reduce::non_default_resolver;
0049 int model = data->model;
0050 data->reset = &data_t::template do_reset<Reduce>;
0051 set_consistency_model(model);
0052 }
0053
0054 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0055 void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
0056 {
0057 if (data->max_ghost_cells == 0)
0058 return;
0059
0060 while (data->ghost_cells->size() > data->max_ghost_cells) {
0061
0062
0063 if (data->model & cm_flush) {
0064
0065 boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
0066 = data->ghost_cells->back();
0067 send(data->process_group, get(data->global, victim.first).first,
0068 property_map_put, victim);
0069 }
0070
0071
0072 data->ghost_cells->pop_back();
0073 }
0074 }
0075
0076 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0077 typename PBGL_DISTRIB_PMAP::value_type&
0078 PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
0079 {
0080
0081 ghost_cells_key_index_type const& key_index
0082 = data->ghost_cells->template get<1>();
0083
0084
0085 iterator ghost_cell
0086 = data->ghost_cells->template project<0>(key_index.find(key));
0087 if (ghost_cell == data->ghost_cells->end()) {
0088 value_type value;
0089 if (data->has_default_resolver)
0090
0091
0092 value = data->get_default_value(key);
0093 else if (request_if_missing)
0094
0095 send_oob_with_reply(data->process_group, get(data->global, key).first,
0096 property_map_get, key, value);
0097 else
0098 value = value_type();
0099
0100
0101 ghost_cell
0102 = data->ghost_cells->push_front(std::make_pair(key, value)).first;
0103
0104
0105 if (data->max_ghost_cells > 0)
0106 prune_ghost_cells();
0107 } else if (data->max_ghost_cells > 0)
0108
0109 data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
0110
0111 return const_cast<value_type&>(ghost_cell->second);
0112 }
0113
0114 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0115 template<typename Reduce>
0116 void
0117 PBGL_DISTRIB_PMAP
0118 ::handle_message<Reduce>::operator()(process_id_type source, int tag)
0119 {
0120 BOOST_ASSERT(false);
0121 }
0122
0123 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0124 template<typename Reduce>
0125 void
0126 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0127 handle_put(int , int ,
0128 const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
0129 {
0130 using boost::get;
0131
0132 shared_ptr<data_t> data(data_ptr);
0133
0134 owner_local_pair p = get(data->global, req.first);
0135 BOOST_ASSERT(p.first == process_id(data->process_group));
0136
0137 detail::maybe_put(data->storage, p.second,
0138 reduce(req.first,
0139 get(data->storage, p.second),
0140 req.second));
0141 }
0142
0143 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0144 template<typename Reduce>
0145 typename PBGL_DISTRIB_PMAP::value_type
0146 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0147 handle_get(int source, int , const key_type& key,
0148 trigger_receive_context)
0149 {
0150 using boost::get;
0151
0152 shared_ptr<data_t> data(data_ptr);
0153 BOOST_ASSERT(data);
0154
0155 owner_local_pair p = get(data->global, key);
0156 return get(data->storage, p.second);
0157 }
0158
0159 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0160 template<typename Reduce>
0161 void
0162 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0163 handle_multiget(int source, int tag, const std::vector<key_type>& keys,
0164 trigger_receive_context)
0165 {
0166 shared_ptr<data_t> data(data_ptr);
0167 BOOST_ASSERT(data);
0168
0169 typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
0170 std::vector<key_value> results;
0171 std::size_t n = keys.size();
0172 results.reserve(n);
0173
0174 using boost::get;
0175
0176 for (std::size_t i = 0; i < n; ++i) {
0177 local_key_type local_key = get(data->global, keys[i]).second;
0178 results.push_back(key_value(keys[i], get(data->storage, local_key)));
0179 }
0180 send(data->process_group, source, property_map_multiget_reply, results);
0181 }
0182
0183 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0184 template<typename Reduce>
0185 void
0186 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0187 handle_multiget_reply
0188 (int source, int tag,
0189 const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
0190 trigger_receive_context)
0191 {
0192 shared_ptr<data_t> data(data_ptr);
0193 BOOST_ASSERT(data);
0194
0195
0196 ghost_cells_key_index_type const& key_index
0197 = data->ghost_cells->template get<1>();
0198
0199 std::size_t n = msg.size();
0200 for (std::size_t i = 0; i < n; ++i) {
0201
0202 iterator position
0203 = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
0204
0205 if (position != data->ghost_cells->end())
0206 const_cast<value_type&>(position->second) = msg[i].second;
0207 }
0208 }
0209
0210 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0211 template<typename Reduce>
0212 void
0213 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0214 handle_multiput
0215 (int source, int tag,
0216 const std::vector<unsafe_pair<local_key_type, value_type> >& values,
0217 trigger_receive_context)
0218 {
0219 using boost::get;
0220
0221 shared_ptr<data_t> data(data_ptr);
0222 BOOST_ASSERT(data);
0223
0224 std::size_t n = values.size();
0225 for (std::size_t i = 0; i < n; ++i) {
0226 local_key_type local_key = values[i].first;
0227 value_type local_value = get(data->storage, local_key);
0228 detail::maybe_put(data->storage, values[i].first,
0229 reduce(values[i].first,
0230 local_value,
0231 values[i].second));
0232 }
0233 }
0234
0235 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0236 template<typename Reduce>
0237 void
0238 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0239 setup_triggers(process_group_type& pg)
0240 {
0241 using boost::parallel::simple_trigger;
0242
0243 simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
0244 simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
0245 simple_trigger(pg, property_map_multiget, this,
0246 &handle_message::handle_multiget);
0247 simple_trigger(pg, property_map_multiget_reply, this,
0248 &handle_message::handle_multiget_reply);
0249 simple_trigger(pg, property_map_multiput, this,
0250 &handle_message::handle_multiput);
0251 }
0252
0253 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0254 void
0255 PBGL_DISTRIB_PMAP
0256 ::on_synchronize::operator()()
0257 {
0258 int stage=0;
0259 shared_ptr<data_t> data(data_ptr);
0260 BOOST_ASSERT(data);
0261
0262
0263 int backward_stage = -1;
0264 if (data->model & cm_backward) {
0265 if (data->model & cm_flush) backward_stage = 1;
0266 else backward_stage = 0;
0267 }
0268
0269
0270 if (stage == 0 && data->model & cm_flush)
0271 data->flush();
0272
0273
0274 if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
0275 data->refresh_ghost_cells();
0276
0277
0278 if (data->model & cm_clear)
0279 data->clear();
0280
0281
0282 if (data->model & cm_reset) {
0283 if (data->reset) ((*data).*data->reset)();
0284 }
0285 }
0286
0287
0288 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0289 void
0290 PBGL_DISTRIB_PMAP::set_consistency_model(int model)
0291 {
0292 data->model = model;
0293
0294 bool need_on_synchronize = (model != cm_forward);
0295
0296
0297 if (model & cm_backward) {
0298
0299
0300 data->max_ghost_cells = 0;
0301 }
0302
0303
0304 if (need_on_synchronize)
0305 data->process_group.replace_on_synchronize_handler(on_synchronize(data));
0306 }
0307
0308 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0309 void
0310 PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
0311 {
0312 if ((data->model & cm_backward) && max_ghost_cells > 0)
0313 boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
0314 "cannot limit ghost-cell usage with a backward "
0315 "consistency model"));
0316
0317 if (max_ghost_cells == 1)
0318
0319
0320 max_ghost_cells = 2;
0321
0322 data->max_ghost_cells = max_ghost_cells;
0323 prune_ghost_cells();
0324 }
0325
0326 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0327 void PBGL_DISTRIB_PMAP::clear()
0328 {
0329 data->clear();
0330 }
0331
0332 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0333 void PBGL_DISTRIB_PMAP::data_t::clear()
0334 {
0335 ghost_cells->clear();
0336 }
0337
0338 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0339 void PBGL_DISTRIB_PMAP::reset()
0340 {
0341 if (data->reset) ((*data).*data->reset)();
0342 }
0343
0344 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0345 void PBGL_DISTRIB_PMAP::flush()
0346 {
0347 data->flush();
0348 }
0349
0350 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0351 void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
0352 {
0353 using boost::get;
0354
0355 std::vector<std::vector<key_type> > keys;
0356 keys.resize(num_processes(process_group));
0357
0358
0359 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
0360 keys[get(global, i->first).first].push_back(i->first);
0361
0362
0363 typedef typename ProcessGroup::process_size_type process_size_type;
0364 process_size_type n = num_processes(process_group);
0365 process_id_type id = process_id(process_group);
0366 for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
0367 if (!keys[p].empty())
0368 send(process_group, p, property_map_multiget, keys[p]);
0369 }
0370 }
0371
0372 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0373 void PBGL_DISTRIB_PMAP::data_t::flush()
0374 {
0375 using boost::get;
0376
0377 int n = num_processes(process_group);
0378 std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
0379 values.resize(n);
0380
0381
0382 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
0383 std::pair<int, local_key_type> g = get(global, i->first);
0384 values[g.first].push_back(std::make_pair(g.second, i->second));
0385 }
0386
0387
0388 for (int p = 0; p < n; ++p) {
0389 if (!values[p].empty())
0390 send(process_group, p, property_map_multiput, values[p]);
0391 }
0392 }
0393
0394 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0395 void PBGL_DISTRIB_PMAP::do_synchronize()
0396 {
0397 if (data->model & cm_backward) {
0398 synchronize(data->process_group);
0399 return;
0400 }
0401
0402
0403 data->refresh_ghost_cells();
0404
0405
0406 synchronize(data->process_group);
0407
0408
0409 synchronize(data->process_group);
0410 }
0411
0412 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0413 template<typename Resolver>
0414 void PBGL_DISTRIB_PMAP::data_t::do_reset()
0415 {
0416 Resolver* resolver = get_default_value.template target<Resolver>();
0417 BOOST_ASSERT(resolver);
0418
0419 for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
0420 const_cast<value_type&>(i->second) = (*resolver)(i->first);
0421 }
0422
0423 } }