Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:50:15

0001 // Copyright (C) 2004-2006 The Trustees of Indiana University.
0002 
0003 // Use, modification and distribution is subject to the Boost Software
0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0005 // http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 //  Authors: Douglas Gregor
0008 //           Nick Edmonds
0009 //           Andrew Lumsdaine
0010 #include <boost/assert.hpp>
0011 #include <boost/property_map/parallel/distributed_property_map.hpp>
0012 #include <boost/property_map/parallel/detail/untracked_pair.hpp>
0013 #include <boost/type_traits/is_base_and_derived.hpp>
0014 #include <boost/property_map/parallel/simple_trigger.hpp>
0015 
0016 namespace boost { namespace parallel {
0017 
0018 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0019 template<typename Reduce>
0020 PBGL_DISTRIB_PMAP
0021 ::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
0022                            const StorageMap& pm, const Reduce& reduce)
0023   : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
0024 {
0025   typedef handle_message<Reduce> Handler;
0026 
0027   data->ghost_cells.reset(new ghost_cells_type());
0028   data->reset = &data_t::template do_reset<Reduce>;
0029   data->process_group.replace_handler(Handler(data, reduce));
0030   data->process_group.template get_receiver<Handler>()
0031     ->setup_triggers(data->process_group);
0032 }
0033 
0034 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0035 PBGL_DISTRIB_PMAP::~distributed_property_map() { }
0036 
0037 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0038 template<typename Reduce>
0039 void 
0040 PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
0041 {
0042   typedef handle_message<Reduce> Handler;
0043   data->process_group.replace_handler(Handler(data, reduce));
0044   Handler* handler = data->process_group.template get_receiver<Handler>();
0045   BOOST_ASSERT(handler);
0046   handler->setup_triggers(data->process_group);
0047   data->get_default_value = reduce;
0048   data->has_default_resolver = Reduce::non_default_resolver;
0049   int model = data->model;
0050   data->reset = &data_t::template do_reset<Reduce>;
0051   set_consistency_model(model);
0052 }
0053 
0054 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0055 void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
0056 {
0057   if (data->max_ghost_cells == 0)
0058     return;
0059 
0060   while (data->ghost_cells->size() > data->max_ghost_cells) {
0061     // Evict the last ghost cell
0062 
0063     if (data->model & cm_flush) {
0064       // We need to flush values when we evict them.
0065       boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
0066         = data->ghost_cells->back();
0067       send(data->process_group, get(data->global, victim.first).first, 
0068            property_map_put, victim);
0069     }
0070 
0071     // Actually remove the ghost cell
0072     data->ghost_cells->pop_back();
0073   }
0074 }
0075 
0076 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0077 typename PBGL_DISTRIB_PMAP::value_type&
0078 PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
0079 {
0080   // Index by key
0081   ghost_cells_key_index_type const& key_index 
0082     = data->ghost_cells->template get<1>();
0083 
0084   // Search for the ghost cell by key, and project back to the sequence
0085   iterator ghost_cell 
0086     = data->ghost_cells->template project<0>(key_index.find(key));
0087   if (ghost_cell == data->ghost_cells->end()) {
0088     value_type value;
0089     if (data->has_default_resolver)
0090       // Since we have a default resolver, use it to create a default
0091       // value for this ghost cell.
0092       value = data->get_default_value(key);
0093     else if (request_if_missing)
0094       // Request the actual value of this key from its owner
0095       send_oob_with_reply(data->process_group, get(data->global, key).first, 
0096                           property_map_get, key, value);
0097     else
0098       value = value_type();
0099 
0100     // Create a ghost cell containing the new value
0101     ghost_cell 
0102       = data->ghost_cells->push_front(std::make_pair(key, value)).first;
0103 
0104     // If we need to, prune the ghost cells
0105     if (data->max_ghost_cells > 0)
0106       prune_ghost_cells();
0107   } else if (data->max_ghost_cells > 0)
0108     // Put this cell at the beginning of the MRU list
0109     data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);
0110 
0111   return const_cast<value_type&>(ghost_cell->second);
0112 }
0113 
0114 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0115 template<typename Reduce>
0116 void
0117 PBGL_DISTRIB_PMAP
0118 ::handle_message<Reduce>::operator()(process_id_type source, int tag)
0119 {
0120   BOOST_ASSERT(false);
0121 }
0122 
0123 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0124 template<typename Reduce>
0125 void
0126 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0127 handle_put(int /*source*/, int /*tag*/, 
0128            const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
0129 {
0130   using boost::get;
0131 
0132   shared_ptr<data_t> data(data_ptr);
0133 
0134   owner_local_pair p = get(data->global, req.first);
0135   BOOST_ASSERT(p.first == process_id(data->process_group));
0136 
0137   detail::maybe_put(data->storage, p.second,
0138                     reduce(req.first,
0139                            get(data->storage, p.second),
0140                            req.second));
0141 }
0142 
0143 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0144 template<typename Reduce>
0145 typename PBGL_DISTRIB_PMAP::value_type
0146 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0147 handle_get(int source, int /*tag*/, const key_type& key, 
0148            trigger_receive_context)
0149 {
0150   using boost::get;
0151 
0152   shared_ptr<data_t> data(data_ptr);
0153   BOOST_ASSERT(data);
0154 
0155   owner_local_pair p = get(data->global, key);
0156   return get(data->storage, p.second);
0157 }
0158 
0159 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0160 template<typename Reduce>
0161 void
0162 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0163 handle_multiget(int source, int tag, const std::vector<key_type>& keys,
0164                 trigger_receive_context)
0165 {
0166   shared_ptr<data_t> data(data_ptr);
0167   BOOST_ASSERT(data);
0168 
0169   typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
0170   std::vector<key_value> results;
0171   std::size_t n = keys.size();
0172   results.reserve(n);
0173 
0174   using boost::get;
0175 
0176   for (std::size_t i = 0; i < n; ++i) {
0177     local_key_type local_key = get(data->global, keys[i]).second;
0178     results.push_back(key_value(keys[i], get(data->storage, local_key)));
0179   }
0180   send(data->process_group, source, property_map_multiget_reply, results);
0181 }
0182 
0183 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0184 template<typename Reduce>
0185 void
0186 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0187 handle_multiget_reply
0188   (int source, int tag, 
0189    const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
0190    trigger_receive_context)
0191 {
0192   shared_ptr<data_t> data(data_ptr);
0193   BOOST_ASSERT(data);
0194 
0195   // Index by key
0196   ghost_cells_key_index_type const& key_index 
0197     = data->ghost_cells->template get<1>();
0198 
0199   std::size_t n = msg.size();
0200   for (std::size_t i = 0; i < n; ++i) {
0201     // Search for the ghost cell by key, and project back to the sequence
0202     iterator position
0203       = data->ghost_cells->template project<0>(key_index.find(msg[i].first));
0204 
0205     if (position != data->ghost_cells->end())
0206       const_cast<value_type&>(position->second) = msg[i].second;
0207   }
0208 }
0209 
0210 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0211 template<typename Reduce>
0212 void
0213 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0214 handle_multiput
0215   (int source, int tag, 
0216    const std::vector<unsafe_pair<local_key_type, value_type> >& values,
0217    trigger_receive_context)
0218 {
0219   using boost::get;
0220 
0221   shared_ptr<data_t> data(data_ptr);
0222   BOOST_ASSERT(data);
0223 
0224   std::size_t n = values.size();
0225   for (std::size_t i = 0; i < n; ++i) {
0226     local_key_type local_key = values[i].first;
0227     value_type local_value = get(data->storage, local_key);
0228     detail::maybe_put(data->storage, values[i].first,
0229                       reduce(values[i].first,
0230                              local_value,
0231                              values[i].second));
0232   }
0233 }
0234 
0235 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0236 template<typename Reduce>
0237 void
0238 PBGL_DISTRIB_PMAP::handle_message<Reduce>::
0239 setup_triggers(process_group_type& pg)
0240 {
0241   using boost::parallel::simple_trigger;
0242 
0243   simple_trigger(pg, property_map_put, this, &handle_message::handle_put);
0244   simple_trigger(pg, property_map_get, this, &handle_message::handle_get);
0245   simple_trigger(pg, property_map_multiget, this, 
0246                  &handle_message::handle_multiget);
0247   simple_trigger(pg, property_map_multiget_reply, this, 
0248                  &handle_message::handle_multiget_reply);
0249   simple_trigger(pg, property_map_multiput, this, 
0250                  &handle_message::handle_multiput);
0251 }
0252 
0253 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0254 void
0255 PBGL_DISTRIB_PMAP
0256 ::on_synchronize::operator()()
0257 {
0258   int stage=0; // we only get called at the start now
0259   shared_ptr<data_t> data(data_ptr);
0260   BOOST_ASSERT(data);
0261 
0262   // Determine in which stage backward consistency messages should be sent.
0263   int backward_stage = -1;
0264   if (data->model & cm_backward) {
0265     if (data->model & cm_flush) backward_stage = 1;
0266     else backward_stage = 0;
0267   }
0268 
0269   // Flush results in first stage
0270   if (stage == 0 && data->model & cm_flush)
0271     data->flush();
0272 
0273   // Backward consistency
0274   if (stage == backward_stage && !(data->model & (cm_clear | cm_reset)))
0275     data->refresh_ghost_cells();
0276 
0277   // Optionally clear results
0278   if (data->model & cm_clear)
0279     data->clear();
0280 
0281   // Optionally reset results
0282   if (data->model & cm_reset) {
0283     if (data->reset) ((*data).*data->reset)();
0284   }
0285 }
0286 
0287 
0288 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0289 void
0290 PBGL_DISTRIB_PMAP::set_consistency_model(int model)
0291 {
0292   data->model = model;
0293 
0294   bool need_on_synchronize = (model != cm_forward);
0295 
0296   // Backward consistency is a two-stage process.
0297   if (model & cm_backward) {
0298     // For backward consistency to work, we absolutely cannot throw
0299     // away any ghost cells.
0300     data->max_ghost_cells = 0;
0301   }
0302 
0303   // attach the on_synchronize handler.
0304   if (need_on_synchronize)
0305     data->process_group.replace_on_synchronize_handler(on_synchronize(data));
0306 }
0307 
0308 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0309 void
0310 PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
0311 {
0312   if ((data->model & cm_backward) && max_ghost_cells > 0)
0313       boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
0314                                                 "cannot limit ghost-cell usage with a backward "
0315                                                 "consistency model"));
0316 
0317   if (max_ghost_cells == 1)
0318     // It is not safe to have only 1 ghost cell; the cell() method
0319     // will fail.
0320     max_ghost_cells = 2;
0321 
0322   data->max_ghost_cells = max_ghost_cells;
0323   prune_ghost_cells();
0324 }
0325 
0326 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0327 void PBGL_DISTRIB_PMAP::clear()
0328 {
0329   data->clear();
0330 }
0331 
0332 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0333 void PBGL_DISTRIB_PMAP::data_t::clear()
0334 {
0335   ghost_cells->clear();
0336 }
0337 
0338 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0339 void PBGL_DISTRIB_PMAP::reset()
0340 {
0341   if (data->reset) ((*data).*data->reset)();
0342 }
0343 
0344 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0345 void PBGL_DISTRIB_PMAP::flush()
0346 {
0347   data->flush();
0348 }
0349 
0350 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0351 void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
0352 {
0353   using boost::get;
0354 
0355   std::vector<std::vector<key_type> > keys;
0356   keys.resize(num_processes(process_group));
0357 
0358   // Collect the set of keys for which we will request values
0359   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
0360     keys[get(global, i->first).first].push_back(i->first);
0361 
0362   // Send multiget requests to each of the other processors
0363   typedef typename ProcessGroup::process_size_type process_size_type;
0364   process_size_type n = num_processes(process_group);
0365   process_id_type id = process_id(process_group);
0366   for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
0367     if (!keys[p].empty())
0368       send(process_group, p, property_map_multiget, keys[p]);
0369   }  
0370 }
0371 
0372 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0373 void PBGL_DISTRIB_PMAP::data_t::flush()
0374 {
0375   using boost::get;
0376 
0377   int n = num_processes(process_group);
0378   std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
0379   values.resize(n);
0380 
0381   // Collect all of the flushed values
0382   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
0383     std::pair<int, local_key_type> g = get(global, i->first);
0384     values[g.first].push_back(std::make_pair(g.second, i->second));
0385   }
0386 
0387   // Transmit flushed values
0388   for (int p = 0; p < n; ++p) {
0389     if (!values[p].empty())
0390       send(process_group, p, property_map_multiput, values[p]);
0391   }
0392 }
0393 
0394 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0395 void PBGL_DISTRIB_PMAP::do_synchronize()
0396 {
0397   if (data->model & cm_backward) {
0398     synchronize(data->process_group);
0399     return;
0400   }
0401 
0402   // Request refreshes of the values of our ghost cells
0403   data->refresh_ghost_cells();
0404 
0405   // Allows all of the multigets to get to their destinations
0406   synchronize(data->process_group);
0407 
0408   // Allows all of the multiget responses to get to their destinations
0409   synchronize(data->process_group);
0410 }
0411 
0412 template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
0413 template<typename Resolver>
0414 void PBGL_DISTRIB_PMAP::data_t::do_reset()
0415 {
0416   Resolver* resolver = get_default_value.template target<Resolver>();
0417   BOOST_ASSERT(resolver);
0418 
0419   for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
0420     const_cast<value_type&>(i->second) = (*resolver)(i->first);
0421 }
0422 
0423 } } // end namespace boost::parallel