Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:37:18

0001 // Copyright (C) 2004-2008 The Trustees of Indiana University.
0002 // Copyright (C) 2007   Douglas Gregor
0003 // Copyright (C) 2007  Matthias Troyer  <troyer@boost-consulting.com>
0004 
0005 // Use, modification and distribution is subject to the Boost Software
0006 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0007 // http://www.boost.org/LICENSE_1_0.txt)
0008 
0009 //  Authors: Douglas Gregor
0010 //           Matthias Troyer
0011 //           Andrew Lumsdaine
0012 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
0013 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
0014 
0015 #ifndef BOOST_GRAPH_USE_MPI
0016 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0017 #endif
0018 
0019 //#define NO_SPLIT_BATCHES
0020 #define SEND_OOB_BSEND
0021 
0022 #include <boost/optional.hpp>
0023 #include <boost/shared_ptr.hpp>
0024 #include <boost/weak_ptr.hpp>
0025 #include <utility>
0026 #include <memory>
0027 #include <boost/function/function1.hpp>
0028 #include <boost/function/function2.hpp>
0029 #include <boost/function/function0.hpp>
0030 #include <boost/mpi.hpp>
0031 #include <boost/property_map/parallel/process_group.hpp>
0032 #include <boost/serialization/vector.hpp>
0033 #include <boost/utility/enable_if.hpp>
0034 
0035 namespace boost { namespace graph { namespace distributed {
0036 
0037 // Process group tags
0038 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
0039 
0040 class mpi_process_group
0041 {
0042   struct impl;
0043 
0044  public:
0045   /// Number of tags available to each data structure.
0046   static const int max_tags = 256;
0047 
0048   /**
0049    * The type of a "receive" handler, that will be provided with
0050    * (source, tag) pairs when a message is received. Users can provide a
0051    * receive handler for a distributed data structure, for example, to
0052    * automatically pick up and respond to messages as needed.  
0053    */
0054   typedef function<void(int source, int tag)> receiver_type;
0055 
0056   /**
0057    * The type of a handler for the on-synchronize event, which will be
0058    * executed at the beginning of synchronize().
0059    */
0060   typedef function0<void>      on_synchronize_event_type;
0061 
0062   /// Used as a tag to help create an "empty" process group.
0063   struct create_empty {};
0064 
0065   /// The type used to buffer message data
0066   typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
0067 
0068   /// The type used to identify a process
0069   typedef int process_id_type;
0070 
0071   /// The type used to count the number of processes
0072   typedef int process_size_type;
0073 
0074   /// The type of communicator used to transmit data via MPI
0075   typedef boost::mpi::communicator communicator_type;
0076 
0077   /// Classification of the capabilities of this process group
0078   struct communication_category
0079     : virtual boost::parallel::bsp_process_group_tag, 
0080       virtual mpi_process_group_tag { };
0081 
0082   // TBD: We can eliminate the "source" field and possibly the
0083   // "offset" field.
0084   struct message_header {
0085     /// The process that sent the message
0086     process_id_type source;
0087 
0088     /// The message tag
0089     int tag;
0090 
0091     /// The offset of the message into the buffer
0092     std::size_t offset;
0093 
0094     /// The length of the message in the buffer, in bytes
0095     std::size_t bytes;
0096     
0097     template <class Archive>
0098     void serialize(Archive& ar, int)
0099     {
0100       ar & source & tag & offset & bytes;
0101     }
0102   };
0103 
0104   /**
0105    * Stores the outgoing messages for a particular processor.
0106    *
0107    * @todo Evaluate whether we should use a deque instance, which
0108    * would reduce could reduce the cost of "sending" messages but
0109    * increases the time spent in the synchronization step.
0110    */
0111   struct outgoing_messages {
0112         outgoing_messages() {}
0113         ~outgoing_messages() {}
0114 
0115     std::vector<message_header> headers;
0116     buffer_type                 buffer;
0117     
0118     template <class Archive>
0119     void serialize(Archive& ar, int)
0120     {
0121       ar & headers & buffer;
0122     }
0123     
0124     void swap(outgoing_messages& x) 
0125     {
0126       headers.swap(x.headers);
0127       buffer.swap(x.buffer);
0128     }
0129   };
0130 
0131 private:
0132   /**
0133    * Virtual base from which every trigger will be launched. See @c
0134    * trigger_launcher for more information.
0135    */
0136   class trigger_base : boost::noncopyable
0137   {
0138   public:
0139     explicit trigger_base(int tag) : tag_(tag) { }
0140 
0141     /// Retrieve the tag associated with this trigger  
0142     int tag() const { return tag_; }
0143 
0144     virtual ~trigger_base() { }
0145 
0146     /**
0147      * Invoked to receive a message that matches a particular trigger. 
0148      *
0149      * @param source      the source of the message
0150      * @param tag         the (local) tag of the message
0151      * @param context     the context under which the trigger is being
0152      *                    invoked
0153      */
0154     virtual void 
0155     receive(mpi_process_group const& pg, int source, int tag, 
0156             trigger_receive_context context, int block=-1) const = 0;
0157 
0158   protected:
0159     // The message tag associated with this trigger
0160     int tag_;
0161   };
0162 
0163   /**
0164    * Launches a specific handler in response to a trigger. This
0165    * function object wraps up the handler function object and a buffer
0166    * for incoming data. 
0167    */
0168   template<typename Type, typename Handler>
0169   class trigger_launcher : public trigger_base
0170   {
0171   public:
0172     explicit trigger_launcher(mpi_process_group& self, int tag, 
0173                               const Handler& handler) 
0174       : trigger_base(tag), self(self), handler(handler) 
0175       {}
0176 
0177     void 
0178     receive(mpi_process_group const& pg, int source, int tag,  
0179             trigger_receive_context context, int block=-1) const;
0180 
0181   private:
0182     mpi_process_group& self;
0183     mutable Handler handler;
0184   };
0185 
0186   /**
0187    * Launches a specific handler with a message reply in response to a
0188    * trigger. This function object wraps up the handler function
0189    * object and a buffer for incoming data.
0190    */
0191   template<typename Type, typename Handler>
0192   class reply_trigger_launcher : public trigger_base
0193   {
0194   public:
0195     explicit reply_trigger_launcher(mpi_process_group& self, int tag, 
0196                                     const Handler& handler) 
0197       : trigger_base(tag), self(self), handler(handler) 
0198       {}
0199 
0200     void 
0201     receive(mpi_process_group const& pg, int source, int tag, 
0202             trigger_receive_context context, int block=-1) const;
0203 
0204   private:
0205     mpi_process_group& self;
0206     mutable Handler handler;
0207   };
0208 
0209   template<typename Type, typename Handler>
0210   class global_trigger_launcher : public trigger_base
0211   {
0212   public:
0213     explicit global_trigger_launcher(mpi_process_group& self, int tag, 
0214                               const Handler& handler) 
0215       : trigger_base(tag), handler(handler) 
0216       { 
0217       }
0218 
0219     void 
0220     receive(mpi_process_group const& pg, int source, int tag, 
0221             trigger_receive_context context, int block=-1) const;
0222 
0223   private:
0224     mutable Handler handler;
0225     // TBD: do not forget to cancel any outstanding Irecv when deleted,
0226     // if we decide to use Irecv
0227   };
0228 
0229   template<typename Type, typename Handler>
0230   class global_irecv_trigger_launcher : public trigger_base
0231   {
0232   public:
0233     explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag, 
0234                               const Handler& handler, int sz) 
0235       : trigger_base(tag), handler(handler), buffer_size(sz)
0236       { 
0237         prepare_receive(self,tag);
0238       }
0239 
0240     void 
0241     receive(mpi_process_group const& pg, int source, int tag, 
0242             trigger_receive_context context, int block=-1) const;
0243 
0244   private:
0245     void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
0246     Handler handler;
0247     int buffer_size;
0248     // TBD: do not forget to cancel any outstanding Irecv when deleted,
0249     // if we decide to use Irecv
0250   };
0251 
0252 public:
0253   /** 
0254    * Construct a new BSP process group from an MPI communicator. The
0255    * MPI communicator will be duplicated to create a new communicator
0256    * for this process group to use.
0257    */
0258   mpi_process_group(communicator_type parent_comm = communicator_type());
0259 
0260   /** 
0261    * Construct a new BSP process group from an MPI communicator. The
0262    * MPI communicator will be duplicated to create a new communicator
0263    * for this process group to use. This constructor allows tuning the
0264    * size of message batches.
0265    *    
0266    *   @param num_headers The maximum number of headers in a message batch
0267    *
0268    *   @param buffer_size The maximum size of the message buffer in a batch.
0269    *
0270    */
0271   mpi_process_group( std::size_t num_headers, std::size_t buffer_size, 
0272                      communicator_type parent_comm = communicator_type());
0273 
0274   /**
0275    * Construct a copy of the BSP process group for a new distributed
0276    * data structure. This data structure will synchronize with all
0277    * other members of the process group's equivalence class (including
0278    * @p other), but will have its own set of tags. 
0279    *
0280    *   @param other The process group that this new process group will
0281    *   be based on, using a different set of tags within the same
0282    *   communication and synchronization space.
0283    *
0284    *   @param handler A message handler that will be passed (source,
0285    *   tag) pairs for each message received by this data
0286    *   structure. The handler is expected to receive the messages
0287    *   immediately. The handler can be changed after-the-fact by
0288    *   calling @c replace_handler.
0289    *
0290    *   @param out_of_band_receive An anachronism. TODO: remove this.
0291    */
0292   mpi_process_group(const mpi_process_group& other,
0293                     const receiver_type& handler,
0294                     bool out_of_band_receive = false);
0295 
0296   /**
0297    * Construct a copy of the BSP process group for a new distributed
0298    * data structure. This data structure will synchronize with all
0299    * other members of the process group's equivalence class (including
0300    * @p other), but will have its own set of tags. 
0301    */
0302   mpi_process_group(const mpi_process_group& other, 
0303                     attach_distributed_object,
0304                     bool out_of_band_receive = false);
0305 
0306   /**
0307    * Create an "empty" process group, with no information. This is an
0308    * internal routine that users should never need.
0309    */
0310   explicit mpi_process_group(create_empty) {}
0311 
0312   /**
0313    * Destroys this copy of the process group.
0314    */
0315   ~mpi_process_group();
0316 
0317   /**
0318    * Replace the current message handler with a new message handler.
0319    *
0320    * @param handler The new message handler.
0321    * @param out_of_band_receive An anachronism: remove this
0322    */
0323   void replace_handler(const receiver_type& handler,
0324                        bool out_of_band_receive = false);
0325 
0326   /**
0327    * Turns this process group into the process group for a new
0328    * distributed data structure or object, allocating its own tag
0329    * block.
0330    */
0331   void make_distributed_object();
0332 
0333   /**
0334    * Replace the handler to be invoked at the beginning of synchronize.
0335    */
0336   void
0337   replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
0338 
0339   /** 
0340    * Return the block number of the current data structure. A value of
0341    * 0 indicates that this particular instance of the process group is
0342    * not associated with any distributed data structure.
0343    */
0344   int my_block_number() const { return block_num? *block_num : 0; }
0345 
0346   /**
0347    * Encode a block number/tag pair into a single encoded tag for
0348    * transmission.
0349    */
0350   int encode_tag(int block_num, int tag) const
0351   { return block_num * max_tags + tag; }
0352 
0353   /**
0354    * Decode an encoded tag into a block number/tag pair. 
0355    */
0356   std::pair<int, int> decode_tag(int encoded_tag) const
0357   { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
0358 
0359   // @todo Actually write up the friend declarations so these could be
0360   // private.
0361 
0362   // private:
0363 
0364   /** Allocate a block of tags for this instance. The block should not
0365    * have been allocated already, e.g., my_block_number() ==
0366    * 0. Returns the newly-allocated block number.
0367    */
0368   int allocate_block(bool out_of_band_receive = false);
0369 
0370   /** Potentially emit a receive event out of band. Returns true if an event 
0371    *  was actually sent, false otherwise. 
0372    */
0373   bool maybe_emit_receive(int process, int encoded_tag) const;
0374 
0375   /** Emit a receive event. Returns true if an event was actually
0376    * sent, false otherwise. 
0377    */
0378   bool emit_receive(int process, int encoded_tag) const;
0379 
0380   /** Emit an on-synchronize event to all block handlers. */
0381   void emit_on_synchronize() const;
0382 
0383   /** Retrieve a reference to the stored receiver in this block.  */
0384   template<typename Receiver>
0385   Receiver* get_receiver();
0386 
0387   template<typename T>
0388   void
0389   send_impl(int dest, int tag, const T& value,
0390             mpl::true_ /*is_mpi_datatype*/) const;
0391 
0392   template<typename T>
0393   void
0394   send_impl(int dest, int tag, const T& value,
0395             mpl::false_ /*is_mpi_datatype*/) const;
0396 
0397   template<typename T>
0398   typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0399   array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
0400 
0401   template<typename T>
0402   bool
0403   receive_impl(int source, int tag, T& value,
0404                mpl::true_ /*is_mpi_datatype*/) const;
0405 
0406   template<typename T>
0407   bool
0408   receive_impl(int source, int tag, T& value,
0409                mpl::false_ /*is_mpi_datatype*/) const;
0410 
0411   // Receive an array of values
0412   template<typename T>
0413   typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
0414   array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
0415 
0416   optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
0417 
0418   void synchronize() const;
0419 
0420   operator bool() { return bool(impl_); }
0421 
0422   mpi_process_group base() const;
0423 
0424   /**
0425    * Create a new trigger for a specific message tag. Triggers handle
0426    * out-of-band messaging, and the handler itself will be called
0427    * whenever a message is available. The handler itself accepts four
0428    * arguments: the source of the message, the message tag (which will
0429    * be the same as @p tag), the message data (of type @c Type), and a
0430    * boolean flag that states whether the message was received
0431    * out-of-band. The last will be @c true for out-of-band receives,
0432    * or @c false for receives at the end of a synchronization step.
0433    */
0434   template<typename Type, typename Handler>
0435   void trigger(int tag, const Handler& handler);
0436 
0437   /**
0438    * Create a new trigger for a specific message tag, along with a way
0439    * to send a reply with data back to the sender. Triggers handle
0440    * out-of-band messaging, and the handler itself will be called
0441    * whenever a message is available. The handler itself accepts four
0442    * arguments: the source of the message, the message tag (which will
0443    * be the same as @p tag), the message data (of type @c Type), and a
0444    * boolean flag that states whether the message was received
0445    * out-of-band. The last will be @c true for out-of-band receives,
0446    * or @c false for receives at the end of a synchronization
0447    * step. The handler also returns a value, which will be routed back
0448    * to the sender.
0449    */
0450   template<typename Type, typename Handler>
0451   void trigger_with_reply(int tag, const Handler& handler);
0452 
0453   template<typename Type, typename Handler>
0454   void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0); 
0455 
0456 
0457 
0458   /**
0459    * Poll for any out-of-band messages. This routine will check if any
0460    * out-of-band messages are available. Those that are available will
0461    * be handled immediately, if possible.
0462    *
0463    * @returns if an out-of-band message has been received, but we are
0464    * unable to actually receive the message, a (source, tag) pair will
0465    * be returned. Otherwise, returns an empty optional.
0466    *
0467    * @param wait When true, we should block until a message comes in.
0468    *
0469    * @param synchronizing whether we are currently synchronizing the
0470    *                      process group
0471    */
0472   optional<std::pair<int, int> > 
0473   poll(bool wait = false, int block = -1, bool synchronizing = false) const;
0474 
0475   /**
0476    * Determines the context of the trigger currently executing. If
0477    * multiple triggers are executing (recursively), then the context
0478    * for the most deeply nested trigger will be returned. If no
0479    * triggers are executing, returns @c trc_none. This might be used,
0480    * for example, to determine whether a reply to a message should
0481    * itself be sent out-of-band or whether it can go via the normal,
0482    * slower communication route.
0483    */
0484   trigger_receive_context trigger_context() const;
0485 
0486   /// INTERNAL ONLY
0487   void receive_batch(process_id_type source, outgoing_messages& batch) const;
0488 
0489   /// INTERNAL ONLY
0490   ///
0491   /// Determine the actual communicator and tag that will be used for a
0492   /// transmission with the given tag.
0493   std::pair<boost::mpi::communicator, int> 
0494   actual_communicator_and_tag(int tag, int block) const;
0495 
0496   /// set the size of the message buffer used for buffered oob sends
0497   
0498   static void set_message_buffer_size(std::size_t s);
0499 
0500   /// get the size of the message buffer used for buffered oob sends
0501 
0502   static std::size_t message_buffer_size();
0503   static int old_buffer_size;
0504   static void* old_buffer;
0505 private:
0506 
0507   void install_trigger(int tag, int block, 
0508       shared_ptr<trigger_base> const& launcher); 
0509 
0510   void poll_requests(int block=-1) const;
0511 
0512   
0513   // send a batch if the buffer is full now or would get full
0514   void maybe_send_batch(process_id_type dest) const;
0515 
0516   // actually send a batch
0517   void send_batch(process_id_type dest, outgoing_messages& batch) const;
0518   void send_batch(process_id_type dest) const;
0519 
0520   void pack_headers() const;
0521 
0522   /**
0523    * Process a batch of incoming messages immediately.
0524    *
0525    * @param source         the source of these messages
0526    */
0527   void process_batch(process_id_type source) const;
0528   void receive_batch(boost::mpi::status& status) const;
0529 
0530   //void free_finished_sends() const;
0531           
0532   /// Status messages used internally by the process group
0533   enum status_messages {
0534     /// the first of the reserved message tags
0535     msg_reserved_first = 126,
0536     /// Sent from a processor when sending batched messages
0537     msg_batch = 126,
0538     /// Sent from a processor when sending large batched messages, larger than
0539     /// the maximum buffer size for messages to be received by MPI_Irecv
0540     msg_large_batch = 127,
0541     /// Sent from a source processor to everyone else when that
0542     /// processor has entered the synchronize() function.
0543     msg_synchronizing = 128,
0544     /// the last of the reserved message tags
0545     msg_reserved_last = 128
0546   };
0547 
0548   /**
0549    * Description of a block of tags associated to a particular
0550    * distributed data structure. This structure will live as long as
0551    * the distributed data structure is around, and will be used to
0552    * help send messages to the data structure.
0553    */
0554   struct block_type
0555   {
0556     block_type() { }
0557 
0558     /// Handler for receive events
0559     receiver_type     on_receive;
0560 
0561     /// Handler executed at the start of  synchronization 
0562     on_synchronize_event_type  on_synchronize;
0563 
0564     /// Individual message triggers. Note: at present, this vector is
0565     /// indexed by the (local) tag of the trigger.  Any tags that
0566     /// don't have triggers will have NULL pointers in that spot.
0567     std::vector<shared_ptr<trigger_base> > triggers;
0568   };
0569 
0570   /**
0571    * Data structure containing all of the blocks for the distributed
0572    * data structures attached to a process group.
0573    */
0574   typedef std::vector<block_type*> blocks_type;
0575 
0576   /// Iterator into @c blocks_type.
0577   typedef blocks_type::iterator block_iterator;
0578 
0579   /**
0580    * Deleter used to deallocate a block when its distributed data
0581    * structure is destroyed. This type will be used as the deleter for
0582    * @c block_num.
0583    */
0584   struct deallocate_block;
0585   
0586   static std::vector<char> message_buffer;
0587 
0588 public:
0589   /**
0590    * Data associated with the process group and all of its attached
0591    * distributed data structures.
0592    */
0593   shared_ptr<impl> impl_;
0594 
0595   /**
0596    * When non-null, indicates that this copy of the process group is
0597    * associated with a particular distributed data structure. The
0598    * integer value contains the block number (a value > 0) associated
0599    * with that data structure. The deleter for this @c shared_ptr is a
0600    * @c deallocate_block object that will deallocate the associated
0601    * block in @c impl_->blocks.
0602    */
0603   shared_ptr<int>  block_num;
0604 
0605   /**
0606    * Rank of this process, to avoid having to call rank() repeatedly.
0607    */
0608   int rank;
0609 
0610   /**
0611    * Number of processes in this process group, to avoid having to
0612    * call communicator::size() repeatedly.
0613    */
0614   int size;
0615 };
0616 
0617 inline mpi_process_group::process_id_type 
0618 process_id(const mpi_process_group& pg)
0619 { return pg.rank; }
0620 
0621 inline mpi_process_group::process_size_type 
0622 num_processes(const mpi_process_group& pg)
0623 { return pg.size; }
0624 
0625 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
0626 
0627 template<typename T>
0628 void
0629 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0630      int tag, const T& value);
0631 
0632 template<typename InputIterator>
0633 void
0634 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0635      int tag, InputIterator first, InputIterator last);
0636 
0637 template<typename T>
0638 inline void
0639 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0640      int tag, T* first, T* last)
0641 { send(pg, dest, tag, first, last - first); }
0642 
0643 template<typename T>
0644 inline void
0645 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0646      int tag, const T* first, const T* last)
0647 { send(pg, dest, tag, first, last - first); }
0648 
0649 template<typename T>
0650 mpi_process_group::process_id_type
0651 receive(const mpi_process_group& pg, int tag, T& value);
0652 
0653 template<typename T>
0654 mpi_process_group::process_id_type
0655 receive(const mpi_process_group& pg,
0656         mpi_process_group::process_id_type source, int tag, T& value);
0657 
0658 optional<std::pair<mpi_process_group::process_id_type, int> >
0659 probe(const mpi_process_group& pg);
0660 
0661 void synchronize(const mpi_process_group& pg);
0662 
0663 template<typename T, typename BinaryOperation>
0664 T*
0665 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
0666            BinaryOperation bin_op);
0667 
0668 template<typename T, typename BinaryOperation>
0669 T*
0670 scan(const mpi_process_group& pg, T* first, T* last, T* out,
0671            BinaryOperation bin_op);
0672 
0673 template<typename InputIterator, typename T>
0674 void
0675 all_gather(const mpi_process_group& pg,
0676            InputIterator first, InputIterator last, std::vector<T>& out);
0677 
0678 template<typename InputIterator>
0679 mpi_process_group
0680 process_subgroup(const mpi_process_group& pg,
0681                  InputIterator first, InputIterator last);
0682 
0683 template<typename T>
0684 void
0685 broadcast(const mpi_process_group& pg, T& val, 
0686           mpi_process_group::process_id_type root);
0687 
0688 
0689 /// optimized swap for outgoing messages
0690 inline void
0691 swap(mpi_process_group::outgoing_messages& x,
0692      mpi_process_group::outgoing_messages& y)
0693 {
0694   x.swap(y);
0695 }
0696 
0697 
0698 /*******************************************************************
0699  * Out-of-band communication                                       *
0700  *******************************************************************/
0701 
0702 template<typename T>
0703 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0704 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0705          int tag, const T& value, int block=-1)
0706 {
0707   using boost::mpi::get_mpi_datatype;
0708 
0709   // Determine the actual message tag we will use for the send, and which
0710   // communicator we will use.
0711   std::pair<boost::mpi::communicator, int> actual
0712     = pg.actual_communicator_and_tag(tag, block);
0713 
0714 #ifdef SEND_OOB_BSEND
0715   if (mpi_process_group::message_buffer_size()) {
0716     MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest, 
0717               actual.second, actual.first);
0718     return;
0719   }
0720 #endif
0721   MPI_Request request;
0722   MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest, 
0723             actual.second, actual.first, &request);
0724   
0725   int done=0;
0726   do {
0727     pg.poll();
0728     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
0729   } while (!done);
0730 }
0731 
0732 template<typename T>
0733 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
0734 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0735          int tag, const T& value, int block=-1)
0736 {
0737   using boost::mpi::packed_oarchive;
0738 
0739   // Determine the actual message tag we will use for the send, and which
0740   // communicator we will use.
0741   std::pair<boost::mpi::communicator, int> actual
0742     = pg.actual_communicator_and_tag(tag, block);
0743 
0744   // Serialize the data into a buffer
0745   packed_oarchive out(actual.first);
0746   out << value;
0747   std::size_t size = out.size();
0748 
0749   // Send the actual message data
0750 #ifdef SEND_OOB_BSEND
0751   if (mpi_process_group::message_buffer_size()) {
0752     MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
0753             dest, actual.second, actual.first);
0754    return;
0755   }
0756 #endif
0757   MPI_Request request;
0758   MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
0759             dest, actual.second, actual.first, &request);
0760 
0761   int done=0;
0762   do {
0763     pg.poll();
0764     MPI_Test(&request,&done,MPI_STATUS_IGNORE);
0765   } while (!done);
0766 }
0767 
0768 template<typename T>
0769 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0770 receive_oob(const mpi_process_group& pg, 
0771             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
0772 
0773 template<typename T>
0774 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
0775 receive_oob(const mpi_process_group& pg, 
0776             mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
0777 
0778 template<typename SendT, typename ReplyT>
0779 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0780 send_oob_with_reply(const mpi_process_group& pg, 
0781                     mpi_process_group::process_id_type dest,
0782                     int tag, const SendT& send_value, ReplyT& reply_value,
0783                     int block = -1);
0784 
0785 template<typename SendT, typename ReplyT>
0786 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0787 send_oob_with_reply(const mpi_process_group& pg, 
0788                     mpi_process_group::process_id_type dest,
0789                     int tag, const SendT& send_value, ReplyT& reply_value,
0790                     int block = -1);
0791 
0792 } } } // end namespace boost::graph::distributed
0793 
0794 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
0795 namespace boost { namespace mpi {
0796     template<>
0797     struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
0798 } } // end namespace boost::mpi
0799 
0800 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
0801 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
0802 
0803 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
0804 
0805 #endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP