File indexing completed on 2025-01-18 09:37:13
0001
0002
0003
0004
0005
0006
0007
0008
0009 #include <boost/optional.hpp>
0010 #include <cassert>
0011 #include <boost/graph/parallel/algorithm.hpp>
0012 #include <boost/graph/parallel/process_group.hpp>
0013 #include <functional>
0014 #include <algorithm>
0015 #include <boost/graph/parallel/simple_trigger.hpp>
0016
0017 #ifndef BOOST_GRAPH_USE_MPI
0018 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0019 #endif
0020
0021 namespace boost { namespace graph { namespace distributed {
0022
0023 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0024 BOOST_DISTRIBUTED_QUEUE_TYPE::
0025 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0026 const Buffer& buffer, bool polling)
0027 : process_group(process_group, attach_distributed_object()),
0028 owner(owner),
0029 buffer(buffer),
0030 polling(polling)
0031 {
0032 if (!polling)
0033 outgoing_buffers.reset(
0034 new outgoing_buffers_t(num_processes(process_group)));
0035
0036 setup_triggers();
0037 }
0038
0039 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0040 BOOST_DISTRIBUTED_QUEUE_TYPE::
0041 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0042 const Buffer& buffer, const UnaryPredicate& pred,
0043 bool polling)
0044 : process_group(process_group, attach_distributed_object()),
0045 owner(owner),
0046 buffer(buffer),
0047 pred(pred),
0048 polling(polling)
0049 {
0050 if (!polling)
0051 outgoing_buffers.reset(
0052 new outgoing_buffers_t(num_processes(process_group)));
0053
0054 setup_triggers();
0055 }
0056
0057 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0058 BOOST_DISTRIBUTED_QUEUE_TYPE::
0059 distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
0060 const UnaryPredicate& pred, bool polling)
0061 : process_group(process_group, attach_distributed_object()),
0062 owner(owner),
0063 pred(pred),
0064 polling(polling)
0065 {
0066 if (!polling)
0067 outgoing_buffers.reset(
0068 new outgoing_buffers_t(num_processes(process_group)));
0069
0070 setup_triggers();
0071 }
0072
0073 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0074 void
0075 BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
0076 {
0077 typename ProcessGroup::process_id_type dest = get(owner, x);
0078 if (outgoing_buffers)
0079 outgoing_buffers->at(dest).push_back(x);
0080 else if (dest == process_id(process_group))
0081 buffer.push(x);
0082 else
0083 send(process_group, get(owner, x), msg_push, x);
0084 }
0085
0086 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0087 bool
0088 BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
0089 {
0090
0091
0092
0093
0094 while (buffer.empty() && !do_synchronize()) ;
0095
0096 return buffer.empty();
0097 }
0098
0099 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0100 typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
0101 BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
0102 {
0103 empty();
0104 return buffer.size();
0105 }
0106
0107 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0108 void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
0109 {
0110 using boost::graph::parallel::simple_trigger;
0111
0112 simple_trigger(process_group, msg_push, this,
0113 &distributed_queue::handle_push);
0114 simple_trigger(process_group, msg_multipush, this,
0115 &distributed_queue::handle_multipush);
0116 }
0117
0118 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0119 void
0120 BOOST_DISTRIBUTED_QUEUE_TYPE::
0121 handle_push(int , int , const value_type& value,
0122 trigger_receive_context)
0123 {
0124 if (pred(value)) buffer.push(value);
0125 }
0126
0127 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0128 void
0129 BOOST_DISTRIBUTED_QUEUE_TYPE::
0130 handle_multipush(int , int ,
0131 const std::vector<value_type>& values,
0132 trigger_receive_context)
0133 {
0134 for (std::size_t i = 0; i < values.size(); ++i)
0135 if (pred(values[i])) buffer.push(values[i]);
0136 }
0137
0138 template<BOOST_DISTRIBUTED_QUEUE_PARMS>
0139 bool
0140 BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
0141 {
0142 #ifdef PBGL_ACCOUNTING
0143 ++num_synchronizations;
0144 #endif
0145
0146 using boost::parallel::all_reduce;
0147 using std::swap;
0148
0149 typedef typename ProcessGroup::process_id_type process_id_type;
0150
0151 if (outgoing_buffers) {
0152
0153 process_id_type id = process_id(process_group);
0154 process_id_type np = num_processes(process_group);
0155 for (process_id_type dest = 0; dest < np; ++dest) {
0156 outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
0157 std::size_t size = outgoing.size();
0158 if (size != 0) {
0159 if (dest != id) {
0160 send(process_group, dest, msg_multipush, outgoing);
0161 } else {
0162 for (std::size_t i = 0; i < size; ++i)
0163 buffer.push(outgoing[i]);
0164 }
0165 outgoing.clear();
0166 }
0167 }
0168 }
0169 synchronize(process_group);
0170
0171 unsigned local_size = buffer.size();
0172 unsigned global_size =
0173 all_reduce(process_group, local_size, std::plus<unsigned>());
0174 return global_size == 0;
0175 }
0176
0177 } } }