Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:37:13

0001 // Copyright (C) 2005-2006 The Trustees of Indiana University.
0002 
0003 // Use, modification and distribution is subject to the Boost Software
0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0005 // http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 //  Authors: Douglas Gregor
0008 //           Andrew Lumsdaine
0009 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
0010 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
0011 
0012 #ifndef BOOST_GRAPH_USE_MPI
0013 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0014 #endif
0015 
0016 #include <boost/graph/parallel/process_group.hpp>
0017 #include <boost/type_traits/is_convertible.hpp>
0018 #include <vector>
0019 #include <boost/assert.hpp>
0020 #include <boost/optional.hpp>
0021 #include <queue>
0022 
0023 namespace boost { namespace graph { namespace detail {
0024 
0025 template<typename ProcessGroup>
0026 void do_synchronize(ProcessGroup& pg)
0027 { 
0028   using boost::parallel::synchronize;
0029   synchronize(pg);
0030 }
0031 
/// Tag selecting the remote_update_set specialization that buffers
/// updates locally and transmits them when it is synchronized.
struct remote_set_queued { };

/// Tag selecting the remote_update_set specialization that transmits
/// every non-local update as soon as it is made.
struct remote_set_immediate { };
0034 
0035 template<typename ProcessGroup>
0036 class remote_set_semantics
0037 {
0038   BOOST_STATIC_CONSTANT
0039     (bool, 
0040      queued = (is_convertible<
0041                  typename ProcessGroup::communication_category,
0042                  boost::parallel::bsp_process_group_tag>::value));
0043 
0044  public:
0045   typedef typename mpl::if_c<queued, 
0046                              remote_set_queued, 
0047                              remote_set_immediate>::type type;
0048 };
0049 
0050 
0051 template<typename Derived, typename ProcessGroup, typename Value,
0052          typename OwnerMap,
0053          typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
0054 class remote_update_set;
0055 
0056 /**********************************************************************
0057  * Remote updating set that queues messages until synchronization     *
0058  **********************************************************************/
0059 template<typename Derived, typename ProcessGroup, typename Value,
0060          typename OwnerMap>
0061 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
0062                         remote_set_queued>
0063 {
0064   typedef typename property_traits<OwnerMap>::key_type Key;
0065   typedef std::vector<std::pair<Key, Value> > Updates;
0066   typedef typename Updates::size_type   updates_size_type;
0067   typedef typename Updates::value_type  updates_pair_type;
0068 
0069 public:
0070 
0071 private:
0072   typedef typename ProcessGroup::process_id_type process_id_type;
0073 
0074   enum message_kind {
0075     /** Message containing the number of updates that will be sent in
0076      *  a msg_updates message that will immediately follow. This
0077      *  message will contain a single value of type
0078      *  updates_size_type. 
0079      */
0080     msg_num_updates,
0081 
0082     /** Contains (key, value) pairs with all of the updates from a
0083      *  particular source. The number of updates is variable, but will
0084      *  be provided in a msg_num_updates message that immediately
0085      *  preceeds this message.
0086      *
0087      */
0088     msg_updates
0089   };
0090 
0091   struct handle_messages
0092   {
0093     explicit 
0094     handle_messages(remote_update_set* self, const ProcessGroup& pg)
0095       : self(self), update_sizes(num_processes(pg), 0) { }
0096 
0097     void operator()(process_id_type source, int tag) 
0098     { 
0099       switch(tag) {
0100       case msg_num_updates:
0101         {
0102           // Receive the # of updates
0103           updates_size_type num_updates;
0104           receive(self->process_group, source, tag, num_updates);
0105 
0106           update_sizes[source] = num_updates;
0107         }
0108         break;
0109 
0110       case msg_updates:
0111         {
0112           updates_size_type num_updates = update_sizes[source];
0113           BOOST_ASSERT(num_updates);
0114 
0115           // Receive the actual updates
0116           std::vector<updates_pair_type> updates(num_updates);
0117           receive(self->process_group, source, msg_updates, &updates[0],
0118                   num_updates);
0119           
0120           // Send updates to derived "receive_update" member
0121           Derived* derived = static_cast<Derived*>(self);
0122           for (updates_size_type u = 0; u < num_updates; ++u)
0123             derived->receive_update(source, updates[u].first, updates[u].second);
0124 
0125           update_sizes[source] = 0;
0126         }
0127         break;
0128       };
0129     }
0130 
0131   private:
0132     remote_update_set* self;
0133     std::vector<updates_size_type> update_sizes;
0134   };
0135   friend struct handle_messages;
0136 
0137  protected:
0138   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
0139     : process_group(pg, handle_messages(this, pg)),
0140       updates(num_processes(pg)), owner(owner) { 
0141     }
0142 
0143 
0144   void update(const Key& key, const Value& value)
0145   { 
0146     if (get(owner, key) == process_id(process_group)) {
0147       Derived* derived = static_cast<Derived*>(this);
0148       derived->receive_update(get(owner, key), key, value);
0149     }
0150     else {
0151       updates[get(owner, key)].push_back(std::make_pair(key, value));
0152     }
0153   }
0154 
0155   void collect() { }
0156 
0157   void synchronize()
0158   {
0159     // Emit all updates and then remove them
0160     process_id_type num_processes = updates.size();
0161     for (process_id_type p = 0; p < num_processes; ++p) {
0162       if (!updates[p].empty()) {
0163         send(process_group, p, msg_num_updates, updates[p].size());
0164         send(process_group, p, msg_updates, 
0165              &updates[p].front(), updates[p].size());
0166         updates[p].clear();
0167       }
0168     }
0169     
0170     do_synchronize(process_group);
0171   }
0172 
0173   ProcessGroup process_group;
0174 
0175  private:
0176   std::vector<Updates> updates;
0177   OwnerMap owner;
0178 };
0179 
0180 /**********************************************************************
0181  * Remote updating set that sends messages immediately                *
0182  **********************************************************************/
0183 template<typename Derived, typename ProcessGroup, typename Value,
0184          typename OwnerMap>
0185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
0186                         remote_set_immediate>
0187 {
0188   typedef typename property_traits<OwnerMap>::key_type Key;
0189   typedef std::pair<Key, Value> update_pair_type;
0190   typedef typename std::vector<update_pair_type>::size_type updates_size_type;
0191 
0192 public:
0193   typedef typename ProcessGroup::process_id_type process_id_type;
0194 
0195 private:
0196   enum message_kind {
0197     /** Contains a (key, value) pair that will be updated. */
0198     msg_update
0199   };
0200 
0201   struct handle_messages
0202   {
0203     explicit handle_messages(remote_update_set* self, const ProcessGroup& pg) 
0204       : self(self)
0205     { update_sizes.resize(num_processes(pg), 0); }
0206 
0207     void operator()(process_id_type source, int tag) 
0208     { 
0209       // Receive the # of updates
0210       BOOST_ASSERT(tag == msg_update);
0211       update_pair_type update;
0212       receive(self->process_group, source, tag, update);
0213       
0214       // Send update to derived "receive_update" member
0215       Derived* derived = static_cast<Derived*>(self);
0216       derived->receive_update(source, update.first, update.second);
0217     }
0218 
0219   private:
0220     std::vector<updates_size_type> update_sizes;
0221     remote_update_set* self;
0222   };
0223   friend struct handle_messages;
0224 
0225  protected:
0226   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
0227     : process_group(pg, handle_messages(this, pg)), owner(owner) { }
0228 
0229   void update(const Key& key, const Value& value)
0230   { 
0231     if (get(owner, key) == process_id(process_group)) {
0232       Derived* derived = static_cast<Derived*>(this);
0233       derived->receive_update(get(owner, key), key, value);
0234     }
0235     else
0236       send(process_group, get(owner, key), msg_update, 
0237            update_pair_type(key, value));
0238   }
0239 
0240   void collect() 
0241   { 
0242     typedef std::pair<process_id_type, int> probe_type;
0243     handle_messages handler(this, process_group);
0244     while (optional<probe_type> stp = probe(process_group))
0245       if (stp->second == msg_update) handler(stp->first, stp->second);
0246   }
0247 
0248   void synchronize()
0249   {
0250     do_synchronize(process_group);
0251   }
0252 
0253   ProcessGroup process_group;
0254   OwnerMap owner;
0255 };
0256 
0257 } } } // end namespace boost::graph::detail
0258 
0259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP