Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:37:13

0001 // Copyright (C) 2004-2006 The Trustees of Indiana University.
0002 
0003 // Use, modification and distribution is subject to the Boost Software
0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0005 // http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 //  Authors: Douglas Gregor
0008 //           Andrew Lumsdaine
0009 #include <boost/optional.hpp>
0010 #include <cassert>
0011 #include <boost/graph/parallel/algorithm.hpp>
0012 #include <boost/graph/parallel/process_group.hpp>
0013 #include <functional>
0014 #include <algorithm>
0015 #include <boost/graph/parallel/simple_trigger.hpp>
0016 
0017 #ifndef BOOST_GRAPH_USE_MPI
0018 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0019 #endif
0020 
0021 namespace boost { namespace graph { namespace distributed {
0022 
0023 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0024 BOOST_DISTRIBUTED_QUEUE_TYPE::
0025 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0026                   const Buffer& buffer, bool polling)
0027   : process_group(process_group, attach_distributed_object()),
0028     owner(owner),
0029     buffer(buffer),
0030     polling(polling)
0031 {
0032   if (!polling)
0033     outgoing_buffers.reset(
0034       new outgoing_buffers_t(num_processes(process_group)));
0035 
0036   setup_triggers();
0037 }
0038 
0039 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0040 BOOST_DISTRIBUTED_QUEUE_TYPE::
0041 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0042                   const Buffer& buffer, const UnaryPredicate& pred,
0043                   bool polling)
0044   : process_group(process_group, attach_distributed_object()),
0045     owner(owner),
0046     buffer(buffer),
0047     pred(pred),
0048     polling(polling)
0049 {
0050   if (!polling)
0051     outgoing_buffers.reset(
0052       new outgoing_buffers_t(num_processes(process_group)));
0053 
0054   setup_triggers();
0055 }
0056 
0057 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0058 BOOST_DISTRIBUTED_QUEUE_TYPE::
0059 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0060                   const UnaryPredicate& pred, bool polling)
0061   : process_group(process_group, attach_distributed_object()),
0062     owner(owner),
0063     pred(pred),
0064     polling(polling)
0065 {
0066   if (!polling)
0067     outgoing_buffers.reset(
0068       new outgoing_buffers_t(num_processes(process_group)));
0069 
0070   setup_triggers();
0071 }
0072 
0073 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0074 void
0075 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
0076 {
0077   typename ProcessGroup::process_id_type dest = get(owner, x);
0078   if (outgoing_buffers)
0079     outgoing_buffers->at(dest).push_back(x);
0080   else if (dest == process_id(process_group))
0081     buffer.push(x);
0082   else
0083     send(process_group, get(owner, x), msg_push, x);
0084 }
0085 
0086 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0087 bool
0088 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
0089 {
0090   /* Processes will stay here until the buffer is nonempty or
0091      synchronization with the other processes indicates that all local
0092      buffers are empty (and no messages are in transit).
0093    */
0094   while (buffer.empty() && !do_synchronize()) ;
0095 
0096   return buffer.empty();
0097 }
0098 
0099 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
0101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
0102 {
0103   empty();
0104   return buffer.size();
0105 }
0106 
0107 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
0109 {
0110   using boost::graph::parallel::simple_trigger;
0111 
0112   simple_trigger(process_group, msg_push, this, 
0113                  &distributed_queue::handle_push);
0114   simple_trigger(process_group, msg_multipush, this, 
0115                  &distributed_queue::handle_multipush);
0116 }
0117 
0118 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0119 void 
0120 BOOST_DISTRIBUTED_QUEUE_TYPE::
0121 handle_push(int /*source*/, int /*tag*/, const value_type& value, 
0122             trigger_receive_context)
0123 {
0124   if (pred(value)) buffer.push(value);
0125 }
0126 
0127 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0128 void 
0129 BOOST_DISTRIBUTED_QUEUE_TYPE::
0130 handle_multipush(int /*source*/, int /*tag*/, 
0131                  const std::vector<value_type>& values, 
0132                  trigger_receive_context)
0133 {
0134   for (std::size_t i = 0; i < values.size(); ++i)
0135     if (pred(values[i])) buffer.push(values[i]);
0136 }
0137 
0138 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0139 bool
0140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
0141 {
0142 #ifdef PBGL_ACCOUNTING
0143   ++num_synchronizations;
0144 #endif
0145 
0146   using boost::parallel::all_reduce;
0147   using std::swap;
0148 
0149   typedef typename ProcessGroup::process_id_type process_id_type;
0150 
0151   if (outgoing_buffers) {
0152     // Transfer all of the push requests
0153     process_id_type id = process_id(process_group);
0154     process_id_type np = num_processes(process_group);
0155     for (process_id_type dest = 0; dest < np; ++dest) {
0156       outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
0157       std::size_t size = outgoing.size();
0158       if (size != 0) {
0159         if (dest != id) {
0160           send(process_group, dest, msg_multipush, outgoing);
0161         } else {
0162           for (std::size_t i = 0; i < size; ++i)
0163             buffer.push(outgoing[i]);
0164         }
0165         outgoing.clear();
0166       }
0167     }
0168   }
0169   synchronize(process_group);
0170 
0171   unsigned local_size = buffer.size();
0172   unsigned global_size =
0173     all_reduce(process_group, local_size, std::plus<unsigned>());
0174   return global_size == 0;
0175 }
0176 
0177 } } } // end namespace boost::graph::distributed