File indexing completed on 2025-01-18 09:37:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
0013 #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
0014
0015 #ifndef BOOST_GRAPH_USE_MPI
0016 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
0017 #endif
0018
0019 // When defined, send_oob() compiles in its MPI_Bsend branch, which is taken whenever message_buffer_size() is nonzero.
0020 #define SEND_OOB_BSEND
0021
0022 #include <boost/optional.hpp>
0023 #include <boost/shared_ptr.hpp>
0024 #include <boost/weak_ptr.hpp>
0025 #include <utility>
0026 #include <memory>
0027 #include <boost/function/function1.hpp>
0028 #include <boost/function/function2.hpp>
0029 #include <boost/function/function0.hpp>
0030 #include <boost/mpi.hpp>
0031 #include <boost/property_map/parallel/process_group.hpp>
0032 #include <boost/serialization/vector.hpp>
0033 #include <boost/utility/enable_if.hpp>
0034
0035 namespace boost { namespace graph { namespace distributed {
0036
0037 // Tag type for MPI-backed process groups; derives virtually from boost::parallel::linear_process_group_tag.
0038 struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
0039
0040 class mpi_process_group
0041 {
0042 struct impl;
0043
0044 public:
0045 // Radix used by encode_tag()/decode_tag() to fold a block number and a tag into one int.
0046 static const int max_tags = 256;
0047
0048
0049
0050 // Callback type taking two ints, named (source, tag) in its signature.
0051 // It is the handler type accepted by replace_handler() and stored in
0052 // block_type::on_receive.
0053 // NOTE(review): when the callback fires is decided by code not shown in this listing.
0054 typedef function<void(int source, int tag)> receiver_type;
0055
0056
0057 // Nullary callback type accepted by replace_on_synchronize_handler() and
0058 // stored in block_type::on_synchronize.
0059 // NOTE(review): presumably run via emit_on_synchronize() -- confirm in the implementation.
0060 typedef function0<void> on_synchronize_event_type;
0061
0062 // Empty tag type that selects the mpi_process_group(create_empty) constructor.
0063 struct create_empty {};
0064
0065 // Buffer type taken from boost::mpi::packed_oprimitive; used by outgoing_messages::buffer.
0066 typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
0067
0068 // Type used for process identifiers (return type of process_id()).
0069 typedef int process_id_type;
0070
0071 // Type used for process counts (return type of num_processes()).
0072 typedef int process_size_type;
0073
0074 // Communicator type: an alias for boost::mpi::communicator.
0075 typedef boost::mpi::communicator communicator_type;
0076
0077 // Category tag combining boost::parallel::bsp_process_group_tag and mpi_process_group_tag through virtual inheritance.
0078 struct communication_category
0079 : virtual boost::parallel::bsp_process_group_tag,
0080 virtual mpi_process_group_tag { };
0081
0082
0083 // Header record for one message; outgoing_messages keeps a vector of these next to a buffer. Registered as bitwise serializable and as an MPI datatype near the end of this file.
0084 struct message_header {
0085 // Process id field. NOTE(review): presumably the sending process -- confirm.
0086 process_id_type source;
0087
0088 // Tag field. NOTE(review): whether this is a plain or an encoded tag (see encode_tag) is not visible here -- confirm.
0089 int tag;
0090
0091 // NOTE(review): presumably the payload's starting position inside outgoing_messages::buffer -- confirm.
0092 std::size_t offset;
0093
0094 // NOTE(review): presumably the payload length in bytes -- confirm.
0095 std::size_t bytes;
0096 // Archives the four fields in declaration order; the int argument is unused.
0097 template <class Archive>
0098 void serialize(Archive& ar, int)
0099 {
0100 ar & source & tag & offset & bytes;
0101 }
0102 };
0103
0104
0105
0106
0107
0108 // Batch of messages: a vector of message_header records plus one buffer_type buffer.
0109 // NOTE(review): presumably each header's offset/bytes locate its payload inside
0110 // `buffer` -- confirm against the implementation.
0111 struct outgoing_messages {
0112 outgoing_messages() {}
0113 ~outgoing_messages() {}
0114
0115 std::vector<message_header> headers;
0116 buffer_type buffer;
0117 // Archives `headers` first, then `buffer`; the int argument is unused.
0118 template <class Archive>
0119 void serialize(Archive& ar, int)
0120 {
0121 ar & headers;
0122 ar & buffer;
0123 }
0124 void swap(outgoing_messages& x)
0125 { // Member-wise exchange with x; the two swaps do not depend on each other.
0126 buffer.swap(x.buffer);
0127 headers.swap(x.headers);
0128 }
0129 };
0130
0131 private:
0132
0133 // Abstract, non-copyable base class for per-tag trigger objects.
0134 // It stores the tag passed at construction; subclasses implement receive().
0135 // Instances are held as shared_ptr<trigger_base> in block_type::triggers.
0136 class trigger_base : boost::noncopyable
0137 {
0138 public:
0139 explicit trigger_base(int tag) : tag_(tag) { }
0140
0141 // Returns the tag given to the constructor.
0142 int tag() const { return tag_; }
0143 // Virtual destructor, so launchers can be destroyed through a trigger_base pointer.
0144 virtual ~trigger_base() { }
0145
0146
0147
0148
0149
0150 // Pure virtual hook implemented by each launcher subclass.
0151 // Arguments, as named in the signature: the process group, the source
0152 // process, the tag, a trigger_receive_context value and a block number (default -1).
0153 // NOTE(review): the meaning of block == -1 is not visible in this listing -- confirm.
0154 virtual void
0155 receive(mpi_process_group const& pg, int source, int tag,
0156 trigger_receive_context context, int block=-1) const = 0;
0157
0158 protected:
0159 // Tag value supplied at construction.
0160 int tag_;
0161 };
0162
0163
0164
0165 // trigger_base subclass parameterized on Type and Handler. It keeps a reference
0166 // to an mpi_process_group and a copy of the handler; receive() is only declared here.
0167 // NOTE(review): presumably Type is the payload type passed to the handler -- confirm.
0168 template<typename Type, typename Handler>
0169 class trigger_launcher : public trigger_base
0170 {
0171 public:
0172 explicit trigger_launcher(mpi_process_group& self, int tag,
0173 const Handler& handler)
0174 : trigger_base(tag), self(self), handler(handler)
0175 {}
0176
0177 void
0178 receive(mpi_process_group const& pg, int source, int tag,
0179 trigger_receive_context context, int block=-1) const;
0180
0181 private:
0182 mpi_process_group& self;
0183 mutable Handler handler; // mutable: presumably so the const receive() may call a non-const handler
0184 };
0185
0186
0187
0188 // trigger_base subclass with the same members and constructor shape as trigger_launcher.
0189 // NOTE(review): presumably its receive() sends the handler's result back to the
0190 // requesting process (cf. trigger_with_reply / send_oob_with_reply) -- confirm.
0191 template<typename Type, typename Handler>
0192 class reply_trigger_launcher : public trigger_base
0193 {
0194 public:
0195 explicit reply_trigger_launcher(mpi_process_group& self, int tag,
0196 const Handler& handler)
0197 : trigger_base(tag), self(self), handler(handler)
0198 {}
0199
0200 void
0201 receive(mpi_process_group const& pg, int source, int tag,
0202 trigger_receive_context context, int block=-1) const;
0203
0204 private:
0205 mpi_process_group& self;
0206 mutable Handler handler; // mutable: presumably so the const receive() may call a non-const handler
0207 };
0208 // trigger_base subclass that stores only the handler; the constructor's `self` argument is accepted but not used.
0209 template<typename Type, typename Handler>
0210 class global_trigger_launcher : public trigger_base
0211 {
0212 public:
0213 explicit global_trigger_launcher(mpi_process_group& self, int tag,
0214 const Handler& handler)
0215 : trigger_base(tag), handler(handler)
0216 {
0217 }
0218
0219 void
0220 receive(mpi_process_group const& pg, int source, int tag,
0221 trigger_receive_context context, int block=-1) const;
0222
0223 private:
0224 mutable Handler handler; // mutable: presumably so the const receive() may call a non-const handler
0225
0226
0227 };
0228 // trigger_base subclass that stores a handler and a buffer size and calls prepare_receive() from its constructor.
0229 template<typename Type, typename Handler>
0230 class global_irecv_trigger_launcher : public trigger_base
0231 {
0232 public:
0233 explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
0234 const Handler& handler, int sz)
0235 : trigger_base(tag), handler(handler), buffer_size(sz)
0236 {
0237 prepare_receive(self,tag);
0238 }
0239 // Declared here only; const, like the receive() of the other launchers.
0240 void
0241 receive(mpi_process_group const& pg, int source, int tag,
0242 trigger_receive_context context, int block=-1) const;
0243
0244 private:
0245 void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
0246 mutable Handler handler; // mutable, matching the other launchers, so the const receive() may call a non-const handler
0247 int buffer_size;
0248 // NOTE(review): buffer_size is an int here, while global_trigger() takes a std::size_t -- confirm the narrowing is intended.
0249
0250 };
0251
0252 public:
0253
0254
0255
0256 // Constructs a process group from a communicator; the argument defaults to a
0257 // default-constructed communicator_type (boost::mpi::communicator).
0258 mpi_process_group(communicator_type parent_comm = communicator_type());
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268 // Overload that additionally takes two sizes ahead of the communicator.
0269 // NOTE(review): num_headers and buffer_size presumably bound how much is batched
0270 // per destination (see outgoing_messages) -- confirm against the implementation.
0271 mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
0272 communicator_type parent_comm = communicator_type());
0273
0274
0275
0276
0277
0278
0279
0280
0281
0282
0283
0284
0285
0286
0287
0288 // Constructs a group from an existing one plus a receiver_type handler;
0289 // out_of_band_receive defaults to false.
0290 // NOTE(review): presumably shares `other`'s underlying state and allocates a new
0291 // block for the handler (cf. allocate_block) -- confirm in the implementation.
0292 mpi_process_group(const mpi_process_group& other,
0293 const receiver_type& handler,
0294 bool out_of_band_receive = false);
0295
0296
0297
0298
0299 // Overload taking an attach_distributed_object value in place of a handler.
0300 // attach_distributed_object is not declared in this listing; it has to come
0301 // from one of the included headers.
0302 mpi_process_group(const mpi_process_group& other,
0303 attach_distributed_object,
0304 bool out_of_band_receive = false);
0305
0306
0307 // Tag-selected constructor with an empty body and no member initializers:
0308 // impl_ and block_num are default-constructed, while rank and size are left uninitialized.
0309 // NOTE(review): callers must assign rank/size before process_id()/num_processes() read them.
0310 explicit mpi_process_group(create_empty) {}
0311
0312
0313
0314 // Destructor; declared here, with its definition outside this listing.
0315 ~mpi_process_group();
0316
0317
0318
0319
0320 // Takes a new receiver_type handler; out_of_band_receive defaults to false.
0321 // NOTE(review): presumably replaces the on_receive handler of this group's
0322 // block -- confirm in the implementation.
0323 void replace_handler(const receiver_type& handler,
0324 bool out_of_band_receive = false);
0325
0326
0327
0328
0329 // NOTE(review): takes nothing and returns nothing; presumably gives this group its own
0330 // block in the way the attach_distributed_object constructor does -- confirm.
0331 void make_distributed_object();
0332
0333
0334 // Takes a new on_synchronize_event_type callback; the default argument is the literal 0.
0335 // NOTE(review): presumably 0 yields an empty callback, i.e. "no handler" -- confirm.
0336 void
0337 replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
0338
0339
0340
0341 // Block number of this group: the int that block_num points to,
0342 // or 0 when block_num is null.
0343
0344 int my_block_number() const { if (block_num) return *block_num; return 0; }
0345
0346
0347 // Folds (block_num, tag) into one int: block_num * max_tags + tag.
0348 // Neither argument is range-checked.
0349
0350 int encode_tag(int block_num, int tag) const
0351 { const int block_base = block_num * max_tags; return block_base + tag; }
0352
0353 // Splits an encoded tag into (encoded_tag / max_tags, encoded_tag % max_tags),
0354 // which undoes encode_tag() when the original tag was in [0, max_tags).
0355
0356 std::pair<int, int> decode_tag(int encoded_tag) const
0357 { const int block_part = encoded_tag / max_tags; const int tag_part = encoded_tag % max_tags; return std::make_pair(block_part, tag_part); }
0358
0359
0360
0361
0362
0363
0364
0365 // Returns an int; out_of_band_receive defaults to false.
0366 // NOTE(review): presumably reserves a new block and returns its number -- confirm.
0367
0368 int allocate_block(bool out_of_band_receive = false);
0369
0370
0371 // Takes a process id and an encoded tag (see encode_tag()) and returns bool.
0372 // NOTE(review): the condition implied by "maybe" is defined in the implementation -- confirm.
0373 bool maybe_emit_receive(int process, int encoded_tag) const;
0374
0375
0376 // Same parameters and return type as maybe_emit_receive().
0377 // NOTE(review): presumably invokes the receive handler for the decoded block -- confirm.
0378 bool emit_receive(int process, int encoded_tag) const;
0379
0380 // NOTE(review): presumably invokes the blocks' on_synchronize callbacks -- confirm.
0381 void emit_on_synchronize() const;
0382
0383 // Returns a pointer to a Receiver; how the object is located is not visible in this listing.
0384 template<typename Receiver>
0385 Receiver* get_receiver();
0386 // Tag-dispatched helpers: the trailing mpl::true_/mpl::false_ parameter selects the overload. NOTE(review): presumably the tag is boost::mpi::is_mpi_datatype<T> -- confirm.
0387 template<typename T>
0388 void
0389 send_impl(int dest, int tag, const T& value,
0390 mpl::true_ ) const;
0391 // Counterpart of the overload above, selected by mpl::false_.
0392 template<typename T>
0393 void
0394 send_impl(int dest, int tag, const T& value,
0395 mpl::false_ ) const;
0396 // Array send of n values; participates in overload resolution only when T is not an MPI datatype (disable_if).
0397 template<typename T>
0398 typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
0399 array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
0400 // Receive helper selected by mpl::true_; writes into `value` and returns bool.
0401 template<typename T>
0402 bool
0403 receive_impl(int source, int tag, T& value,
0404 mpl::true_ ) const;
0405 // Counterpart of the overload above, selected by mpl::false_.
0406 template<typename T>
0407 bool
0408 receive_impl(int source, int tag, T& value,
0409 mpl::false_ ) const;
0410
0411 // Array receive; n is passed by non-const reference. Enabled only when T is not an MPI datatype (disable_if).
0412 template<typename T>
0413 typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
0414 array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
0415 // Returns an optional (process id, int) pair. NOTE(review): presumably the source and tag of a pending message, empty when there is none -- confirm.
0416 optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
0417 // NOTE(review): presumably synchronizes all processes of the group -- confirm in the implementation.
0418 void synchronize() const;
0419 // True when impl_ is non-null. Const-qualified so the test also works on const groups; it reads impl_ and modifies nothing.
0420 operator bool() const { return bool(impl_); }
0421 // Returns an mpi_process_group by value. NOTE(review): presumably a copy that is not tied to a block -- confirm.
0422 mpi_process_group base() const;
0423
0424
0425
0426
0427
0428
0429
0430
0431 // Takes a tag and a handler; Type has to be given explicitly because it does not appear in the parameters.
0432 // NOTE(review): presumably wraps the handler in a trigger_launcher<Type, Handler> and
0433 // registers it through install_trigger() -- confirm in the implementation.
0434 template<typename Type, typename Handler>
0435 void trigger(int tag, const Handler& handler);
0436
0437
0438
0439
0440
0441
0442
0443
0444
0445
0446
0447 // Same signature as trigger().
0448 // NOTE(review): presumably registers a reply_trigger_launcher so the handler's result
0449 // goes back to the sender (cf. send_oob_with_reply) -- confirm.
0450 template<typename Type, typename Handler>
0451 void trigger_with_reply(int tag, const Handler& handler);
0452 // Same as trigger() plus a buffer_size that defaults to 0. NOTE(review): presumably buffer_size chooses between global_trigger_launcher and global_irecv_trigger_launcher -- confirm.
0453 template<typename Type, typename Handler>
0454 void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
0455
0456
0457
0458
0459
0460
0461
0462
0463
0464
0465
0466
0467
0468
0469 // Returns an optional pair of ints; defaults are wait = false, block = -1, synchronizing = false.
0470 // send_oob() calls this repeatedly while it waits for a nonblocking send to complete.
0471 // NOTE(review): presumably the pair is the (source, tag) of a message left for the caller -- confirm.
0472 optional<std::pair<int, int> >
0473 poll(bool wait = false, int block = -1, bool synchronizing = false) const;
0474
0475
0476
0477
0478
0479
0480
0481 // Returns a trigger_receive_context, the same type trigger_base::receive() takes as its
0482 // `context` argument. That type is not declared in this listing.
0483
0484 trigger_receive_context trigger_context() const;
0485 // Public overload taking a source process id and a non-const outgoing_messages batch.
0486 // NOTE(review): presumably dispatches the messages held in `batch` -- confirm.
0487 void receive_batch(process_id_type source, outgoing_messages& batch) const;
0488
0489
0490 // Maps (tag, block) to a (communicator, int) pair. send_oob() hands .first to
0491 // MPI_Bsend/MPI_Isend as the communicator argument and .second as the
0492 // tag argument.
0493 std::pair<boost::mpi::communicator, int>
0494 actual_communicator_and_tag(int tag, int block) const;
0495
0496 // Static setter taking a size. NOTE(review): presumably resizes message_buffer and
0497 // attaches it as the buffer used by MPI_Bsend -- confirm in the implementation.
0498 static void set_message_buffer_size(std::size_t s);
0499
0500 // Static getter. send_oob() takes its MPI_Bsend branch only when this returns
0501 // a nonzero value (and SEND_OOB_BSEND is defined).
0502 static std::size_t message_buffer_size();
0503 static int old_buffer_size; // NOTE(review): presumably the size of a previously attached buffer -- confirm
0504 static void* old_buffer; // NOTE(review): presumably the previously attached buffer itself -- confirm
0505 private:
0506 // Takes a tag, a block and a shared trigger object. NOTE(review): presumably stores the launcher in that block's `triggers` vector -- confirm.
0507 void install_trigger(int tag, int block,
0508 shared_ptr<trigger_base> const& launcher);
0509 // block defaults to -1. NOTE(review): presumably tests outstanding MPI requests -- confirm.
0510 void poll_requests(int block=-1) const;
0511
0512
0513 // NOTE(review): presumably sends dest's pending batch only once a size threshold is reached -- confirm.
0514 void maybe_send_batch(process_id_type dest) const;
0515
0516 // Two overloads: one takes the batch explicitly, the other only the destination.
0517 void send_batch(process_id_type dest, outgoing_messages& batch) const;
0518 void send_batch(process_id_type dest) const;
0519 // NOTE(review): no parameters; purpose known only from the name -- confirm in the implementation.
0520 void pack_headers() const;
0521
0522
0523
0524
0525
0526 // Private batch helpers; the public receive_batch overload takes (source, batch) instead of an MPI status.
0527 void process_batch(process_id_type source) const;
0528 void receive_batch(boost::mpi::status& status) const;
0529
0530
0531 // Tag values in the inclusive range [msg_reserved_first, msg_reserved_last] = [126, 128]. NOTE(review): presumably reserved for the group's own traffic -- confirm.
0532
0533 enum status_messages {
0534 // First value of the range; equal to msg_batch.
0535 msg_reserved_first = 126,
0536 // NOTE(review): presumably marks a batch of messages -- confirm.
0537 msg_batch = 126,
0538
0539 // NOTE(review): presumably marks a batch too large for the regular path -- confirm.
0540 msg_large_batch = 127,
0541
0542 // NOTE(review): presumably exchanged while synchronizing -- confirm.
0543 msg_synchronizing = 128,
0544 // Last value of the range; equal to msg_synchronizing.
0545 msg_reserved_last = 128
0546 };
0547
0548
0549
0550
0551
0552 // Per-block record: a receive handler, an on-synchronize handler and a vector of trigger objects.
0553
0554 struct block_type
0555 {
0556 block_type() { }
0557
0558 // Receive callback of this block (see receiver_type).
0559 receiver_type on_receive;
0560
0561 // Synchronization callback of this block (see on_synchronize_event_type).
0562 on_synchronize_event_type on_synchronize;
0563
0564
0565 // Shared trigger objects of this block.
0566 // NOTE(review): presumably indexed by tag -- confirm in the implementation.
0567 std::vector<shared_ptr<trigger_base> > triggers;
0568 };
0569
0570
0571
0572
0573 // Vector of raw block_type pointers. NOTE(review): ownership of the pointees is not visible here; see deallocate_block -- confirm.
0574 typedef std::vector<block_type*> blocks_type;
0575
0576 // Iterator over blocks_type.
0577 typedef blocks_type::iterator block_iterator;
0578
0579
0580
0581
0582 // Forward declaration only; the definition is outside this listing.
0583 // NOTE(review): presumably a deleter that releases a block -- confirm.
0584 struct deallocate_block;
0585 // Static char buffer. NOTE(review): presumably the storage sized by set_message_buffer_size() -- confirm.
0586 static std::vector<char> message_buffer;
0587
0588 public:
0589 // The data members below follow a `public:` label and are therefore publicly accessible.
0590
0591 // Shared implementation object; operator bool() reports whether it is non-null.
0592 // struct impl is only forward-declared in this class.
0593 shared_ptr<impl> impl_;
0594
0595
0596
0597
0598
0599
0600 // Shared block number: my_block_number() returns *block_num, or 0 when this
0601 // pointer is null.
0602
0603 shared_ptr<int> block_num;
0604
0605
0606 // Value returned by process_id() for this group.
0607
0608 int rank;
0609
0610
0611
0612 // Value returned by num_processes() for this group.
0613
0614 int size;
0615 };
0616 // Returns pg.rank.
0617 inline mpi_process_group::process_id_type
0618 process_id(const mpi_process_group& pg)
0619 { return pg.rank; }
0620 // Returns pg.size.
0621 inline mpi_process_group::process_size_type
0622 num_processes(const mpi_process_group& pg)
0623 { return pg.size; }
0624 // Returns a communicator for pg; declaration only, the definition is outside this listing.
0625 mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
0626 // Sends a single value to process `dest` under `tag`; declaration only.
0627 template<typename T>
0628 void
0629 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0630 int tag, const T& value);
0631 // Sends the iterator range [first, last); declaration only.
0632 template<typename InputIterator>
0633 void
0634 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0635 int tag, InputIterator first, InputIterator last);
0636 // Pointer-range overload: forwards to a send() taking (first, last - first), i.e. a pointer and an element count.
0637 template<typename T>
0638 inline void
0639 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0640 int tag, T* first, T* last)
0641 { send(pg, dest, tag, first, last - first); }
0642 // Const-pointer counterpart of the overload above; forwards in the same way.
0643 template<typename T>
0644 inline void
0645 send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0646 int tag, const T* first, const T* last)
0647 { send(pg, dest, tag, first, last - first); }
0648 // Receives into `value` for `tag` without naming a source; returns a process id. NOTE(review): presumably the id of the sender -- confirm.
0649 template<typename T>
0650 mpi_process_group::process_id_type
0651 receive(const mpi_process_group& pg, int tag, T& value);
0652 // As above, but with an explicit source process.
0653 template<typename T>
0654 mpi_process_group::process_id_type
0655 receive(const mpi_process_group& pg,
0656 mpi_process_group::process_id_type source, int tag, T& value);
0657 // Free-function form with the same return type as mpi_process_group::probe(). NOTE(review): presumably forwards to it -- confirm.
0658 optional<std::pair<mpi_process_group::process_id_type, int> >
0659 probe(const mpi_process_group& pg);
0660 // Free-function form of synchronization. NOTE(review): presumably forwards to pg.synchronize() -- confirm.
0661 void synchronize(const mpi_process_group& pg);
0662 // Collective over [first, last) writing to `out` with bin_op; returns a T*. NOTE(review): presumably one past the last element written -- confirm.
0663 template<typename T, typename BinaryOperation>
0664 T*
0665 all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
0666 BinaryOperation bin_op);
0667 // Same signature as all_reduce(). NOTE(review): presumably a prefix reduction across processes -- confirm.
0668 template<typename T, typename BinaryOperation>
0669 T*
0670 scan(const mpi_process_group& pg, T* first, T* last, T* out,
0671 BinaryOperation bin_op);
0672 // Takes an input range and fills the vector `out`. NOTE(review): presumably with every process's contribution -- confirm.
0673 template<typename InputIterator, typename T>
0674 void
0675 all_gather(const mpi_process_group& pg,
0676 InputIterator first, InputIterator last, std::vector<T>& out);
0677 // Returns a new mpi_process_group built from pg and a range. NOTE(review): presumably the range lists the member process ids -- confirm.
0678 template<typename InputIterator>
0679 mpi_process_group
0680 process_subgroup(const mpi_process_group& pg,
0681 InputIterator first, InputIterator last);
0682 // Takes `val` by non-const reference and a root process id. NOTE(review): presumably root's value is copied to every process -- confirm.
0683 template<typename T>
0684 void
0685 broadcast(const mpi_process_group& pg, T& val,
0686 mpi_process_group::process_id_type root);
0687
0688
0689 // Non-member swap for outgoing_messages; forwards to the member swap().
0690 inline void
0691 swap(mpi_process_group::outgoing_messages& x,
0692 mpi_process_group::outgoing_messages& y)
0693 {
0694 x.swap(y);
0695 }
0696
0697
0698
0699 // Out-of-band send of one value whose type is an MPI datatype (enable_if on
0700 // boost::mpi::is_mpi_datatype<T>). block defaults to -1 and is passed on to
0701 // actual_communicator_and_tag(). Return codes of the MPI calls are not checked.
0702 template<typename T>
0703 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0704 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0705 int tag, const T& value, int block=-1)
0706 {
0707 using boost::mpi::get_mpi_datatype;
0708
0709 // Look up the communicator and the raw MPI tag for this (tag, block) pair.
0710 std::pair<boost::mpi::communicator, int> route
0711 = pg.actual_communicator_and_tag(tag, block);
0712 T* const payload = const_cast<T*>(&value);
0713
0714 #ifdef SEND_OOB_BSEND
0715 if (mpi_process_group::message_buffer_size() != 0) {
0716 // message_buffer_size() is nonzero: use MPI's buffered send and return.
0717 MPI_Bsend(payload, 1, get_mpi_datatype<T>(value), dest, route.second, route.first);
0718 return;
0719 }
0720 #endif
0721 // Otherwise start a nonblocking send and poll the group until MPI_Test reports completion.
0722 MPI_Request pending;
0723 MPI_Isend(payload, 1, get_mpi_datatype<T>(value), dest, route.second, route.first, &pending);
0724 int completed = 0;
0725 for (;;) {
0726 pg.poll();
0727 MPI_Test(&pending, &completed, MPI_STATUS_IGNORE);
0728 if (completed) break;
0729 }
0730 }
0731 // Out-of-band send for types that are not MPI datatypes (disable_if): the value is packed into a boost::mpi::packed_oarchive and sent as MPI_PACKED.
0732 template<typename T>
0733 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
0734 send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
0735 int tag, const T& value, int block=-1)
0736 {
0737 using boost::mpi::packed_oarchive;
0738
0739 // Look up the communicator and the raw MPI tag for this (tag, block) pair.
0740 std::pair<boost::mpi::communicator, int> route
0741 = pg.actual_communicator_and_tag(tag, block);
0742
0743 // Pack the value; the archive stays alive until every MPI call below has returned.
0744 packed_oarchive packed(route.first);
0745 packed << value;
0746 void* const payload = const_cast<void*>(packed.address());
0747 std::size_t payload_bytes = packed.size();
0748 // NOTE(review): payload_bytes (std::size_t) is passed where MPI expects an int count -- confirm that payloads stay below INT_MAX.
0749
0750 #ifdef SEND_OOB_BSEND
0751 if (mpi_process_group::message_buffer_size() != 0) {
0752 // message_buffer_size() is nonzero: use MPI's buffered send and return.
0753 MPI_Bsend(payload, payload_bytes, MPI_PACKED, dest, route.second, route.first);
0754 return;
0755 }
0756 #endif
0757 // Otherwise start a nonblocking send and poll the group until MPI_Test reports completion.
0758 MPI_Request pending;
0759 MPI_Isend(payload, payload_bytes, MPI_PACKED, dest, route.second, route.first, &pending);
0760 int completed = 0;
0761 for (;;) {
0762 pg.poll();
0763 MPI_Test(&pending, &completed, MPI_STATUS_IGNORE);
0764 if (completed) break;
0765 }
0766 }
0767 // Out-of-band receive into `value`, enabled when T is an MPI datatype; block defaults to -1. Declaration only.
0768 template<typename T>
0769 typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
0770 receive_oob(const mpi_process_group& pg,
0771 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
0772 // Counterpart enabled when T is not an MPI datatype.
0773 template<typename T>
0774 typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
0775 receive_oob(const mpi_process_group& pg,
0776 mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
0777 // Sends send_value and fills reply_value; this overload is selected by the reply type being an MPI datatype. Declaration only.
0778 template<typename SendT, typename ReplyT>
0779 typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0780 send_oob_with_reply(const mpi_process_group& pg,
0781 mpi_process_group::process_id_type dest,
0782 int tag, const SendT& send_value, ReplyT& reply_value,
0783 int block = -1);
0784 // Counterpart selected when ReplyT is not an MPI datatype.
0785 template<typename SendT, typename ReplyT>
0786 typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
0787 send_oob_with_reply(const mpi_process_group& pg,
0788 mpi_process_group::process_id_type dest,
0789 int tag, const SendT& send_value, ReplyT& reply_value,
0790 int block = -1);
0791
0792 } } }
0793 // Serialization traits: message_header is declared bitwise serializable and specialized as an MPI datatype.
0794 BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
0795 namespace boost { namespace mpi {
0796 template<>
0797 struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
0798 } }
0799 // outgoing_messages is given the object_serializable implementation level and is never tracked.
0800 BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
0801 BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
0802
0803 #include <boost/graph/distributed/detail/mpi_process_group.ipp>
0804
0805 #endif