|
||||
File indexing completed on 2025-01-18 09:37:19
0001 // Copyright (C) 2004-2006 The Trustees of Indiana University. 0002 0003 // Use, modification and distribution is subject to the Boost Software 0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at 0005 // http://www.boost.org/LICENSE_1_0.txt) 0006 0007 // Authors: Douglas Gregor 0008 // Andrew Lumsdaine 0009 #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP 0010 #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP 0011 0012 #ifndef BOOST_GRAPH_USE_MPI 0013 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" 0014 #endif 0015 0016 #include <boost/graph/parallel/process_group.hpp> 0017 #include <boost/optional.hpp> 0018 #include <boost/shared_ptr.hpp> 0019 #include <vector> 0020 0021 namespace boost { namespace graph { namespace distributed { 0022 0023 /// A unary predicate that always returns "true". 0024 struct always_push 0025 { 0026 template<typename T> bool operator()(const T&) const { return true; } 0027 }; 0028 0029 0030 0031 /** A distributed queue adaptor. 0032 * 0033 * Class template @c distributed_queue implements a distributed queue 0034 * across a process group. The distributed queue is an adaptor over an 0035 * existing (local) queue, which must model the @ref Buffer 0036 * concept. Each process stores a distinct copy of the local queue, 0037 * from which it draws or removes elements via the @ref pop and @ref 0038 * top members. 0039 * 0040 * The value type of the local queue must be a model of the @ref 0041 * GlobalDescriptor concept. The @ref push operation of the 0042 * distributed queue passes (via a message) the value to its owning 0043 * processor. Thus, the elements within a particular local queue are 0044 * guaranteed to have the process owning that local queue as an owner. 0045 * 0046 * Synchronization of distributed queues occurs in the @ref empty and 0047 * @ref size functions, which will only return "empty" values (true or 0048 * 0, respectively) when the entire distributed queue is empty. If the 0049 * local queue is empty but the distributed queue is not, the 0050 * operation will block until either condition changes. When the @ref 0051 * size function of a nonempty queue returns, it returns the size of 0052 * the local queue. These semantics were selected so that sequential 0053 * code that processes elements in the queue via the following idiom 0054 * can be parallelized via introduction of a distributed queue: 0055 * 0056 * distributed_queue<...> Q; 0057 * Q.push(x); 0058 * while (!Q.empty()) { 0059 * // do something, that may push a value onto Q 0060 * } 0061 * 0062 * In the parallel version, the initial @ref push operation will place 0063 * the value @c x onto its owner's queue. All processes will 0064 * synchronize at the call to empty, and only the process owning @c x 0065 * will be allowed to execute the loop (@ref Q.empty() returns 0066 * false). This iteration may in turn push values onto other remote 0067 * queues, so when that process finishes execution of the loop body 0068 * and all processes synchronize again in @ref empty, more processes 0069 * may have nonempty local queues to execute. Once all local queues 0070 * are empty, @ref Q.empty() returns @c false for all processes. 0071 * 0072 * The distributed queue can receive messages at two different times: 0073 * during synchronization and when polling @ref empty. Messages are 0074 * always received during synchronization, to ensure that accurate 0075 * local queue sizes can be determines. However, whether @ref empty 0076 * should poll for messages is specified as an option to the 0077 * constructor. Polling may be desired when the order in which 0078 * elements in the queue are processed is not important, because it 0079 * permits fewer synchronization steps and less communication 0080 * overhead. However, when more strict ordering guarantees are 0081 * required, polling may be semantically incorrect. By disabling 0082 * polling, one ensures that parallel execution using the idiom above 0083 * will not process an element at a later "level" before an earlier 0084 * "level". 0085 * 0086 * The distributed queue nearly models the @ref Buffer 0087 * concept. However, the @ref push routine does not necessarily 0088 * increase the result of @c size() by one (although the size of the 0089 * global queue does increase by one). 0090 */ 0091 template<typename ProcessGroup, typename OwnerMap, typename Buffer, 0092 typename UnaryPredicate = always_push> 0093 class distributed_queue 0094 { 0095 typedef distributed_queue self_type; 0096 0097 enum { 0098 /** Message indicating a remote push. The message contains a 0099 * single value x of type value_type that is to be pushed on the 0100 * receiver's queue. 0101 */ 0102 msg_push, 0103 /** Push many elements at once. */ 0104 msg_multipush 0105 }; 0106 0107 public: 0108 typedef ProcessGroup process_group_type; 0109 typedef Buffer buffer_type; 0110 typedef typename buffer_type::value_type value_type; 0111 typedef typename buffer_type::size_type size_type; 0112 0113 /** Construct a new distributed queue. 0114 * 0115 * Build a new distributed queue that communicates over the given @p 0116 * process_group, whose local queue is initialized via @p buffer and 0117 * which may or may not poll for messages. 0118 */ 0119 explicit 0120 distributed_queue(const ProcessGroup& process_group, 0121 const OwnerMap& owner, 0122 const Buffer& buffer, 0123 bool polling = false); 0124 0125 /** Construct a new distributed queue. 0126 * 0127 * Build a new distributed queue that communicates over the given @p 0128 * process_group, whose local queue is initialized via @p buffer and 0129 * which may or may not poll for messages. 0130 */ 0131 explicit 0132 distributed_queue(const ProcessGroup& process_group = ProcessGroup(), 0133 const OwnerMap& owner = OwnerMap(), 0134 const Buffer& buffer = Buffer(), 0135 const UnaryPredicate& pred = UnaryPredicate(), 0136 bool polling = false); 0137 0138 /** Construct a new distributed queue. 0139 * 0140 * Build a new distributed queue that communicates over the given @p 0141 * process_group, whose local queue is default-initalized and which 0142 * may or may not poll for messages. 0143 */ 0144 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, 0145 const UnaryPredicate& pred, bool polling = false); 0146 0147 /** Virtual destructor required with virtual functions. 0148 * 0149 */ 0150 virtual ~distributed_queue() {} 0151 0152 /** Push an element onto the distributed queue. 0153 * 0154 * The element will be sent to its owner process to be added to that 0155 * process's local queue. If polling is enabled for this queue and 0156 * the owner process is the current process, the value will be 0157 * immediately pushed onto the local queue. 0158 * 0159 * Complexity: O(1) messages of size O(sizeof(value_type)) will be 0160 * transmitted. 0161 */ 0162 void push(const value_type& x); 0163 0164 /** Pop an element off the local queue. 0165 * 0166 * @p @c !empty() 0167 */ 0168 void pop() { buffer.pop(); } 0169 0170 /** 0171 * Return the element at the top of the local queue. 0172 * 0173 * @p @c !empty() 0174 */ 0175 value_type& top() { return buffer.top(); } 0176 0177 /** 0178 * \overload 0179 */ 0180 const value_type& top() const { return buffer.top(); } 0181 0182 /** Determine if the queue is empty. 0183 * 0184 * When the local queue is nonempty, returns @c true. If the local 0185 * queue is empty, synchronizes with all other processes in the 0186 * process group until either (1) the local queue is nonempty 0187 * (returns @c true) (2) the entire distributed queue is empty 0188 * (returns @c false). 0189 */ 0190 bool empty() const; 0191 0192 /** Determine the size of the local queue. 0193 * 0194 * The behavior of this routine is equivalent to the behavior of 0195 * @ref empty, except that when @ref empty returns true this 0196 * function returns the size of the local queue and when @ref empty 0197 * returns false this function returns zero. 0198 */ 0199 size_type size() const; 0200 0201 // private: 0202 /** Synchronize the distributed queue and determine if all queues 0203 * are empty. 0204 * 0205 * \returns \c true when all local queues are empty, or false if at least 0206 * one of the local queues is nonempty. 0207 * Defined as virtual for derived classes like depth_limited_distributed_queue. 0208 */ 0209 virtual bool do_synchronize() const; 0210 0211 private: 0212 // Setup triggers 0213 void setup_triggers(); 0214 0215 // Message handlers 0216 void 0217 handle_push(int source, int tag, const value_type& value, 0218 trigger_receive_context); 0219 0220 void 0221 handle_multipush(int source, int tag, const std::vector<value_type>& values, 0222 trigger_receive_context); 0223 0224 mutable ProcessGroup process_group; 0225 OwnerMap owner; 0226 mutable Buffer buffer; 0227 UnaryPredicate pred; 0228 bool polling; 0229 0230 typedef std::vector<value_type> outgoing_buffer_t; 0231 typedef std::vector<outgoing_buffer_t> outgoing_buffers_t; 0232 shared_ptr<outgoing_buffers_t> outgoing_buffers; 0233 }; 0234 0235 /// Helper macro containing the normal names for the template 0236 /// parameters to distributed_queue. 0237 #define BOOST_DISTRIBUTED_QUEUE_PARMS \ 0238 typename ProcessGroup, typename OwnerMap, typename Buffer, \ 0239 typename UnaryPredicate 0240 0241 /// Helper macro containing the normal template-id for 0242 /// distributed_queue. 0243 #define BOOST_DISTRIBUTED_QUEUE_TYPE \ 0244 distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate> 0245 0246 /** Synchronize all processes involved with the given distributed queue. 0247 * 0248 * This function will synchronize all of the local queues for a given 0249 * distributed queue, by ensuring that no additional messages are in 0250 * transit. It is rarely required by the user, because most 0251 * synchronization of distributed queues occurs via the @c empty or @c 0252 * size methods. 0253 */ 0254 template<BOOST_DISTRIBUTED_QUEUE_PARMS> 0255 inline void 0256 synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q) 0257 { Q.do_synchronize(); } 0258 0259 /// Construct a new distributed queue. 0260 template<typename ProcessGroup, typename OwnerMap, typename Buffer> 0261 inline distributed_queue<ProcessGroup, OwnerMap, Buffer> 0262 make_distributed_queue(const ProcessGroup& process_group, 0263 const OwnerMap& owner, 0264 const Buffer& buffer, 0265 bool polling = false) 0266 { 0267 typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type; 0268 return result_type(process_group, owner, buffer, polling); 0269 } 0270 0271 } } } // end namespace boost::graph::distributed 0272 0273 #include <boost/graph/distributed/detail/queue.ipp> 0274 0275 #undef BOOST_DISTRIBUTED_QUEUE_TYPE 0276 #undef BOOST_DISTRIBUTED_QUEUE_PARMS 0277 0278 #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |