Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:37:13

0001 // -*- C++ -*-
0002 
0003 // Copyright (C) 2004-2008 The Trustees of Indiana University.
0004 // Copyright (C) 2007  Douglas Gregor <doug.gregor@gmail.com>
0005 // Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>
0006 
0007 // Use, modification and distribution is subject to the Boost Software
0008 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0009 // http://www.boost.org/LICENSE_1_0.txt)
0010 
0011 //  Authors: Douglas Gregor
0012 //           Andrew Lumsdaine
0013 //           Matthias Troyer
0014 
0015 //#define PBGL_PROCESS_GROUP_DEBUG
0016 
0017 #ifndef BOOST_GRAPH_USE_MPI
0018 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0019 #endif
0020 
#include <boost/assert.hpp>
#include <algorithm>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <numeric>
#include <iterator>
#include <functional>
#include <vector>
#include <queue>
#include <stack>
#include <list>
#include <map>
#include <boost/graph/distributed/detail/tag_allocator.hpp>
#include <stdio.h>
#include <cstdlib>
0034 
0035 // #define PBGL_PROCESS_GROUP_DEBUG
0036 
0037 #ifdef PBGL_PROCESS_GROUP_DEBUG
0038 #  include <iostream>
0039 #endif
0040 
0041 namespace boost { namespace graph { namespace distributed {
0042 
/// Private implementation state of an mpi_process_group: communicators,
/// per-process incoming/outgoing message buffers, batching parameters,
/// synchronization bookkeeping, and trigger/irecv machinery.
struct mpi_process_group::impl
{
  
  typedef mpi_process_group::message_header message_header;
  typedef mpi_process_group::outgoing_messages outgoing_messages;

  /**
   * Stores the incoming messages from a particular processor.
   *
   * @todo Evaluate whether we should use a deque instance, which
   * could reduce the cost of "receiving" messages and
     allow us to deallocate memory earlier, but increases the time
     spent in the synchronization step.
   */
  struct incoming_messages {
    incoming_messages();
    ~incoming_messages() {}

    /// Headers of buffered messages from this source, in arrival order.
    std::vector<message_header> headers;
    /// Packed payload bytes; each header's offset/bytes index into this.
    buffer_type                 buffer;
    /// Per-block cursor into `headers`: the next not-yet-received
    /// message for each attached distributed data structure.
    std::vector<std::vector<message_header>::iterator> next_header;
  };

  /// An in-flight outgoing batch: the MPI request tracking the send and
  /// the buffer that must stay alive until the send completes.
  struct batch_request {
    MPI_Request request;
    buffer_type buffer;
  };

  // send once we have a certain number of messages or bytes in the buffer
  // these numbers need to be tuned, we keep them small at first for testing
  std::size_t batch_header_number;
  std::size_t batch_buffer_size;
  std::size_t batch_message_size;
  
  /**
   * The actual MPI communicator used to transmit data.
   */
  boost::mpi::communicator             comm;

  /**
   * The MPI communicator used to transmit out-of-band replies.
   */
  boost::mpi::communicator             oob_reply_comm;

  /// Outgoing message information, indexed by destination processor.
  std::vector<outgoing_messages> outgoing;

  /// Incoming message information, indexed by source processor.
  std::vector<incoming_messages> incoming;

  /// The numbers of processors that have entered a synchronization stage
  std::vector<int> processors_synchronizing_stage;
  
  /// The synchronization stage of a processor
  std::vector<int> synchronizing_stage;

  /// Number of processors still sending messages
  std::vector<int> synchronizing_unfinished;
  
  /// Number of batches sent since last synchronization stage
  std::vector<int> number_sent_batches;
  
  /// Number of batches received minus number of expected batches
  std::vector<int> number_received_batches;
  

  /// The context of the currently-executing trigger, or @c trc_none
  /// if no trigger is executing.
  trigger_receive_context trigger_context;

  /// Non-zero indicates that we're processing batches
  /// Increment this when processing patches,
  /// decrement it when you're done.
  int processing_batches;

  /**
   * Contains all of the active blocks corresponding to attached
   * distributed data structures.
   */
  blocks_type blocks;

  /// Whether we are currently synchronizing
  bool synchronizing;

  /// The MPI requests for posted sends of oob messages
  std::vector<MPI_Request> requests;
  
  /// The MPI buffers for posted irecvs of oob messages
  std::map<int,buffer_type> buffers;

  /// Queue for message batches received while already processing messages
  std::queue<std::pair<int,outgoing_messages> > new_batches;
  /// Maximum encountered size of the new_batches queue
  std::size_t max_received;

  /// The MPI requests and buffers for batchess being sent
  std::list<batch_request> sent_batches;
  /// Maximum encountered size of the sent_batches list
  std::size_t max_sent;

  /// Pre-allocated requests in a pool
  std::vector<batch_request> batch_pool;
  /// A stack controlling which batches are available
  std::stack<std::size_t> free_batches;

  void free_sent_batches();
  
  // Tag allocator
  detail::tag_allocator allocated_tags;

  impl(std::size_t num_headers, std::size_t buffers_size,
       communicator_type parent_comm);
  ~impl();
  
private:
  void set_batch_size(std::size_t header_num, std::size_t buffer_sz);
};
0160 
/// Returns the context of the trigger currently executing on this
/// process group, or @c trc_none if no trigger is running.
inline trigger_receive_context mpi_process_group::trigger_context() const
{
  return impl_->trigger_context;
}
0165 
0166 template<typename T>
0167 void
0168 mpi_process_group::send_impl(int dest, int tag, const T& value,
0169                              mpl::true_ /*is_mpi_datatype*/) const
0170 {
0171   BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);
0172 
0173   impl::outgoing_messages& outgoing = impl_->outgoing[dest];
0174 
0175   // Start constructing the message header
0176   impl::message_header header;
0177   header.source = process_id(*this);
0178   header.tag = tag;
0179   header.offset = outgoing.buffer.size();
0180   
0181   boost::mpi::packed_oarchive oa(impl_->comm, outgoing.buffer);
0182   oa << value;
0183 
0184 #ifdef PBGL_PROCESS_GROUP_DEBUG
0185   std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
0186             << tag << ", bytes = " << packed_size << std::endl;
0187 #endif
0188 
0189   // Store the header
0190   header.bytes = outgoing.buffer.size() - header.offset;
0191   outgoing.headers.push_back(header);
0192 
0193   maybe_send_batch(dest);
0194 }
0195 
0196 
0197 template<typename T>
0198 void
0199 mpi_process_group::send_impl(int dest, int tag, const T& value,
0200                              mpl::false_ /*is_mpi_datatype*/) const
0201 {
0202   BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);
0203 
0204   impl::outgoing_messages& outgoing = impl_->outgoing[dest];
0205 
0206   // Start constructing the message header
0207   impl::message_header header;
0208   header.source = process_id(*this);
0209   header.tag = tag;
0210   header.offset = outgoing.buffer.size();
0211 
0212   // Serialize into the buffer
0213   boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
0214   out << value;
0215 
0216   // Store the header
0217   header.bytes = outgoing.buffer.size() - header.offset;
0218   outgoing.headers.push_back(header);
0219   maybe_send_batch(dest);
0220 
0221 #ifdef PBGL_PROCESS_GROUP_DEBUG
0222   std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
0223             << tag << ", bytes = " << header.bytes << std::endl;
0224 #endif
0225 }
0226 
0227 template<typename T>
0228 inline void
0229 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0230      int tag, const T& value)
0231 {
0232   pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), value,
0233                boost::mpi::is_mpi_datatype<T>());
0234 }
0235 
0236 template<typename T>
0237 typename enable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0238 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0239      int tag, const T values[], std::size_t n)
0240 {
0241   pg.send_impl(dest, pg.encode_tag(pg.my_block_number(), tag),
0242                  boost::serialization::make_array(values,n), 
0243                  boost::mpl::true_());
0244 }
0245 
/// Buffers an array of @p n serialized (non-MPI-datatype) values for
/// @p dest under @p tag. The element count is serialized first,
/// followed by each element; array_receive_impl reads the same layout.
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
mpi_process_group::
array_send_impl(int dest, int tag, const T values[], std::size_t n) const
{
  BOOST_ASSERT(tag <  msg_reserved_first || tag > msg_reserved_last);

  impl::outgoing_messages& outgoing = impl_->outgoing[dest];

  // Start constructing the message header
  impl::message_header header;
  header.source = process_id(*this);
  header.tag = tag;
  header.offset = outgoing.buffer.size();

  // Serialize into the buffer: the element count first...
  boost::mpi::packed_oarchive out(impl_->comm, outgoing.buffer);
  out << n;

  // ...then each element in order.
  for (std::size_t i = 0; i < n; ++i)
    out << values[i];

  // Store the header; bytes is how much the buffer grew while packing.
  header.bytes = outgoing.buffer.size() - header.offset;
  outgoing.headers.push_back(header);
  maybe_send_batch(dest);

#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "SEND: " << process_id(*this) << " -> " << dest << ", tag = "
            << tag << ", bytes = " << header.bytes << std::endl;
#endif
}
0278 
0279 template<typename T>
0280 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0281 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0282      int tag, const T values[], std::size_t n)
0283 {
0284   pg.array_send_impl(dest, pg.encode_tag(pg.my_block_number(), tag), 
0285                      values, n);
0286 }
0287 
0288 template<typename InputIterator>
0289 void
0290 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0291      int tag, InputIterator first, InputIterator last)
0292 {
0293   typedef typename std::iterator_traits<InputIterator>::value_type value_type;
0294   std::vector<value_type> values(first, last);
0295   if (values.empty()) send(pg, dest, tag, static_cast<value_type*>(0), 0);
0296   else send(pg, dest, tag, &values[0], values.size());
0297 }
0298 
/// Attempts to receive a buffered message with @p tag from @p source,
/// unpacking an MPI-datatype value into @p value. Returns false if no
/// matching message has been buffered yet.
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::true_ /*is_mpi_datatype*/) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << "RECV: " << process_id(*this) << " <- " << source << ", tag = "
            << tag << std::endl;
#endif

  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, scanning forward from
  // this block's cursor (consumed messages are marked tag == -1).
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Unpack the data (zero-byte messages carry no payload)
  if (header->bytes > 0) {
    boost::mpi::packed_iarchive ia(impl_->comm, incoming.buffer, 
                                   archive::no_header, header->offset);
    ia >> value;
  }

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // If every block has now consumed all buffered messages from this
  // source, release the header list and buffer storage (swapping with
  // empties actually frees the memory) and reset every cursor.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
0352 
/// Attempts to receive a buffered message with @p tag from @p source,
/// deserializing a non-MPI-datatype value into @p value. Returns false
/// if no matching message has been buffered yet.
template<typename T>
bool
mpi_process_group::receive_impl(int source, int tag, T& value,
                                mpl::false_ /*is_mpi_datatype*/) const
{
  impl::incoming_messages& incoming = impl_->incoming[source];

  // Find the next header with the right tag, scanning forward from
  // this block's cursor (consumed messages are marked tag == -1).
  std::vector<impl::message_header>::iterator header =
    incoming.next_header[my_block_number()];
  while (header != incoming.headers.end() && header->tag != tag) ++header;

  // If no header is found, notify the caller
  if (header == incoming.headers.end()) return false;

  // Deserialize the data
  boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 
                                 archive::no_header, header->offset);
  in >> value;

  // Mark this message as received
  header->tag = -1;

  // Move the "next header" indicator to the next unreceived message
  while (incoming.next_header[my_block_number()] != incoming.headers.end()
         && incoming.next_header[my_block_number()]->tag == -1)
    ++incoming.next_header[my_block_number()];

  // If every block has now consumed all buffered messages from this
  // source, release the header list and buffer storage (swapping with
  // empties actually frees the memory) and reset every cursor.
  if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
    bool finished = true;
    for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
      if (incoming.next_header[i] != incoming.headers.end()) finished = false;
    }

    if (finished) {
      std::vector<impl::message_header> no_headers;
      incoming.headers.swap(no_headers);
      buffer_type empty_buffer;
      incoming.buffer.swap(empty_buffer);
      for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
        incoming.next_header[i] = incoming.headers.end();
    }
  }

  return true;
}
0399 
0400 template<typename T>
0401 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
0402 mpi_process_group::
0403 array_receive_impl(int source, int tag, T* values, std::size_t& n) const
0404 {
0405   impl::incoming_messages& incoming = impl_->incoming[source];
0406 
0407   // Find the next header with the right tag
0408   std::vector<impl::message_header>::iterator header =
0409     incoming.next_header[my_block_number()];
0410   while (header != incoming.headers.end() && header->tag != tag) ++header;
0411 
0412   // If no header is found, notify the caller
0413   if (header == incoming.headers.end()) return false;
0414 
0415   // Deserialize the data
0416   boost::mpi::packed_iarchive in(impl_->comm, incoming.buffer, 
0417                                  archive::no_header, header->offset);
0418   std::size_t num_sent;
0419   in >> num_sent;
0420   if (num_sent > n)
0421     std::cerr << "ERROR: Have " << num_sent << " items but only space for "
0422               << n << " items\n";
0423 
0424   for (std::size_t i = 0; i < num_sent; ++i)
0425     in >> values[i];
0426   n = num_sent;
0427 
0428   // Mark this message as received
0429   header->tag = -1;
0430 
0431   // Move the "next header" indicator to the next unreceived message
0432   while (incoming.next_header[my_block_number()] != incoming.headers.end()
0433          && incoming.next_header[my_block_number()]->tag == -1)
0434     ++incoming.next_header[my_block_number()];
0435 
0436   if (incoming.next_header[my_block_number()] == incoming.headers.end()) {
0437     bool finished = true;
0438     for (std::size_t i = 0; i < incoming.next_header.size() && finished; ++i) {
0439       if (incoming.next_header[i] != incoming.headers.end()) finished = false;
0440     }
0441 
0442     if (finished) {
0443       std::vector<impl::message_header> no_headers;
0444       incoming.headers.swap(no_headers);
0445       buffer_type empty_buffer;
0446       incoming.buffer.swap(empty_buffer);
0447       for (std::size_t i = 0; i < incoming.next_header.size(); ++i)
0448         incoming.next_header[i] = incoming.headers.end();
0449     }
0450   }
0451 
0452   return true;
0453 }
0454 
// Construct triggers
/// Installs a trigger for @p tag in the caller's block: when a message
/// with that tag arrives, @p handler is invoked with the deserialized
/// Type payload.
template<typename Type, typename Handler>
void mpi_process_group::trigger(int tag, const Handler& handler)
{
  // Triggers may only be installed once a block is attached.
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new trigger_launcher<Type, Handler>(*this, tag, handler)));
}
0463 
/// Installs a trigger for @p tag whose handler's return value is sent
/// back to the message's source as an out-of-band reply (see
/// reply_trigger_launcher::receive).
template<typename Type, typename Handler>
void mpi_process_group::trigger_with_reply(int tag, const Handler& handler)
{
  // Triggers may only be installed once a block is attached.
  BOOST_ASSERT(block_num);
  install_trigger(tag,my_block_number(),shared_ptr<trigger_base>(
    new reply_trigger_launcher<Type, Handler>(*this, tag, handler)));
}
0471 
0472 template<typename Type, typename Handler>
0473 void mpi_process_group::global_trigger(int tag, const Handler& handler, 
0474       std::size_t sz)
0475 {
0476   if (sz==0) // normal trigger
0477     install_trigger(tag,0,shared_ptr<trigger_base>(
0478       new global_trigger_launcher<Type, Handler>(*this, tag, handler)));
0479   else // trigger with irecv
0480     install_trigger(tag,0,shared_ptr<trigger_base>(
0481       new global_irecv_trigger_launcher<Type, Handler>(*this, tag, handler,sz)));
0482   
0483 }
0484 
0485 namespace detail {
0486 
/// Out-of-band receive for MPI datatypes: a single blocking MPI_Recv
/// straight into @p data on the group's data communicator.
template<typename Type>
void  do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::true_ /*is_mpi_datatype*/) 
{
  using boost::mpi::get_mpi_datatype;

  //self.impl_->comm.recv(source,tag,data);
  MPI_Recv(&data, 1, get_mpi_datatype<Type>(data), source, tag, self.impl_->comm,
           MPI_STATUS_IGNORE);
}
0497 
/// Out-of-band receive for serialized (non-MPI-datatype) values: probe
/// for the message to learn its packed size, receive the raw bytes,
/// then deserialize into @p data.
template<typename Type>
void do_oob_receive(mpi_process_group const& self,
    int source, int tag, Type& data, mpl::false_ /*is_mpi_datatype*/) 
{
  //  self.impl_->comm.recv(source,tag,data);
  // Receive the size of the data packet
  boost::mpi::status status;
  status = self.impl_->comm.probe(source, tag);

#if BOOST_VERSION >= 103600
  int size = status.count<boost::mpi::packed>().get();
#else
  // Older Boost.MPI lacks status::count; query MPI directly.
  int size;
  MPI_Status& mpi_status = status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
#endif

  // Receive the data packed itself
  boost::mpi::packed_iarchive in(self.impl_->comm);
  in.resize(size);
  MPI_Recv(in.address(), size, MPI_PACKED, source, tag, self.impl_->comm,
       MPI_STATUS_IGNORE);

  // Deserialize the data
  in >> data;
}
0524 
0525 template<typename Type>
0526 void do_oob_receive(mpi_process_group const& self, int source, int tag, Type& data) 
0527 {
0528   do_oob_receive(self, source, tag, data,
0529                            boost::mpi::is_mpi_datatype<Type>());
0530 }
0531 
0532 
0533 } // namespace detail
0534 
0535 
/// Fired when a message for this trigger's tag arrives: obtains the
/// Type payload (directly off the wire for out-of-band delivery,
/// otherwise from the local buffer) and passes it to the user handler.
/// NOTE(review): the process-group parameter is unnamed; the launcher's
/// stored `self` member is used instead.
template<typename Type, typename Handler>
void 
mpi_process_group::trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag, 
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger") 
            << " receive from source " << source << " and tag " << tag
        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag  = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(source, tag, data, context);
}
0563 
/// Fired for reply triggers, which only operate out-of-band: receives
/// an (int, Type) pair off the wire, runs the handler on the payload,
/// and sends its result back to the source (data.first is presumably
/// the reply tag requested by the sender — confirm against send side).
template<typename Type, typename Handler>
void 
mpi_process_group::reply_trigger_launcher<Type, Handler>::
receive(mpi_process_group const&, int source, int tag, 
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB reply trigger" : "Reply trigger") 
            << " receive from source " << source << " and tag " << tag
        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif
  BOOST_ASSERT(context == trc_out_of_band);

  boost::parallel::detail::untracked_pair<int, Type> data;

  // Receive the message directly off the wire
  int realtag  = self.encode_tag(block == -1 ? self.my_block_number() : block,
                                 tag);
  detail::do_oob_receive(self, source, realtag, data);

  // Pass the message off to the handler and send the result back to
  // the source.
  send_oob(self, source, data.first, 
           handler(source, tag, data.second, context), -2);
}
0589 
/// Fired for group-wide triggers: obtains the Type payload (off the
/// wire when out-of-band, otherwise from the local buffer) and passes
/// it — along with the process group itself — to the user handler.
template<typename Type, typename Handler>
void 
mpi_process_group::global_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag, 
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger") 
            << " receive from source " << source << " and tag " << tag
        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    // Receive the message directly off the wire
    int realtag  = self.encode_tag(
      block == -1 ? self.my_block_number() : block, tag);
    detail::do_oob_receive(self,source,realtag,data);
  }
  else
    // Receive the message out of the local buffer
    boost::graph::distributed::receive(self, source, tag, data);

  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
0617 
0618 
/// Fired for irecv-backed group-wide triggers. Ordinary out-of-band
/// delivery is ignored (the irecv path handles it); for
/// trc_irecv_out_of_band the payload is unpacked from the pre-posted
/// irecv buffer, a fresh irecv is posted, and the handler is invoked.
template<typename Type, typename Handler>
void 
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
receive(mpi_process_group const& self, int source, int tag, 
        trigger_receive_context context, int block) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
  std::cerr << (out_of_band? "OOB trigger" : "Trigger") 
            << " receive from source " << source << " and tag " << tag
        << " in block " << (block == -1 ? self.my_block_number() : block) << std::endl;
#endif

  Type data;

  if (context == trc_out_of_band) {
    return;
  }
  BOOST_ASSERT (context == trc_irecv_out_of_band);

  // Unpack the payload from the irecv buffer registered for this tag.
  boost::mpi::packed_iarchive ia(self.impl_->comm,self.impl_->buffers[tag]);
  ia >> data;
  // Start a new receive (force posting of a new MPI_Irecv, even though
  // the buffer is already allocated)
  prepare_receive(self,tag,true);
  // Pass the message off to the handler
  handler(self, source, tag, data, context);
}
0646 
0647 
/// Ensures a persistent MPI_Irecv is posted for @p tag. On first use a
/// buffer of buffer_size bytes is allocated (which forces a post);
/// @p force re-posts even when the buffer already exists, e.g. after a
/// message has been consumed.
template<typename Type, typename Handler>
void 
mpi_process_group::global_irecv_trigger_launcher<Type, Handler>::
prepare_receive(mpi_process_group const& self, int tag, bool force) const
{
#ifdef PBGL_PROCESS_GROUP_DEBUG
 std::cerr << ("Posting Irecv for trigger") 
      << " receive with tag " << tag << std::endl;
#endif
  if (self.impl_->buffers.find(tag) == self.impl_->buffers.end()) {
    self.impl_->buffers[tag].resize(buffer_size);
    force = true;
  }
  BOOST_ASSERT(static_cast<int>(self.impl_->buffers[tag].size()) >= buffer_size);
  
  //BOOST_MPL_ASSERT(mpl::not_<is_mpi_datatype<Type> >);
  if (force) {
    // The pointer into `requests` is handed to MPI_Irecv immediately,
    // before any further push_back could invalidate it.
    self.impl_->requests.push_back(MPI_Request());
    MPI_Request* request = &self.impl_->requests.back();
    MPI_Irecv(&self.impl_->buffers[tag].front(),buffer_size,
               MPI_PACKED,MPI_ANY_SOURCE,tag,self.impl_->comm,request);
  }
}
0671 
0672 
0673 template<typename T>
0674 inline mpi_process_group::process_id_type
0675 receive(const mpi_process_group& pg, int tag, T& value)
0676 {
0677   for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0678     if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0679                         value, boost::mpi::is_mpi_datatype<T>()))
0680       return source;
0681   }
0682   BOOST_ASSERT (false);
0683 }
0684 
0685 template<typename T>
0686 typename 
0687   enable_if<boost::mpi::is_mpi_datatype<T>, 
0688             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0689 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
0690 {
0691   for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0692     bool result =
0693       pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0694                  boost::serialization::make_array(values,n),
0695                  boost::mpl::true_());
0696     if (result) 
0697       return std::make_pair(source, n);
0698   }
0699   BOOST_ASSERT(false);
0700 }
0701 
0702 template<typename T>
0703 typename 
0704   disable_if<boost::mpi::is_mpi_datatype<T>, 
0705              std::pair<mpi_process_group::process_id_type, std::size_t> >::type
0706 receive(const mpi_process_group& pg, int tag, T values[], std::size_t n)
0707 {
0708   for (std::size_t source = 0; source < pg.impl_->incoming.size(); ++source) {
0709     if (pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
0710                               values, n))
0711       return std::make_pair(source, n);
0712   }
0713   BOOST_ASSERT(false);
0714 }
0715 
/// Receives a buffered message with @p tag from the specific process
/// @p source. Unlike the any-source overload, this aborts the program
/// if no matching message has been buffered.
template<typename T>
mpi_process_group::process_id_type
receive(const mpi_process_group& pg,
        mpi_process_group::process_id_type source, int tag, T& value)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      value, boost::mpi::is_mpi_datatype<T>()))
    return source;
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}
0733 
/// Receives a buffered array of MPI-datatype values with @p tag from
/// the specific process @p source; aborts the program if no matching
/// message has been buffered.
template<typename T>
typename 
  enable_if<boost::mpi::is_mpi_datatype<T>, 
            std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[], 
        std::size_t n)
{
  if (pg.receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                      boost::serialization::make_array(values,n), 
                      boost::mpl::true_()))
    return std::make_pair(source,n);
  else {
    fprintf(stderr,
            "Process %d failed to receive a message from process %d with tag %d in block %d.\n",
            process_id(pg), source, tag, pg.my_block_number());

    BOOST_ASSERT(false);
    abort();
  }
}
0754 
/// Receives a buffered array of serialized values with @p tag from the
/// specific process @p source.
/// NOTE(review): unlike the other source-specific overloads, the
/// boolean result of array_receive_impl is ignored here — on failure
/// this still returns (source, n) with `values` untouched. Confirm
/// whether callers rely on that lenient behavior.
template<typename T>
typename 
  disable_if<boost::mpi::is_mpi_datatype<T>, 
             std::pair<mpi_process_group::process_id_type, std::size_t> >::type
receive(const mpi_process_group& pg, int source, int tag, T values[], 
        std::size_t n)
{
  pg.array_receive_impl(source, pg.encode_tag(pg.my_block_number(), tag),
                        values, n);

  return std::make_pair(source, n);
}
0767 
/// All-reduce over [first, last) across the whole process group using
/// @p bin_op; synchronizes first so outstanding messages are flushed.
/// When @p out aliases @p first the reduction is done into a temporary
/// and copied back, and `last` is returned; otherwise the end of the
/// written output range (`out`) is returned.
template<typename T, typename BinaryOperation>
T*
all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  // Attach (without duplicating) to the group's MPI communicator.
  boost::mpi::all_reduce(boost::mpi::communicator(communicator(pg),
                                                  boost::mpi::comm_attach), 
                         first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
0791 
0792 template<typename T>
0793 void
0794 broadcast(const mpi_process_group& pg, T& val, 
0795           mpi_process_group::process_id_type root)
0796 {
0797   // broadcast the seed  
0798   boost::mpi::communicator comm(communicator(pg),boost::mpi::comm_attach);
0799   boost::mpi::broadcast(comm,val,root);
0800 }
0801 
0802 
/// Prefix-scan over [first, last) across the process group using
/// @p bin_op; synchronizes first so outstanding messages are flushed.
/// Mirrors all_reduce: when @p out aliases @p first the scan is done
/// into a temporary and copied back (returning `last`), otherwise the
/// end of the written output range is returned.
template<typename T, typename BinaryOperation>
T*
scan(const mpi_process_group& pg, T* first, T* last, T* out,
           BinaryOperation bin_op)
{
  synchronize(pg);

  bool inplace = first == out;

  if (inplace) out = new T [last-first];

  boost::mpi::scan(communicator(pg), first, last-first, out, bin_op);

  if (inplace) {
    std::copy(out, out + (last-first), first);
    delete [] out;
    return last;
  }

  return out;
}
0824 
0825 
/// Gathers every process's [first, last) values into @p out on all
/// processes, concatenated in rank order. Elements are shipped as raw
/// bytes (MPI_BYTE), so T should be trivially copyable — NOTE(review):
/// confirm for the instantiations used.
template<typename InputIterator, typename T>
void
all_gather(const mpi_process_group& pg, InputIterator first,
           InputIterator last, std::vector<T>& out)
{
  synchronize(pg);

  // Stick a copy of the local values into a vector, so we can broadcast it
  std::vector<T> local_values(first, last);

  // Collect the number of vertices stored in each process
  int size = local_values.size();
  std::vector<int> sizes(num_processes(pg));
  int result = MPI_Allgather(&size, 1, MPI_INT,
                             &sizes[0], 1, MPI_INT,
                             communicator(pg));
  BOOST_ASSERT(result == MPI_SUCCESS);
  (void)result;

  // Adjust sizes based on the number of bytes
  //
  // std::transform(sizes.begin(), sizes.end(), sizes.begin(),
  //               std::bind2nd(std::multiplies<int>(), sizeof(T)));
  //
  // std::bind2nd has been removed from C++17

  for( std::size_t i = 0, n = sizes.size(); i < n; ++i )
  {
    sizes[ i ] *= sizeof( T );
  }

  // Compute displacements: byte offset of each rank's contribution
  // (exclusive prefix sum of the per-rank byte counts).
  std::vector<int> displacements;
  displacements.reserve(sizes.size() + 1);
  displacements.push_back(0);
  std::partial_sum(sizes.begin(), sizes.end(),
                   std::back_inserter(displacements));

  // Gather all of the values. When this process has no local values a
  // dummy pointer is passed (with a byte count of 0, so it is unused).
  out.resize(displacements.back() / sizeof(T));
  if (!out.empty()) {
    result = MPI_Allgatherv(local_values.empty()? (void*)&local_values
                            /* local results */: (void*)&local_values[0],
                            local_values.size() * sizeof(T),
                            MPI_BYTE,
                            &out[0], &sizes[0], &displacements[0], MPI_BYTE,
                            communicator(pg));
  }
  BOOST_ASSERT(result == MPI_SUCCESS);
}
0876 
0877 template<typename InputIterator>
0878 mpi_process_group
0879 process_subgroup(const mpi_process_group& pg,
0880                  InputIterator first, InputIterator last)
0881 {
0882 /*
0883   boost::mpi::group current_group = communicator(pg).group();
0884   boost::mpi::group new_group = current_group.include(first,last);
0885   boost::mpi::communicator new_comm(communicator(pg),new_group);
0886   return mpi_process_group(new_comm);
0887 */
0888   std::vector<int> ranks(first, last);
0889 
0890   MPI_Group current_group;
0891   int result = MPI_Comm_group(communicator(pg), &current_group);
0892   BOOST_ASSERT(result == MPI_SUCCESS);
0893   (void)result;
0894 
0895   MPI_Group new_group;
0896   result = MPI_Group_incl(current_group, ranks.size(), &ranks[0], &new_group);
0897   BOOST_ASSERT(result == MPI_SUCCESS);
0898 
0899   MPI_Comm new_comm;
0900   result = MPI_Comm_create(communicator(pg), new_group, &new_comm);
0901   BOOST_ASSERT(result == MPI_SUCCESS);
0902 
0903   result = MPI_Group_free(&new_group);
0904   BOOST_ASSERT(result == MPI_SUCCESS);
0905   result = MPI_Group_free(&current_group);
0906   BOOST_ASSERT(result == MPI_SUCCESS);
0907 
0908   if (new_comm != MPI_COMM_NULL) {
0909     mpi_process_group result_pg(boost::mpi::communicator(new_comm,boost::mpi::comm_attach));
0910     result = MPI_Comm_free(&new_comm);
0911     BOOST_ASSERT(result == 0);
0912     return result_pg;
0913   } else {
0914     return mpi_process_group(mpi_process_group::create_empty());
0915   }
0916 
0917 }
0918 
0919 
template<typename Receiver>
Receiver* mpi_process_group::get_receiver()
{
  // Recover the receiver object installed as this block's on_receive
  // handler. Presumably on_receive is a boost::function-like wrapper, so
  // target<Receiver>() returns null when the stored handler is empty or
  // of a different type -- TODO confirm against the impl declaration.
  return impl_->blocks[my_block_number()]->on_receive
           .template target<Receiver>();
}
0926 
0927 template<typename T>
0928 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0929 receive_oob(const mpi_process_group& pg, 
0930             mpi_process_group::process_id_type source, int tag, T& value, int block)
0931 {
0932   using boost::mpi::get_mpi_datatype;
0933 
0934   // Determine the actual message we expect to receive, and which
0935   // communicator it will come by.
0936   std::pair<boost::mpi::communicator, int> actual
0937     = pg.actual_communicator_and_tag(tag, block);
0938 
0939   // Post a non-blocking receive that waits until we complete this request.
0940   MPI_Request request;
0941   MPI_Irecv(&value, 1, get_mpi_datatype<T>(value),  
0942             source, actual.second, actual.first, &request); 
0943 
0944   int done = 0;
0945   do {
0946     MPI_Test(&request, &done, MPI_STATUS_IGNORE);
0947     if (!done)
0948       pg.poll(/*wait=*/false, block);
0949   } while (!done);
0950 }
0951 
template<typename T>
typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
receive_oob(const mpi_process_group& pg, 
            mpi_process_group::process_id_type source, int tag, T& value, int block)
{
  // Out-of-band receive for types without a native MPI datatype: the
  // value arrives as a packed (serialized) message and is deserialized
  // into `value` here.

  // Determine the actual message we expect to receive, and which
  // communicator it will come by.
  std::pair<boost::mpi::communicator, int> actual
    = pg.actual_communicator_and_tag(tag, block);

  // Probe until the message arrives, polling the process group between
  // probes so other traffic keeps making progress while we wait.
  boost::optional<boost::mpi::status> status;
  do {
    status = actual.first.iprobe(source, actual.second);
    if (!status)
      pg.poll();
  } while (!status);

  //actual.first.recv(status->source(), status->tag(),value);

  // Allocate the receive buffer
  boost::mpi::packed_iarchive in(actual.first);

#if BOOST_VERSION >= 103600
  // Boost.MPI >= 1.36 exposes the packed byte count on the status object.
  in.resize(status->count<boost::mpi::packed>().get());
#else
  // Older Boost: fall back to MPI_Get_count on the raw MPI_Status.
  int size;
  MPI_Status mpi_status = *status;
  MPI_Get_count(&mpi_status, MPI_PACKED, &size);
  in.resize(size);
#endif
  
  // Receive the message data
  MPI_Recv(in.address(), in.size(), MPI_PACKED,
           status->source(), status->tag(), actual.first, MPI_STATUS_IGNORE);
  
  // Unpack the message data
  in >> value;
}
0990 
0991 
0992 template<typename SendT, typename ReplyT>
0993 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0994 send_oob_with_reply(const mpi_process_group& pg, 
0995                     mpi_process_group::process_id_type dest,
0996                     int tag, const SendT& send_value, ReplyT& reply_value,
0997                     int block)
0998 {
0999   detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
1000   send_oob(pg, dest, tag, boost::parallel::detail::make_untracked_pair(
1001         (int)reply_tag, send_value), block);
1002   receive_oob(pg, dest, reply_tag, reply_value);
1003 }
1004 
1005 template<typename SendT, typename ReplyT>
1006 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
1007 send_oob_with_reply(const mpi_process_group& pg, 
1008                     mpi_process_group::process_id_type dest,
1009                     int tag, const SendT& send_value, ReplyT& reply_value,
1010                     int block)
1011 {
1012   detail::tag_allocator::token reply_tag = pg.impl_->allocated_tags.get_tag();
1013   send_oob(pg, dest, tag, 
1014            boost::parallel::detail::make_untracked_pair((int)reply_tag, 
1015                                                         send_value), block);
1016   receive_oob(pg, dest, reply_tag, reply_value);
1017 }
1018 
1019 } } } // end namespace boost::graph::distributed