Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:37:19

0001 // Copyright (C) 2004-2006 The Trustees of Indiana University.
0002 
0003 // Use, modification and distribution is subject to the Boost Software
0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0005 // http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 //  Authors: Douglas Gregor
0008 //           Andrew Lumsdaine
0009 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
0010 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
0011 
0012 #ifndef BOOST_GRAPH_USE_MPI
0013 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0014 #endif
0015 
0016 #include <boost/graph/parallel/process_group.hpp>
0017 #include <boost/optional.hpp>
0018 #include <boost/shared_ptr.hpp>
0019 #include <vector>
0020 
0021 namespace boost { namespace graph { namespace distributed {
0022 
0023 /// A unary predicate that always returns "true".
0024 struct always_push
0025 {
0026   template<typename T> bool operator()(const T&) const { return true; }
0027 };
0028 
0029 
0030 
0031 /** A distributed queue adaptor.
0032  *
0033  * Class template @c distributed_queue implements a distributed queue
0034  * across a process group. The distributed queue is an adaptor over an
0035  * existing (local) queue, which must model the @ref Buffer
0036  * concept. Each process stores a distinct copy of the local queue,
0037  * from which it draws or removes elements via the @ref pop and @ref
0038  * top members.
0039  *
0040  * The value type of the local queue must be a model of the @ref
0041  * GlobalDescriptor concept. The @ref push operation of the
0042  * distributed queue passes (via a message) the value to its owning
0043  * processor. Thus, the elements within a particular local queue are
0044  * guaranteed to have the process owning that local queue as an owner.
0045  *
0046  * Synchronization of distributed queues occurs in the @ref empty and
0047  * @ref size functions, which will only return "empty" values (true or
0048  * 0, respectively) when the entire distributed queue is empty. If the
0049  * local queue is empty but the distributed queue is not, the
0050  * operation will block until either condition changes. When the @ref
0051  * size function of a nonempty queue returns, it returns the size of
0052  * the local queue. These semantics were selected so that sequential
0053  * code that processes elements in the queue via the following idiom
0054  * can be parallelized via introduction of a distributed queue:
0055  *
0056  *   distributed_queue<...> Q;
0057  *   Q.push(x);
0058  *   while (!Q.empty()) {
0059  *     // do something, that may push a value onto Q
0060  *   }
0061  *
0062  * In the parallel version, the initial @ref push operation will place
0063  * the value @c x onto its owner's queue. All processes will
0064  * synchronize at the call to empty, and only the process owning @c x
0065  * will be allowed to execute the loop (@ref Q.empty() returns
0066  * false). This iteration may in turn push values onto other remote
0067  * queues, so when that process finishes execution of the loop body
0068  * and all processes synchronize again in @ref empty, more processes
0069  * may have nonempty local queues to execute. Once all local queues
0070  * are empty, @ref Q.empty() returns @c false for all processes.
0071  *
0072  * The distributed queue can receive messages at two different times:
0073  * during synchronization and when polling @ref empty. Messages are
0074  * always received during synchronization, to ensure that accurate
0075  * local queue sizes can be determines. However, whether @ref empty
0076  * should poll for messages is specified as an option to the
0077  * constructor. Polling may be desired when the order in which
0078  * elements in the queue are processed is not important, because it
0079  * permits fewer synchronization steps and less communication
0080  * overhead. However, when more strict ordering guarantees are
0081  * required, polling may be semantically incorrect. By disabling
0082  * polling, one ensures that parallel execution using the idiom above
0083  * will not process an element at a later "level" before an earlier
0084  * "level".
0085  *
0086  * The distributed queue nearly models the @ref Buffer
0087  * concept. However, the @ref push routine does not necessarily
0088  * increase the result of @c size() by one (although the size of the
0089  * global queue does increase by one).
0090  */
0091 template<typename ProcessGroup, typename OwnerMap, typename Buffer,
0092          typename UnaryPredicate = always_push>
0093 class distributed_queue
0094 {
0095   typedef distributed_queue self_type;
0096 
0097   enum {
0098     /** Message indicating a remote push. The message contains a
0099      * single value x of type value_type that is to be pushed on the
0100      * receiver's queue.
0101      */
0102     msg_push,
0103     /** Push many elements at once. */
0104     msg_multipush
0105   };
0106 
0107  public:
0108   typedef ProcessGroup                     process_group_type;
0109   typedef Buffer                           buffer_type;
0110   typedef typename buffer_type::value_type value_type;
0111   typedef typename buffer_type::size_type  size_type;
0112 
0113   /** Construct a new distributed queue.
0114    *
0115    * Build a new distributed queue that communicates over the given @p
0116    * process_group, whose local queue is initialized via @p buffer and
0117    * which may or may not poll for messages.
0118    */
0119   explicit
0120   distributed_queue(const ProcessGroup& process_group,
0121                     const OwnerMap& owner,
0122                     const Buffer& buffer,
0123                     bool polling = false);
0124 
0125   /** Construct a new distributed queue.
0126    *
0127    * Build a new distributed queue that communicates over the given @p
0128    * process_group, whose local queue is initialized via @p buffer and
0129    * which may or may not poll for messages.
0130    */
0131   explicit
0132   distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
0133                     const OwnerMap& owner = OwnerMap(),
0134                     const Buffer& buffer = Buffer(),
0135                     const UnaryPredicate& pred = UnaryPredicate(),
0136                     bool polling = false);
0137 
0138   /** Construct a new distributed queue.
0139    *
0140    * Build a new distributed queue that communicates over the given @p
0141    * process_group, whose local queue is default-initalized and which
0142    * may or may not poll for messages.
0143    */
0144   distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0145                     const UnaryPredicate& pred, bool polling = false);
0146 
0147   /** Virtual destructor required with virtual functions.
0148    *
0149    */
0150   virtual ~distributed_queue() {}
0151 
0152   /** Push an element onto the distributed queue.
0153    *
0154    * The element will be sent to its owner process to be added to that
0155    * process's local queue. If polling is enabled for this queue and
0156    * the owner process is the current process, the value will be
0157    * immediately pushed onto the local queue.
0158    *
0159    * Complexity: O(1) messages of size O(sizeof(value_type)) will be
0160    * transmitted.
0161    */
0162   void push(const value_type& x);
0163 
0164   /** Pop an element off the local queue.
0165    *
0166    * @p @c !empty()
0167    */
0168   void pop() { buffer.pop(); }
0169 
0170   /**
0171    * Return the element at the top of the local queue.
0172    *
0173    * @p @c !empty()
0174    */
0175   value_type& top() { return buffer.top(); }
0176 
0177   /**
0178    * \overload
0179    */
0180   const value_type& top() const { return buffer.top(); }
0181 
0182   /** Determine if the queue is empty.
0183    *
0184    * When the local queue is nonempty, returns @c true. If the local
0185    * queue is empty, synchronizes with all other processes in the
0186    * process group until either (1) the local queue is nonempty
0187    * (returns @c true) (2) the entire distributed queue is empty
0188    * (returns @c false).
0189    */
0190   bool empty() const;
0191 
0192   /** Determine the size of the local queue.
0193    *
0194    * The behavior of this routine is equivalent to the behavior of
0195    * @ref empty, except that when @ref empty returns true this
0196    * function returns the size of the local queue and when @ref empty
0197    * returns false this function returns zero.
0198    */
0199   size_type size() const;
0200 
0201   // private:
0202   /** Synchronize the distributed queue and determine if all queues
0203    * are empty.
0204    *
0205    * \returns \c true when all local queues are empty, or false if at least
0206    * one of the local queues is nonempty.
0207    * Defined as virtual for derived classes like depth_limited_distributed_queue.
0208    */
0209   virtual bool do_synchronize() const;
0210 
0211  private:
0212   // Setup triggers
0213   void setup_triggers();
0214 
0215   // Message handlers
0216   void 
0217   handle_push(int source, int tag, const value_type& value, 
0218               trigger_receive_context);
0219 
0220   void 
0221   handle_multipush(int source, int tag, const std::vector<value_type>& values, 
0222                    trigger_receive_context);
0223 
0224   mutable ProcessGroup process_group;
0225   OwnerMap owner;
0226   mutable Buffer buffer;
0227   UnaryPredicate pred;
0228   bool polling;
0229 
0230   typedef std::vector<value_type> outgoing_buffer_t;
0231   typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
0232   shared_ptr<outgoing_buffers_t> outgoing_buffers;
0233 };
0234 
0235 /// Helper macro containing the normal names for the template
0236 /// parameters to distributed_queue.
0237 #define BOOST_DISTRIBUTED_QUEUE_PARMS                           \
0238   typename ProcessGroup, typename OwnerMap, typename Buffer,    \
0239   typename UnaryPredicate
0240 
0241 /// Helper macro containing the normal template-id for
0242 /// distributed_queue.
0243 #define BOOST_DISTRIBUTED_QUEUE_TYPE                                    \
0244   distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
0245 
0246 /** Synchronize all processes involved with the given distributed queue.
0247  *
0248  * This function will synchronize all of the local queues for a given
0249  * distributed queue, by ensuring that no additional messages are in
0250  * transit. It is rarely required by the user, because most
0251  * synchronization of distributed queues occurs via the @c empty or @c
0252  * size methods.
0253  */
0254 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0255 inline void
0256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
0257 { Q.do_synchronize(); }
0258 
0259 /// Construct a new distributed queue.
0260 template<typename ProcessGroup, typename OwnerMap, typename Buffer>
0261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
0262 make_distributed_queue(const ProcessGroup& process_group,
0263                        const OwnerMap& owner,
0264                        const Buffer& buffer,
0265                        bool polling = false)
0266 {
0267   typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
0268   return result_type(process_group, owner, buffer, polling);
0269 }
0270 
0271 } } } // end namespace boost::graph::distributed
0272 
0273 #include <boost/graph/distributed/detail/queue.ipp>
0274 
0275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE
0276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS
0277 
0278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP