File indexing completed on 2025-01-18 09:37:13
0001
0002
0003
0004
0005
0006
0007
0008
0009 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
0010 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
0011
0012 #ifndef BOOST_GRAPH_USE_MPI
0013 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0014 #endif
0015
0016 #include <boost/graph/parallel/process_group.hpp>
0017 #include <boost/type_traits/is_convertible.hpp>
0018 #include <vector>
0019 #include <boost/assert.hpp>
0020 #include <boost/optional.hpp>
0021 #include <queue>
0022
0023 namespace boost { namespace graph { namespace detail {
0024
0025 template<typename ProcessGroup>
0026 void do_synchronize(ProcessGroup& pg)
0027 {
0028 using boost::parallel::synchronize;
0029 synchronize(pg);
0030 }
0031
0032 struct remote_set_queued {};
0033 struct remote_set_immediate {};
0034
0035 template<typename ProcessGroup>
0036 class remote_set_semantics
0037 {
0038 BOOST_STATIC_CONSTANT
0039 (bool,
0040 queued = (is_convertible<
0041 typename ProcessGroup::communication_category,
0042 boost::parallel::bsp_process_group_tag>::value));
0043
0044 public:
0045 typedef typename mpl::if_c<queued,
0046 remote_set_queued,
0047 remote_set_immediate>::type type;
0048 };
0049
0050
0051 template<typename Derived, typename ProcessGroup, typename Value,
0052 typename OwnerMap,
0053 typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
0054 class remote_update_set;
0055
0056
0057
0058
0059 template<typename Derived, typename ProcessGroup, typename Value,
0060 typename OwnerMap>
0061 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
0062 remote_set_queued>
0063 {
0064 typedef typename property_traits<OwnerMap>::key_type Key;
0065 typedef std::vector<std::pair<Key, Value> > Updates;
0066 typedef typename Updates::size_type updates_size_type;
0067 typedef typename Updates::value_type updates_pair_type;
0068
0069 public:
0070
0071 private:
0072 typedef typename ProcessGroup::process_id_type process_id_type;
0073
0074 enum message_kind {
0075
0076
0077
0078
0079
0080 msg_num_updates,
0081
0082
0083
0084
0085
0086
0087
0088 msg_updates
0089 };
0090
0091 struct handle_messages
0092 {
0093 explicit
0094 handle_messages(remote_update_set* self, const ProcessGroup& pg)
0095 : self(self), update_sizes(num_processes(pg), 0) { }
0096
0097 void operator()(process_id_type source, int tag)
0098 {
0099 switch(tag) {
0100 case msg_num_updates:
0101 {
0102
0103 updates_size_type num_updates;
0104 receive(self->process_group, source, tag, num_updates);
0105
0106 update_sizes[source] = num_updates;
0107 }
0108 break;
0109
0110 case msg_updates:
0111 {
0112 updates_size_type num_updates = update_sizes[source];
0113 BOOST_ASSERT(num_updates);
0114
0115
0116 std::vector<updates_pair_type> updates(num_updates);
0117 receive(self->process_group, source, msg_updates, &updates[0],
0118 num_updates);
0119
0120
0121 Derived* derived = static_cast<Derived*>(self);
0122 for (updates_size_type u = 0; u < num_updates; ++u)
0123 derived->receive_update(source, updates[u].first, updates[u].second);
0124
0125 update_sizes[source] = 0;
0126 }
0127 break;
0128 };
0129 }
0130
0131 private:
0132 remote_update_set* self;
0133 std::vector<updates_size_type> update_sizes;
0134 };
0135 friend struct handle_messages;
0136
0137 protected:
0138 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
0139 : process_group(pg, handle_messages(this, pg)),
0140 updates(num_processes(pg)), owner(owner) {
0141 }
0142
0143
0144 void update(const Key& key, const Value& value)
0145 {
0146 if (get(owner, key) == process_id(process_group)) {
0147 Derived* derived = static_cast<Derived*>(this);
0148 derived->receive_update(get(owner, key), key, value);
0149 }
0150 else {
0151 updates[get(owner, key)].push_back(std::make_pair(key, value));
0152 }
0153 }
0154
0155 void collect() { }
0156
0157 void synchronize()
0158 {
0159
0160 process_id_type num_processes = updates.size();
0161 for (process_id_type p = 0; p < num_processes; ++p) {
0162 if (!updates[p].empty()) {
0163 send(process_group, p, msg_num_updates, updates[p].size());
0164 send(process_group, p, msg_updates,
0165 &updates[p].front(), updates[p].size());
0166 updates[p].clear();
0167 }
0168 }
0169
0170 do_synchronize(process_group);
0171 }
0172
0173 ProcessGroup process_group;
0174
0175 private:
0176 std::vector<Updates> updates;
0177 OwnerMap owner;
0178 };
0179
0180
0181
0182
0183 template<typename Derived, typename ProcessGroup, typename Value,
0184 typename OwnerMap>
0185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
0186 remote_set_immediate>
0187 {
0188 typedef typename property_traits<OwnerMap>::key_type Key;
0189 typedef std::pair<Key, Value> update_pair_type;
0190 typedef typename std::vector<update_pair_type>::size_type updates_size_type;
0191
0192 public:
0193 typedef typename ProcessGroup::process_id_type process_id_type;
0194
0195 private:
0196 enum message_kind {
0197
0198 msg_update
0199 };
0200
0201 struct handle_messages
0202 {
0203 explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
0204 : self(self)
0205 { update_sizes.resize(num_processes(pg), 0); }
0206
0207 void operator()(process_id_type source, int tag)
0208 {
0209
0210 BOOST_ASSERT(tag == msg_update);
0211 update_pair_type update;
0212 receive(self->process_group, source, tag, update);
0213
0214
0215 Derived* derived = static_cast<Derived*>(self);
0216 derived->receive_update(source, update.first, update.second);
0217 }
0218
0219 private:
0220 std::vector<updates_size_type> update_sizes;
0221 remote_update_set* self;
0222 };
0223 friend struct handle_messages;
0224
0225 protected:
0226 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
0227 : process_group(pg, handle_messages(this, pg)), owner(owner) { }
0228
0229 void update(const Key& key, const Value& value)
0230 {
0231 if (get(owner, key) == process_id(process_group)) {
0232 Derived* derived = static_cast<Derived*>(this);
0233 derived->receive_update(get(owner, key), key, value);
0234 }
0235 else
0236 send(process_group, get(owner, key), msg_update,
0237 update_pair_type(key, value));
0238 }
0239
0240 void collect()
0241 {
0242 typedef std::pair<process_id_type, int> probe_type;
0243 handle_messages handler(this, process_group);
0244 while (optional<probe_type> stp = probe(process_group))
0245 if (stp->second == msg_update) handler(stp->first, stp->second);
0246 }
0247
0248 void synchronize()
0249 {
0250 do_synchronize(process_group);
0251 }
0252
0253 ProcessGroup process_group;
0254 OwnerMap owner;
0255 };
0256
0257 } } }
0258
0259 #endif