File indexing completed on 2025-01-18 09:40:57
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_MPI_REQUEST_HANDLERS_HPP
0012 #define BOOST_MPI_REQUEST_HANDLERS_HPP
0013
0014 #include <boost/mpi/skeleton_and_content_types.hpp>
0015
0016 namespace boost { namespace mpi {
0017
0018 namespace detail {
0019
0020
0021
0022
0023 template<typename T>
0024 struct serialized_irecv_data {
0025 serialized_irecv_data(const communicator& comm, T& value)
0026 : m_ia(comm), m_value(value) {}
0027
0028 void deserialize(status& stat)
0029 {
0030 m_ia >> m_value;
0031 stat.m_count = 1;
0032 }
0033
0034 std::size_t m_count;
0035 packed_iarchive m_ia;
0036 T& m_value;
0037 };
0038
0039 template<>
0040 struct serialized_irecv_data<packed_iarchive>
0041 {
0042 serialized_irecv_data(communicator const&, packed_iarchive& ia) : m_ia(ia) { }
0043
0044 void deserialize(status&) { }
0045
0046 std::size_t m_count;
0047 packed_iarchive& m_ia;
0048 };
0049
0050
0051
0052
0053
0054 template<typename T>
0055 struct serialized_array_irecv_data
0056 {
0057 serialized_array_irecv_data(const communicator& comm, T* values, int n)
0058 : m_count(0), m_ia(comm), m_values(values), m_nb(n) {}
0059
0060 void deserialize(status& stat);
0061
0062 std::size_t m_count;
0063 packed_iarchive m_ia;
0064 T* m_values;
0065 int m_nb;
0066 };
0067
0068 template<typename T>
0069 void serialized_array_irecv_data<T>::deserialize(status& stat)
0070 {
0071 T* v = m_values;
0072 T* end = m_values+m_nb;
0073 while (v < end) {
0074 m_ia >> *v++;
0075 }
0076 stat.m_count = m_nb;
0077 }
0078
0079
0080
0081
0082
0083
0084
0085 template<typename T, class A>
0086 struct dynamic_array_irecv_data
0087 {
0088 BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes.");
0089
0090 dynamic_array_irecv_data(std::vector<T,A>& values)
0091 : m_count(-1), m_values(values) {}
0092
0093 std::size_t m_count;
0094 std::vector<T,A>& m_values;
0095 };
0096
0097 template<typename T>
0098 struct serialized_irecv_data<const skeleton_proxy<T> >
0099 {
0100 serialized_irecv_data(const communicator& comm, skeleton_proxy<T> proxy)
0101 : m_isa(comm), m_ia(m_isa.get_skeleton()), m_proxy(proxy) { }
0102
0103 void deserialize(status& stat)
0104 {
0105 m_isa >> m_proxy.object;
0106 stat.m_count = 1;
0107 }
0108
0109 std::size_t m_count;
0110 packed_skeleton_iarchive m_isa;
0111 packed_iarchive& m_ia;
0112 skeleton_proxy<T> m_proxy;
0113 };
0114
0115 template<typename T>
0116 struct serialized_irecv_data<skeleton_proxy<T> >
0117 : public serialized_irecv_data<const skeleton_proxy<T> >
0118 {
0119 typedef serialized_irecv_data<const skeleton_proxy<T> > inherited;
0120
0121 serialized_irecv_data(const communicator& comm, const skeleton_proxy<T>& proxy)
0122 : inherited(comm, proxy) { }
0123 };
0124 }
0125
0126 #if BOOST_MPI_VERSION >= 3
0127 template<class Data>
0128 class request::probe_handler
0129 : public request::handler,
0130 protected Data {
0131
0132 protected:
0133 template<typename I1>
0134 probe_handler(communicator const& comm, int source, int tag, I1& i1)
0135 : Data(comm, i1),
0136 m_comm(comm),
0137 m_source(source),
0138 m_tag(tag) {}
0139
0140 template<typename I1, typename I2>
0141 probe_handler(communicator const& comm, int source, int tag, I1& i1, I2& i2)
0142 : Data(comm, i1, i2),
0143 m_comm(comm),
0144 m_source(source),
0145 m_tag(tag) {}
0146
0147 public:
0148 bool active() const { return m_source != MPI_PROC_NULL; }
0149 optional<MPI_Request&> trivial() { return boost::none; }
0150 void cancel() { m_source = MPI_PROC_NULL; }
0151
0152 status wait() {
0153 MPI_Message msg;
0154 status stat;
0155 BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_source,m_tag,m_comm,&msg,&stat.m_status));
0156 return unpack(msg, stat);
0157 }
0158
0159 optional<status> test() {
0160 status stat;
0161 int flag = 0;
0162 MPI_Message msg;
0163 BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_source,m_tag,m_comm,&flag,&msg,&stat.m_status));
0164 if (flag) {
0165 return unpack(msg, stat);
0166 } else {
0167 return optional<status>();
0168 }
0169 }
0170
0171 protected:
0172 friend class request;
0173
0174 status unpack(MPI_Message& msg, status& stat) {
0175 int count;
0176 MPI_Datatype datatype = this->Data::datatype();
0177 BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, datatype, &count));
0178 this->Data::resize(count);
0179 BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (this->Data::buffer(), count, datatype, &msg, &stat.m_status));
0180 this->Data::deserialize();
0181 m_source = MPI_PROC_NULL;
0182 stat.m_count = 1;
0183 return stat;
0184 }
0185
0186 communicator const& m_comm;
0187 int m_source;
0188 int m_tag;
0189 };
0190 #endif
0191
0192 namespace detail {
0193 template<class A>
0194 struct dynamic_primitive_array_data {
0195 dynamic_primitive_array_data(communicator const&, A& arr) : m_buffer(arr) {}
0196
0197 void* buffer() { return m_buffer.data(); }
0198 void resize(std::size_t sz) { m_buffer.resize(sz); }
0199 void deserialize() {}
0200 MPI_Datatype datatype() { return get_mpi_datatype<typename A::value_type>(); }
0201
0202 A& m_buffer;
0203 };
0204
0205 template<typename T>
0206 struct serialized_data {
0207 serialized_data(communicator const& comm, T& value) : m_archive(comm), m_value(value) {}
0208
0209 void* buffer() { return m_archive.address(); }
0210 void resize(std::size_t sz) { m_archive.resize(sz); }
0211 void deserialize() { m_archive >> m_value; }
0212 MPI_Datatype datatype() { return MPI_PACKED; }
0213
0214 packed_iarchive m_archive;
0215 T& m_value;
0216 };
0217
0218 template<>
0219 struct serialized_data<packed_iarchive> {
0220 serialized_data(communicator const& comm, packed_iarchive& ar) : m_archive(ar) {}
0221
0222 void* buffer() { return m_archive.address(); }
0223 void resize(std::size_t sz) { m_archive.resize(sz); }
0224 void deserialize() {}
0225 MPI_Datatype datatype() { return MPI_PACKED; }
0226
0227 packed_iarchive& m_archive;
0228 };
0229
0230 template<typename T>
0231 struct serialized_data<const skeleton_proxy<T> > {
0232 serialized_data(communicator const& comm, skeleton_proxy<T> skel)
0233 : m_proxy(skel),
0234 m_archive(comm) {}
0235
0236 void* buffer() { return m_archive.get_skeleton().address(); }
0237 void resize(std::size_t sz) { m_archive.get_skeleton().resize(sz); }
0238 void deserialize() { m_archive >> m_proxy.object; }
0239 MPI_Datatype datatype() { return MPI_PACKED; }
0240
0241 skeleton_proxy<T> m_proxy;
0242 packed_skeleton_iarchive m_archive;
0243 };
0244
0245 template<typename T>
0246 struct serialized_data<skeleton_proxy<T> >
0247 : public serialized_data<const skeleton_proxy<T> > {
0248 typedef serialized_data<const skeleton_proxy<T> > super;
0249 serialized_data(communicator const& comm, skeleton_proxy<T> skel)
0250 : super(comm, skel) {}
0251 };
0252
0253 template<typename T>
0254 struct serialized_array_data {
0255 serialized_array_data(communicator const& comm, T* values, int nb)
0256 : m_archive(comm), m_values(values), m_nb(nb) {}
0257
0258 void* buffer() { return m_archive.address(); }
0259 void resize(std::size_t sz) { m_archive.resize(sz); }
0260 void deserialize() {
0261 T* end = m_values + m_nb;
0262 T* v = m_values;
0263 while (v != end) {
0264 m_archive >> *v++;
0265 }
0266 }
0267 MPI_Datatype datatype() { return MPI_PACKED; }
0268
0269 packed_iarchive m_archive;
0270 T* m_values;
0271 int m_nb;
0272 };
0273
0274 }
0275
0276 class BOOST_MPI_DECL request::legacy_handler : public request::handler {
0277 public:
0278 legacy_handler(communicator const& comm, int source, int tag);
0279
0280 void cancel() {
0281 for (int i = 0; i < 2; ++i) {
0282 if (m_requests[i] != MPI_REQUEST_NULL) {
0283 BOOST_MPI_CHECK_RESULT(MPI_Cancel, (m_requests+i));
0284 }
0285 }
0286 }
0287
0288 bool active() const;
0289 optional<MPI_Request&> trivial();
0290
0291 MPI_Request m_requests[2];
0292 communicator m_comm;
0293 int m_source;
0294 int m_tag;
0295 };
0296
0297 template<typename T>
0298 class request::legacy_serialized_handler
0299 : public request::legacy_handler,
0300 protected detail::serialized_irecv_data<T> {
0301 public:
0302 typedef detail::serialized_irecv_data<T> extra;
0303 legacy_serialized_handler(communicator const& comm, int source, int tag, T& value)
0304 : legacy_handler(comm, source, tag),
0305 extra(comm, value) {
0306 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0307 (&this->extra::m_count, 1,
0308 get_mpi_datatype(this->extra::m_count),
0309 source, tag, comm, m_requests+0));
0310
0311 }
0312
0313 status wait() {
0314 status stat;
0315 if (m_requests[1] == MPI_REQUEST_NULL) {
0316
0317 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0318 (m_requests, &stat.m_status));
0319
0320 this->extra::m_ia.resize(this->extra::m_count);
0321 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0322 (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
0323 stat.source(), stat.tag(),
0324 MPI_Comm(m_comm), m_requests + 1));
0325 }
0326
0327
0328 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0329 (m_requests + 1, &stat.m_status));
0330
0331 this->deserialize(stat);
0332 return stat;
0333 }
0334
0335 optional<status> test() {
0336 status stat;
0337 int flag = 0;
0338
0339 if (m_requests[1] == MPI_REQUEST_NULL) {
0340
0341 BOOST_MPI_CHECK_RESULT(MPI_Test,
0342 (m_requests, &flag, &stat.m_status));
0343 if (flag) {
0344
0345 this->extra::m_ia.resize(this->extra::m_count);
0346 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0347 (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
0348 stat.source(), stat.tag(),
0349 MPI_Comm(m_comm), m_requests + 1));
0350 } else
0351 return optional<status>();
0352 }
0353
0354
0355 BOOST_MPI_CHECK_RESULT(MPI_Test,
0356 (m_requests + 1, &flag, &stat.m_status));
0357 if (flag) {
0358 this->deserialize(stat);
0359 return stat;
0360 } else
0361 return optional<status>();
0362 }
0363 };
0364
0365 template<typename T>
0366 class request::legacy_serialized_array_handler
0367 : public request::legacy_handler,
0368 protected detail::serialized_array_irecv_data<T> {
0369 typedef detail::serialized_array_irecv_data<T> extra;
0370
0371 public:
0372 legacy_serialized_array_handler(communicator const& comm, int source, int tag, T* values, int n)
0373 : legacy_handler(comm, source, tag),
0374 extra(comm, values, n) {
0375 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0376 (&this->extra::m_count, 1,
0377 get_mpi_datatype(this->extra::m_count),
0378 source, tag, comm, m_requests+0));
0379 }
0380
0381 status wait() {
0382 status stat;
0383 if (m_requests[1] == MPI_REQUEST_NULL) {
0384
0385 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0386 (m_requests, &stat.m_status));
0387
0388 this->extra::m_ia.resize(this->extra::m_count);
0389 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0390 (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
0391 stat.source(), stat.tag(),
0392 MPI_Comm(m_comm), m_requests + 1));
0393 }
0394
0395
0396 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0397 (m_requests + 1, &stat.m_status));
0398
0399 this->deserialize(stat);
0400 return stat;
0401 }
0402
0403 optional<status> test() {
0404 status stat;
0405 int flag = 0;
0406
0407 if (m_requests[1] == MPI_REQUEST_NULL) {
0408
0409 BOOST_MPI_CHECK_RESULT(MPI_Test,
0410 (m_requests, &flag, &stat.m_status));
0411 if (flag) {
0412
0413 this->extra::m_ia.resize(this->extra::m_count);
0414 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0415 (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
0416 stat.source(), stat.tag(),
0417 MPI_Comm(m_comm), m_requests + 1));
0418 } else
0419 return optional<status>();
0420 }
0421
0422
0423 BOOST_MPI_CHECK_RESULT(MPI_Test,
0424 (m_requests + 1, &flag, &stat.m_status));
0425 if (flag) {
0426 this->deserialize(stat);
0427 return stat;
0428 } else
0429 return optional<status>();
0430 }
0431 };
0432
0433 template<typename T, class A>
0434 class request::legacy_dynamic_primitive_array_handler
0435 : public request::legacy_handler,
0436 protected detail::dynamic_array_irecv_data<T,A>
0437 {
0438 typedef detail::dynamic_array_irecv_data<T,A> extra;
0439
0440 public:
0441 legacy_dynamic_primitive_array_handler(communicator const& comm, int source, int tag, std::vector<T,A>& values)
0442 : legacy_handler(comm, source, tag),
0443 extra(values) {
0444 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0445 (&this->extra::m_count, 1,
0446 get_mpi_datatype(this->extra::m_count),
0447 source, tag, comm, m_requests+0));
0448 }
0449
0450 status wait() {
0451 status stat;
0452 if (m_requests[1] == MPI_REQUEST_NULL) {
0453
0454 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0455 (m_requests, &stat.m_status));
0456
0457 this->extra::m_values.resize(this->extra::m_count);
0458 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0459 (detail::c_data(this->extra::m_values), this->extra::m_values.size(), get_mpi_datatype<T>(),
0460 stat.source(), stat.tag(),
0461 MPI_Comm(m_comm), m_requests + 1));
0462 }
0463
0464 BOOST_MPI_CHECK_RESULT(MPI_Wait,
0465 (m_requests + 1, &stat.m_status));
0466 return stat;
0467 }
0468
0469 optional<status> test() {
0470 status stat;
0471 int flag = 0;
0472
0473 if (m_requests[1] == MPI_REQUEST_NULL) {
0474
0475 BOOST_MPI_CHECK_RESULT(MPI_Test,
0476 (m_requests, &flag, &stat.m_status));
0477 if (flag) {
0478
0479 this->extra::m_values.resize(this->extra::m_count);
0480 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0481 (detail::c_data(this->extra::m_values), this->extra::m_values.size(), get_mpi_datatype<T>(),
0482 stat.source(), stat.tag(),
0483 MPI_Comm(m_comm), m_requests + 1));
0484 } else
0485 return optional<status>();
0486 }
0487
0488
0489 BOOST_MPI_CHECK_RESULT(MPI_Test,
0490 (m_requests + 1, &flag, &stat.m_status));
0491 if (flag) {
0492 return stat;
0493 } else
0494 return optional<status>();
0495 }
0496 };
0497
0498 class BOOST_MPI_DECL request::trivial_handler : public request::handler {
0499
0500 public:
0501 trivial_handler();
0502
0503 status wait();
0504 optional<status> test();
0505 void cancel();
0506
0507 bool active() const;
0508 optional<MPI_Request&> trivial();
0509
0510 private:
0511 friend class request;
0512 MPI_Request m_request;
0513 };
0514
0515 class request::dynamic_handler : public request::handler {
0516 dynamic_handler();
0517
0518 status wait();
0519 optional<status> test();
0520 void cancel();
0521
0522 bool active() const;
0523 optional<MPI_Request&> trivial();
0524
0525 private:
0526 friend class request;
0527 MPI_Request m_requests[2];
0528 };
0529
0530 template<typename T>
0531 request request::make_serialized(communicator const& comm, int source, int tag, T& value) {
0532 #if defined(BOOST_MPI_USE_IMPROBE)
0533 return request(new probe_handler<detail::serialized_data<T> >(comm, source, tag, value));
0534 #else
0535 return request(new legacy_serialized_handler<T>(comm, source, tag, value));
0536 #endif
0537 }
0538
0539 template<typename T>
0540 request request::make_serialized_array(communicator const& comm, int source, int tag, T* values, int n) {
0541 #if defined(BOOST_MPI_USE_IMPROBE)
0542 return request(new probe_handler<detail::serialized_array_data<T> >(comm, source, tag, values, n));
0543 #else
0544 return request(new legacy_serialized_array_handler<T>(comm, source, tag, values, n));
0545 #endif
0546 }
0547
0548 template<typename T, class A>
0549 request request::make_dynamic_primitive_array_recv(communicator const& comm, int source, int tag,
0550 std::vector<T,A>& values) {
0551 #if defined(BOOST_MPI_USE_IMPROBE)
0552 return request(new probe_handler<detail::dynamic_primitive_array_data<std::vector<T,A> > >(comm,source,tag,values));
0553 #else
0554 return request(new legacy_dynamic_primitive_array_handler<T,A>(comm, source, tag, values));
0555 #endif
0556 }
0557
0558 template<typename T>
0559 request
0560 request::make_trivial_send(communicator const& comm, int dest, int tag, T const* values, int n) {
0561 trivial_handler* handler = new trivial_handler;
0562 BOOST_MPI_CHECK_RESULT(MPI_Isend,
0563 (const_cast<T*>(values), n,
0564 get_mpi_datatype<T>(),
0565 dest, tag, comm, &handler->m_request));
0566 return request(handler);
0567 }
0568
0569 template<typename T>
0570 request
0571 request::make_trivial_send(communicator const& comm, int dest, int tag, T const& value) {
0572 return make_trivial_send(comm, dest, tag, &value, 1);
0573 }
0574
0575 template<typename T>
0576 request
0577 request::make_trivial_recv(communicator const& comm, int dest, int tag, T* values, int n) {
0578 trivial_handler* handler = new trivial_handler;
0579 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0580 (values, n,
0581 get_mpi_datatype<T>(),
0582 dest, tag, comm, &handler->m_request));
0583 return request(handler);
0584 }
0585
0586 template<typename T>
0587 request
0588 request::make_trivial_recv(communicator const& comm, int dest, int tag, T& value) {
0589 return make_trivial_recv(comm, dest, tag, &value, 1);
0590 }
0591
0592 template<typename T, class A>
0593 request request::make_dynamic_primitive_array_send(communicator const& comm, int dest, int tag,
0594 std::vector<T,A> const& values) {
0595 #if defined(BOOST_MPI_USE_IMPROBE)
0596 return make_trivial_send(comm, dest, tag, values.data(), values.size());
0597 #else
0598 {
0599
0600
0601 boost::shared_ptr<std::size_t> size(new std::size_t(values.size()));
0602 dynamic_handler* handler = new dynamic_handler;
0603 request req(handler);
0604 req.preserve(size);
0605
0606 BOOST_MPI_CHECK_RESULT(MPI_Isend,
0607 (size.get(), 1,
0608 get_mpi_datatype(*size),
0609 dest, tag, comm, handler->m_requests+0));
0610 BOOST_MPI_CHECK_RESULT(MPI_Isend,
0611 (const_cast<T*>(values.data()), *size,
0612 get_mpi_datatype<T>(),
0613 dest, tag, comm, handler->m_requests+1));
0614 return req;
0615 }
0616 #endif
0617 }
0618
0619 inline
0620 request::legacy_handler::legacy_handler(communicator const& comm, int source, int tag)
0621 : m_comm(comm),
0622 m_source(source),
0623 m_tag(tag)
0624 {
0625 m_requests[0] = MPI_REQUEST_NULL;
0626 m_requests[1] = MPI_REQUEST_NULL;
0627 }
0628
0629 }}
0630
0631 #endif