File indexing completed on 2025-01-18 09:37:13
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef BOOST_GRAPH_USE_MPI
0018 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0019 #endif
0020
0021 #include <boost/assert.hpp>
0022 #include <algorithm>
0023 #include <boost/graph/parallel/detail/untracked_pair.hpp>
0024 #include <numeric>
0025 #include <iterator>
0026 #include <functional>
0027 #include <vector>
0028 #include <queue>
0029 #include <stack>
0030 #include <list>
0031 #include <map>
0032 #include <boost/graph/distributed/detail/tag_allocator.hpp>
0033 #include <stdio.h>
0034
0035
0036
0037 #ifdef PBGL_PROCESS_GROUP_DEBUG
0038 # include <iostream>
0039 #endif
0040
0041 namespace boost { namespace graph { namespace distributed {
0042
// Implementation state behind mpi_process_group. The functions in this file
// reach it through the process group's impl_ pointer.
struct mpi_process_group::impl
{

  // Short local names for the header / outgoing-batch types declared by
  // mpi_process_group.
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  // Messages that have arrived from one particular source process.
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    // One header per received message (tag, offset and byte count are read
    // by the receive functions in this file).
    std::vector<message_header> headers;
    // Packed payload bytes; each header's offset points into this buffer.
    buffer_type buffer;
    // Indexed by block number: the position in `headers` from which the
    // search for that block's next message starts.
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  // Pairs an MPI request handle with a buffer.
  // NOTE(review): presumably the buffer backing an in-flight batch send;
  // confirm against the code that fills sent_batches / batch_pool.
  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // Batch sizing parameters.
  // NOTE(review): presumably set by set_batch_size(); exact meaning of each
  // limit is defined by the batching code outside this section.
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;

  // Communicator used for the packed archives and MPI receives in this file.
  boost::mpi::communicator comm;

  // NOTE(review): by its name, a separate communicator for out-of-band
  // replies; it is not referenced in this section of the file.
  boost::mpi::communicator oob_reply_comm;

  // Outgoing batches, indexed by destination process (see send_impl).
  std::vector<outgoing_messages> outgoing;

  // Incoming messages, indexed by source process (see receive_impl).
  std::vector<incoming_messages> incoming;

  // Synchronization bookkeeping. These vectors are not touched in this
  // section; their exact semantics live in the synchronization code.
  std::vector<int> processors_synchronizing_stage;

  std::vector<int> synchronizing_stage;

  std::vector<int> synchronizing_unfinished;

  // NOTE(review): presumably per-process counters of batches sent/received.
  std::vector<int> number_sent_batches;

  std::vector<int> number_received_batches;

  // Value returned by mpi_process_group::trigger_context().
  trigger_receive_context trigger_context;

  // NOTE(review): by its name, non-zero while batches are being processed;
  // confirm against the batch-processing code.
  int processing_batches;

  // Indexed by block number; each entry exposes an on_receive handler
  // (see get_receiver).
  blocks_type blocks;

  // NOTE(review): by its name, set while a synchronization is in progress.
  bool synchronizing;

  // MPI requests; prepare_receive() appends one for each MPI_Irecv it posts.
  std::vector<MPI_Request> requests;

  // Receive buffers keyed by tag, used by the irecv-based global triggers.
  std::map<int,buffer_type> buffers;

  // NOTE(review): presumably (source, batch) pairs queued for later
  // processing; confirm against the code that pushes to it.
  std::queue<std::pair<int,outgoing_messages> > new_batches;

  std::size_t max_received;

  // NOTE(review): presumably the batches whose sends are still outstanding.
  std::list<batch_request> sent_batches;

  std::size_t max_sent;

  // NOTE(review): presumably a pool of reusable batch_request slots, with
  // free_batches holding the indices of the unused ones.
  std::vector<batch_request> batch_pool;

  std::stack<std::size_t> free_batches;

  // Defined elsewhere.
  void free_sent_batches();

  // Source of the reply tags handed out in send_oob_with_reply().
  detail::tag_allocator allocated_tags;

  // Constructor/destructor are defined elsewhere.
  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();

private:
  // Defined elsewhere.
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
0160
0161 inline trigger_receive_context mpi_process_group::trigger_context() const
0162 {
0163 return impl_->trigger_context;
0164 }
0165
0166 template<typename T>
0167 void
0168 mpi_process_group::send_impl(int dest, int tag, const T& value,
0169 mpl::true_ ) const
0170 {
0171 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
0172
0173 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
0174
0175
0176 impl::message_header header;
0177 header.source = process_id(*this);
0178 header.tag = tag;
0179 header.offset = outgoing.buffer.size();
0180
0181 boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
0182 oa << value;
0183
0184 #ifdef PBGL_PROCESS_GROUP_DEBUG
0185 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
0186 << tag << ", bytes = " << packed_size << std::endl;
0187 #endif
0188
0189
0190 header.bytes = outgoing.buffer.size() - header.offset;
0191 outgoing.headers.push_back(header);
0192
0193 maybe_send_batch(dest);
0194 }
0195
0196
0197 template<typename T>
0198 void
0199 mpi_process_group::send_impl(int dest, int tag, const T& value,
0200 mpl::false_ ) const
0201 {
0202 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
0203
0204 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
0205
0206
0207 impl::message_header header;
0208 header.source = process_id(*this);
0209 header.tag = tag;
0210 header.offset = outgoing.buffer.size();
0211
0212
0213 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
0214 out << value;
0215
0216
0217 header.bytes = outgoing.buffer.size() - header.offset;
0218 outgoing.headers.push_back(header);
0219 maybe_send_batch(dest);
0220
0221 #ifdef PBGL_PROCESS_GROUP_DEBUG
0222 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
0223 << tag << ", bytes = " << header.bytes << std::endl;
0224 #endif
0225 }
0226
0227 template<typename T>
0228 inline void
0229 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0230 int tag, const T& value)
0231 {
0232 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
0233 boost::mpi::is_mpi_datatype<T>());
0234 }
0235
0236 template<typename T>
0237 typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0238 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0239 int tag, const T values[], std::size_t n)
0240 {
0241 pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
0242 boost::serialization::make_array(values,n),
0243 boost::mpl::true_());
0244 }
0245
0246 template<typename T>
0247 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0248 mpi_process_group::
0249 array_send_impl(int dest, int tag, const T values[], std::size_t n) const
0250 {
0251 BOOST_ASSERT(tag < msg_reserved_first || tag > msg_reserved_last);
0252
0253 impl::outgoing_messages& outgoing = impl_->outgoing[dest];
0254
0255
0256 impl::message_header header;
0257 header.source = process_id(*this);
0258 header.tag = tag;
0259 header.offset = outgoing.buffer.size();
0260
0261
0262 boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
0263 out << n;
0264
0265 for (std::size_t i = 0; i < n; ++i)
0266 out << values[i];
0267
0268
0269 header.bytes = outgoing.buffer.size() - header.offset;
0270 outgoing.headers.push_back(header);
0271 maybe_send_batch(dest);
0272
0273 #ifdef PBGL_PROCESS_GROUP_DEBUG
0274 std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
0275 << tag << ", bytes = " << header.bytes << std::endl;
0276 #endif
0277 }
0278
0279 template<typename T>
0280 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0281 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0282 int tag, const T values[], std::size_t n)
0283 {
0284 pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
0285 values, n);
0286 }
0287
0288 template<typename InputIterator>
0289 void
0290 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0291 int tag, InputIterator first, InputIterator last)
0292 {
0293 typedef typename std::iterator_traits<InputIterator>::value_type value_type;
0294 std::vector<value_type> values(first, last);
0295 if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
0296 else send(pg, dest, tag, &values[0], values.size());
0297 }
0298
0299 template<typename T>
0300 bool
0301 mpi_process_group::receive_impl(int source, int tag, T& value,
0302 mpl::true_ ) const
0303 {
0304 #ifdef PBGL_PROCESS_GROUP_DEBUG
0305 std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
0306 << tag << std::endl;
0307 #endif
0308
0309 impl::incoming_messages& incoming = impl_->incoming[source];
0310
0311
0312 std::vector<impl::message_header>::iterator header =
0313 incoming.next_header[my_block_number()];
0314 while (header != incoming.headers.end() && header->tag != tag) ++header;
0315
0316
0317 if (header == incoming.headers.end()) return false;
0318
0319
0320 if (header->bytes > 0) {
0321 boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer,
0322 archive::no_header, header->offset);
0323 ia >> value;
0324 }
0325
0326
0327 header->tag = -1;
0328
0329
0330 while (incoming.next_header[my_block_number()] != incoming.headers.end()
0331 && incoming.next_header[my_block_number()]->tag == -1)
0332 ++incoming.next_header[my_block_number()];
0333
0334 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
0335 bool finished = true;
0336 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
0337 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
0338 }
0339
0340 if (finished) {
0341 std::vector<impl::message_header> no_headers;
0342 incoming.headers.swap(no_headers);
0343 buffer_type empty_buffer;
0344 incoming.buffer.swap(empty_buffer);
0345 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
0346 incoming.next_header[i] = incoming.headers.end();
0347 }
0348 }
0349
0350 return true;
0351 }
0352
0353 template<typename T>
0354 bool
0355 mpi_process_group::receive_impl(int source, int tag, T& value,
0356 mpl::false_ ) const
0357 {
0358 impl::incoming_messages& incoming = impl_->incoming[source];
0359
0360
0361 std::vector<impl::message_header>::iterator header =
0362 incoming.next_header[my_block_number()];
0363 while (header != incoming.headers.end() && header->tag != tag) ++header;
0364
0365
0366 if (header == incoming.headers.end()) return false;
0367
0368
0369 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
0370 archive::no_header, header->offset);
0371 in >> value;
0372
0373
0374 header->tag = -1;
0375
0376
0377 while (incoming.next_header[my_block_number()] != incoming.headers.end()
0378 && incoming.next_header[my_block_number()]->tag == -1)
0379 ++incoming.next_header[my_block_number()];
0380
0381 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
0382 bool finished = true;
0383 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
0384 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
0385 }
0386
0387 if (finished) {
0388 std::vector<impl::message_header> no_headers;
0389 incoming.headers.swap(no_headers);
0390 buffer_type empty_buffer;
0391 incoming.buffer.swap(empty_buffer);
0392 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
0393 incoming.next_header[i] = incoming.headers.end();
0394 }
0395 }
0396
0397 return true;
0398 }
0399
0400 template<typename T>
0401 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
0402 mpi_process_group::
0403 array_receive_impl(int source, int tag, T* values, std::size_t& n) const
0404 {
0405 impl::incoming_messages& incoming = impl_->incoming[source];
0406
0407
0408 std::vector<impl::message_header>::iterator header =
0409 incoming.next_header[my_block_number()];
0410 while (header != incoming.headers.end() && header->tag != tag) ++header;
0411
0412
0413 if (header == incoming.headers.end()) return false;
0414
0415
0416 boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer,
0417 archive::no_header, header->offset);
0418 std::size_t num_sent;
0419 in >> num_sent;
0420 if (num_sent > n)
0421 std::cerr << "ERROR: Have " << num_sent << " items but only space for "
0422 << n << " items\n";
0423
0424 for (std::size_t i = 0; i < num_sent; ++i)
0425 in >> values[i];
0426 n = num_sent;
0427
0428
0429 header->tag = -1;
0430
0431
0432 while (incoming.next_header[my_block_number()] != incoming.headers.end()
0433 && incoming.next_header[my_block_number()]->tag == -1)
0434 ++incoming.next_header[my_block_number()];
0435
0436 if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
0437 bool finished = true;
0438 for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
0439 if (incoming.next_header[i] != incoming.headers.end()) finished = false;
0440 }
0441
0442 if (finished) {
0443 std::vector<impl::message_header> no_headers;
0444 incoming.headers.swap(no_headers);
0445 buffer_type empty_buffer;
0446 incoming.buffer.swap(empty_buffer);
0447 for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
0448 incoming.next_header[i] = incoming.headers.end();
0449 }
0450 }
0451
0452 return true;
0453 }
0454
0455
0456 template<typename Type, typename Handler>
0457 void mpi_process_group::trigger(int tag, const Handler& handler)
0458 {
0459 BOOST_ASSERT(block_num);
0460 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
0461 new trigger_launcher<Type, Handler>(*this, tag, handler)));
0462 }
0463
0464 template<typename Type, typename Handler>
0465 void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
0466 {
0467 BOOST_ASSERT(block_num);
0468 install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
0469 new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
0470 }
0471
0472 template<typename Type, typename Handler>
0473 void mpi_process_group::global_trigger(int tag, const Handler& handler,
0474 std::size_t sz)
0475 {
0476 if (sz==0)
0477 install_trigger(tag,0,shared_ptr<trigger_base>(
0478 new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
0479 else
0480 install_trigger(tag,0,shared_ptr<trigger_base>(
0481 new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
0482
0483 }
0484
0485 namespace detail {
0486
0487 template<typename Type>
0488 void do_oob_receive(mpi_process_group const& self,
0489 int source, int tag, Type& data, mpl::true_ )
0490 {
0491 using boost::mpi::get_mpi_datatype;
0492
0493
0494 MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
0495 MPI_STATUS_IGNORE);
0496 }
0497
0498 template<typename Type>
0499 void do_oob_receive(mpi_process_group const& self,
0500 int source, int tag, Type& data, mpl::false_ )
0501 {
0502
0503
0504 boost::mpi::status status;
0505 status = self.impl_->comm.probe(source, tag);
0506
0507 #if BOOST_VERSION >= 103600
0508 int size = status.count<boost::mpi::packed>().get();
0509 #else
0510 int size;
0511 MPI_Status& mpi_status = status;
0512 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
0513 #endif
0514
0515
0516 boost::mpi::packed_iarchive in(self.impl_->comm);
0517 in.resize(size);
0518 MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
0519 MPI_STATUS_IGNORE);
0520
0521
0522 in >> data;
0523 }
0524
0525 template<typename Type>
0526 void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data)
0527 {
0528 do_oob_receive(self, source, tag, data,
0529 boost::mpi::is_mpi_datatype<Type>());
0530 }
0531
0532
0533 }
0534
0535
0536 template<typename Type, typename Handler>
0537 void
0538 mpi_process_group::trigger_launcher<Type, Handler>::
0539 receive(mpi_process_group const&, int source, int tag,
0540 trigger_receive_context context, int block) const
0541 {
0542 #ifdef PBGL_PROCESS_GROUP_DEBUG
0543 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
0544 << " receive from source " << source << " and tag " << tag
0545 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
0546 #endif
0547
0548 Type data;
0549
0550 if (context == trc_out_of_band) {
0551
0552 int realtag = self.encode_tag(
0553 block == -1 ? self.my_block_number() : block, tag);
0554 detail::do_oob_receive(self,source,realtag,data);
0555 }
0556 else
0557
0558 boost::graph::distributed::receive(self, source, tag, data);
0559
0560
0561 handler(source, tag, data, context);
0562 }
0563
0564 template<typename Type, typename Handler>
0565 void
0566 mpi_process_group::reply_trigger_launcher<Type, Handler>::
0567 receive(mpi_process_group const&, int source, int tag,
0568 trigger_receive_context context, int block) const
0569 {
0570 #ifdef PBGL_PROCESS_GROUP_DEBUG
0571 std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger")
0572 << " receive from source " << source << " and tag " << tag
0573 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
0574 #endif
0575 BOOST_ASSERT(context == trc_out_of_band);
0576
0577 boost::parallel::detail::untracked_pair<int, Type> data;
0578
0579
0580 int realtag = self.encode_tag(block == -1 ? self.my_block_number() : block,
0581 tag);
0582 detail::do_oob_receive(self, source, realtag, data);
0583
0584
0585
0586 send_oob(self, source, data.first,
0587 handler(source, tag, data.second, context), -2);
0588 }
0589
0590 template<typename Type, typename Handler>
0591 void
0592 mpi_process_group::global_trigger_launcher<Type, Handler>::
0593 receive(mpi_process_group const& self, int source, int tag,
0594 trigger_receive_context context, int block) const
0595 {
0596 #ifdef PBGL_PROCESS_GROUP_DEBUG
0597 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
0598 << " receive from source " << source << " and tag " << tag
0599 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
0600 #endif
0601
0602 Type data;
0603
0604 if (context == trc_out_of_band) {
0605
0606 int realtag = self.encode_tag(
0607 block == -1 ? self.my_block_number() : block, tag);
0608 detail::do_oob_receive(self,source,realtag,data);
0609 }
0610 else
0611
0612 boost::graph::distributed::receive(self, source, tag, data);
0613
0614
0615 handler(self, source, tag, data, context);
0616 }
0617
0618
0619 template<typename Type, typename Handler>
0620 void
0621 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
0622 receive(mpi_process_group const& self, int source, int tag,
0623 trigger_receive_context context, int block) const
0624 {
0625 #ifdef PBGL_PROCESS_GROUP_DEBUG
0626 std::cerr << (out_of_band? "OOB trigger" : "Trigger")
0627 << " receive from source " << source << " and tag " << tag
0628 << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
0629 #endif
0630
0631 Type data;
0632
0633 if (context == trc_out_of_band) {
0634 return;
0635 }
0636 BOOST_ASSERT (context == trc_irecv_out_of_band);
0637
0638
0639 boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
0640 ia >> data;
0641
0642 prepare_receive(self,tag,true);
0643
0644 handler(self, source, tag, data, context);
0645 }
0646
0647
0648 template<typename Type, typename Handler>
0649 void
0650 mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
0651 prepare_receive(mpi_process_group const& self, int tag, bool force) const
0652 {
0653 #ifdef PBGL_PROCESS_GROUP_DEBUG
0654 std::cerr << ("Posting Irecv for trigger")
0655 << " receive with tag " << tag << std::endl;
0656 #endif
0657 if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
0658 self.impl_->buffers[tag].resize(buffer_size);
0659 force = true;
0660 }
0661 BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);
0662
0663
0664 if (force) {
0665 self.impl_->requests.push_back(MPI_Request());
0666 MPI_Request* request = &self.impl_->requests.back();
0667 MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
0668 MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
0669 }
0670 }
0671
0672
0673 template<typename T>
0674 inline mpi_process_group::process_id_type
0675 receive(const mpi_process_group& pg, int tag, T& value)
0676 {
0677 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0678 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0679 value, boost::mpi::is_mpi_datatype<T>()))
0680 return source;
0681 }
0682 BOOST_ASSERT (false);
0683 }
0684
0685 template<typename T>
0686 typename
0687 enable_if<boost::mpi::is_mpi_datatype<T>,
0688 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0689 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
0690 {
0691 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0692 bool result =
0693 pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0694 boost::serialization::make_array(values,n),
0695 boost::mpl::true_());
0696 if (result)
0697 return std::make_pair(source, n);
0698 }
0699 BOOST_ASSERT(false);
0700 }
0701
0702 template<typename T>
0703 typename
0704 disable_if<boost::mpi::is_mpi_datatype<T>,
0705 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0706 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
0707 {
0708 for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0709 if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0710 values, n))
0711 return std::make_pair(source, n);
0712 }
0713 BOOST_ASSERT(false);
0714 }
0715
0716 template<typename T>
0717 mpi_process_group::process_id_type
0718 receive(const mpi_process_group& pg,
0719 mpi_process_group::process_id_type source, int tag, T& value)
0720 {
0721 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0722 value, boost::mpi::is_mpi_datatype<T>()))
0723 return source;
0724 else {
0725 fprintf(stderr,
0726 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
0727 process_id(pg), source, tag, pg.my_block_number());
0728
0729 BOOST_ASSERT(false);
0730 abort();
0731 }
0732 }
0733
0734 template<typename T>
0735 typename
0736 enable_if<boost::mpi::is_mpi_datatype<T>,
0737 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0738 receive(const mpi_process_group& pg, int source, int tag, T values[],
0739 std::size_t n)
0740 {
0741 if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0742 boost::serialization::make_array(values,n),
0743 boost::mpl::true_()))
0744 return std::make_pair(source,n);
0745 else {
0746 fprintf(stderr,
0747 "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
0748 process_id(pg), source, tag, pg.my_block_number());
0749
0750 BOOST_ASSERT(false);
0751 abort();
0752 }
0753 }
0754
0755 template<typename T>
0756 typename
0757 disable_if<boost::mpi::is_mpi_datatype<T>,
0758 std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0759 receive(const mpi_process_group& pg, int source, int tag, T values[],
0760 std::size_t n)
0761 {
0762 pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0763 values, n);
0764
0765 return std::make_pair(source, n);
0766 }
0767
0768 template<typename T, typename BinaryOperation>
0769 T*
0770 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
0771 BinaryOperation bin_op)
0772 {
0773 synchronize(pg);
0774
0775 bool inplace = first == out;
0776
0777 if (inplace) out = new T [last-first];
0778
0779 boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
0780 boost::mpi::comm_attach),
0781 first, last-first, out, bin_op);
0782
0783 if (inplace) {
0784 std::copy(out, out + (last-first), first);
0785 delete [] out;
0786 return last;
0787 }
0788
0789 return out;
0790 }
0791
0792 template<typename T>
0793 void
0794 broadcast(const mpi_process_group& pg, T& val,
0795 mpi_process_group::process_id_type root)
0796 {
0797
0798 boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
0799 boost::mpi::broadcast(comm,val,root);
0800 }
0801
0802
0803 template<typename T, typename BinaryOperation>
0804 T*
0805 scan(const mpi_process_group& pg, T* first, T* last, T* out,
0806 BinaryOperation bin_op)
0807 {
0808 synchronize(pg);
0809
0810 bool inplace = first == out;
0811
0812 if (inplace) out = new T [last-first];
0813
0814 boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);
0815
0816 if (inplace) {
0817 std::copy(out, out + (last-first), first);
0818 delete [] out;
0819 return last;
0820 }
0821
0822 return out;
0823 }
0824
0825
0826 template<typename InputIterator, typename T>
0827 void
0828 all_gather(const mpi_process_group& pg, InputIterator first,
0829 InputIterator last, std::vector<T>& out)
0830 {
0831 synchronize(pg);
0832
0833
0834 std::vector<T> local_values(first, last);
0835
0836
0837 int size = local_values.size();
0838 std::vector<int> sizes(num_processes(pg));
0839 int result = MPI_Allgather(&size, 1, MPI_INT,
0840 &sizes[0], 1, MPI_INT,
0841 communicator(pg));
0842 BOOST_ASSERT(result == MPI_SUCCESS);
0843 (void)result;
0844
0845
0846
0847
0848
0849
0850
0851
0852 for( std::size_t i = 0, n = sizes.size(); i < n; ++i )
0853 {
0854 sizes[ i ] *= sizeof( T );
0855 }
0856
0857
0858 std::vector<int> displacements;
0859 displacements.reserve(sizes.size() + 1);
0860 displacements.push_back(0);
0861 std::partial_sum(sizes.begin(), sizes.end(),
0862 std::back_inserter(displacements));
0863
0864
0865 out.resize(displacements.back() / sizeof(T));
0866 if (!out.empty()) {
0867 result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
0868 : (void*)&local_values[0],
0869 local_values.size() * sizeof(T),
0870 MPI_BYTE,
0871 &out[0], &sizes[0], &displacements[0], MPI_BYTE,
0872 communicator(pg));
0873 }
0874 BOOST_ASSERT(result == MPI_SUCCESS);
0875 }
0876
0877 template<typename InputIterator>
0878 mpi_process_group
0879 process_subgroup(const mpi_process_group& pg,
0880 InputIterator first, InputIterator last)
0881 {
0882
0883
0884
0885
0886
0887
0888 std::vector<int> ranks(first, last);
0889
0890 MPI_Group current_group;
0891 int result = MPI_Comm_group(communicator(pg), ¤t_group);
0892 BOOST_ASSERT(result == MPI_SUCCESS);
0893 (void)result;
0894
0895 MPI_Group new_group;
0896 result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
0897 BOOST_ASSERT(result == MPI_SUCCESS);
0898
0899 MPI_Comm new_comm;
0900 result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
0901 BOOST_ASSERT(result == MPI_SUCCESS);
0902
0903 result = MPI_Group_free(&new_group);
0904 BOOST_ASSERT(result == MPI_SUCCESS);
0905 result = MPI_Group_free(¤t_group);
0906 BOOST_ASSERT(result == MPI_SUCCESS);
0907
0908 if (new_comm != MPI_COMM_NULL) {
0909 mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
0910 result = MPI_Comm_free(&new_comm);
0911 BOOST_ASSERT(result == 0);
0912 return result_pg;
0913 } else {
0914 return mpi_process_group(mpi_process_group::create_empty());
0915 }
0916
0917 }
0918
0919
0920 template<typename Receiver>
0921 Receiver* mpi_process_group::get_receiver()
0922 {
0923 return impl_->blocks[my_block_number()]->on_receive
0924 .template target<Receiver>();
0925 }
0926
0927 template<typename T>
0928 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0929 receive_oob(const mpi_process_group& pg,
0930 mpi_process_group::process_id_type source, int tag, T& value, int block)
0931 {
0932 using boost::mpi::get_mpi_datatype;
0933
0934
0935
0936 std::pair<boost::mpi::communicator, int> actual
0937 = pg.actual_communicator_and_tag(tag, block);
0938
0939
0940 MPI_Request request;
0941 MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),
0942 source, actual.second, actual.first, &request);
0943
0944 int done = 0;
0945 do {
0946 MPI_Test(&request, &done, MPI_STATUS_IGNORE);
0947 if (!done)
0948 pg.poll(false, block);
0949 } while (!done);
0950 }
0951
0952 template<typename T>
0953 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
0954 receive_oob(const mpi_process_group& pg,
0955 mpi_process_group::process_id_type source, int tag, T& value, int block)
0956 {
0957
0958
0959 std::pair<boost::mpi::communicator, int> actual
0960 = pg.actual_communicator_and_tag(tag, block);
0961
0962 boost::optional<boost::mpi::status> status;
0963 do {
0964 status = actual.first.iprobe(source, actual.second);
0965 if (!status)
0966 pg.poll();
0967 } while (!status);
0968
0969
0970
0971
0972 boost::mpi::packed_iarchive in(actual.first);
0973
0974 #if BOOST_VERSION >= 103600
0975 in.resize(status->count<boost::mpi::packed>().get());
0976 #else
0977 int size;
0978 MPI_Status mpi_status = *status;
0979 MPI_Get_count(&mpi_status, MPI_PACKED, &size);
0980 in.resize(size);
0981 #endif
0982
0983
0984 MPI_Recv(in.address(), in.size(), MPI_PACKED,
0985 status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);
0986
0987
0988 in >> value;
0989 }
0990
0991
0992 template<typename SendT, typename ReplyT>
0993 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0994 send_oob_with_reply(const mpi_process_group& pg,
0995 mpi_process_group::process_id_type dest,
0996 int tag, const SendT& send_value, ReplyT& reply_value,
0997 int block)
0998 {
0999 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
1000 send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
1001 (int)reply_tag, send_value), block);
1002 receive_oob(pg, dest, reply_tag, reply_value);
1003 }
1004
1005 template<typename SendT, typename ReplyT>
1006 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
1007 send_oob_with_reply(const mpi_process_group& pg,
1008 mpi_process_group::process_id_type dest,
1009 int tag, const SendT& send_value, ReplyT& reply_value,
1010 int block)
1011 {
1012 detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
1013 send_oob(pg, dest, tag,
1014 boost::parallel::detail::make_untracked_pair((int)reply_tag,
1015 send_value), block);
1016 receive_oob(pg, dest, reply_tag, reply_value);
1017 }
1018
1019 } } }