Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:40:57

0001 // Copyright (C) 2018 Alain Miniussi <alain.miniussi@oca.eu>.
0002 
0003 // Use, modification and distribution is subject to the Boost Software
0004 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
0005 // http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 // Request implementation details
0008 
0009 // This header should be included only after the communicator and request
0010 // classes have been defined.
0011 #ifndef BOOST_MPI_REQUEST_HANDLERS_HPP
0012 #define BOOST_MPI_REQUEST_HANDLERS_HPP
0013 
0014 #include <boost/mpi/skeleton_and_content_types.hpp>
0015 
0016 namespace boost { namespace mpi {
0017 
0018 namespace detail {
0019 /**
0020  * Internal data structure that stores everything required to manage
0021  * the receipt of serialized data via a request object.
0022  */
0023 template<typename T>
0024 struct serialized_irecv_data {
0025   serialized_irecv_data(const communicator& comm, T& value)
0026     : m_ia(comm), m_value(value) {}
0027 
0028   void deserialize(status& stat) 
0029   { 
0030     m_ia >> m_value; 
0031     stat.m_count = 1;
0032   }
0033 
0034   std::size_t     m_count;
0035   packed_iarchive m_ia;
0036   T&              m_value;
0037 };
0038 
0039 template<>
0040 struct serialized_irecv_data<packed_iarchive>
0041 {
0042   serialized_irecv_data(communicator const&, packed_iarchive& ia) : m_ia(ia) { }
0043 
0044   void deserialize(status&) { /* Do nothing. */ }
0045 
0046   std::size_t      m_count;
0047   packed_iarchive& m_ia;
0048 };
0049 
0050 /**
0051  * Internal data structure that stores everything required to manage
0052  * the receipt of an array of serialized data via a request object.
0053  */
0054 template<typename T>
0055 struct serialized_array_irecv_data
0056 {
0057   serialized_array_irecv_data(const communicator& comm, T* values, int n)
0058     : m_count(0), m_ia(comm), m_values(values), m_nb(n) {}
0059 
0060   void deserialize(status& stat);
0061 
0062   std::size_t     m_count;
0063   packed_iarchive m_ia;
0064   T*              m_values;
0065   int             m_nb;
0066 };
0067 
0068 template<typename T>
0069 void serialized_array_irecv_data<T>::deserialize(status& stat)
0070 {
0071   T* v = m_values;
0072   T* end =  m_values+m_nb;
0073   while (v < end) {
0074     m_ia >> *v++;
0075   }
0076   stat.m_count = m_nb;
0077 }
0078 
0079 /**
0080  * Internal data structure that stores everything required to manage
0081  * the receipt of an array of primitive data but unknown size.
0082  * Such an array can have been send with blocking operation and so must
0083  * be compatible with the (size_t,raw_data[]) format.
0084  */
0085 template<typename T, class A>
0086 struct dynamic_array_irecv_data
0087 {
0088   BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes.");
0089 
0090   dynamic_array_irecv_data(std::vector<T,A>& values)
0091     : m_count(-1), m_values(values) {}
0092 
0093   std::size_t       m_count;
0094   std::vector<T,A>& m_values;
0095 };
0096 
0097 template<typename T>
0098 struct serialized_irecv_data<const skeleton_proxy<T> >
0099 {
0100   serialized_irecv_data(const communicator& comm, skeleton_proxy<T> proxy)
0101     : m_isa(comm), m_ia(m_isa.get_skeleton()), m_proxy(proxy) { }
0102 
0103   void deserialize(status& stat) 
0104   { 
0105     m_isa >> m_proxy.object;
0106     stat.m_count = 1;
0107   }
0108 
0109   std::size_t              m_count;
0110   packed_skeleton_iarchive m_isa;
0111   packed_iarchive&         m_ia;
0112   skeleton_proxy<T>        m_proxy;
0113 };
0114 
0115 template<typename T>
0116 struct serialized_irecv_data<skeleton_proxy<T> >
0117   : public serialized_irecv_data<const skeleton_proxy<T> >
0118 {
0119   typedef serialized_irecv_data<const skeleton_proxy<T> > inherited;
0120 
0121   serialized_irecv_data(const communicator& comm, const skeleton_proxy<T>& proxy)
0122     : inherited(comm, proxy) { }
0123 };
0124 }
0125 
0126 #if BOOST_MPI_VERSION >= 3
0127 template<class Data>
0128 class request::probe_handler
0129   : public request::handler,
0130     protected Data {
0131 
0132 protected:
0133   template<typename I1>
0134   probe_handler(communicator const& comm, int source, int tag, I1& i1)
0135     : Data(comm, i1),
0136       m_comm(comm),
0137       m_source(source),
0138       m_tag(tag) {}
0139   // no variadic template for now
0140   template<typename I1, typename I2>
0141   probe_handler(communicator const& comm, int source, int tag, I1& i1, I2& i2)
0142     : Data(comm, i1, i2),
0143       m_comm(comm),
0144       m_source(source),
0145       m_tag(tag) {}
0146 
0147 public:
0148   bool active() const { return m_source != MPI_PROC_NULL; }
0149   optional<MPI_Request&> trivial() { return boost::none; }
0150   void cancel() { m_source = MPI_PROC_NULL; }
0151 
0152   status wait() {
0153     MPI_Message msg;
0154     status stat;
0155     BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_source,m_tag,m_comm,&msg,&stat.m_status));
0156     return unpack(msg, stat);
0157   }
0158   
0159   optional<status> test() {
0160     status stat;
0161     int flag = 0;
0162     MPI_Message msg;
0163     BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_source,m_tag,m_comm,&flag,&msg,&stat.m_status));
0164     if (flag) {
0165       return unpack(msg, stat);
0166     } else {
0167       return optional<status>();
0168     } 
0169   }
0170 
0171 protected:
0172   friend class request;
0173 
0174   status unpack(MPI_Message& msg, status& stat) {
0175     int count;
0176     MPI_Datatype datatype = this->Data::datatype();
0177     BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, datatype, &count));
0178     this->Data::resize(count);
0179     BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (this->Data::buffer(), count, datatype, &msg, &stat.m_status));
0180     this->Data::deserialize();
0181     m_source = MPI_PROC_NULL;
0182     stat.m_count = 1;
0183     return stat;
0184   }
0185   
0186   communicator const& m_comm;
0187   int m_source;
0188   int m_tag;
0189 };
0190 #endif // BOOST_MPI_VERSION >= 3
0191 
0192 namespace detail {
0193 template<class A>
0194 struct dynamic_primitive_array_data {
0195   dynamic_primitive_array_data(communicator const&, A& arr) : m_buffer(arr) {}
0196   
0197   void* buffer() { return m_buffer.data(); }
0198   void  resize(std::size_t sz) { m_buffer.resize(sz); }
0199   void  deserialize() {}
0200   MPI_Datatype datatype() { return get_mpi_datatype<typename A::value_type>(); }
0201   
0202   A& m_buffer;
0203 };
0204 
0205 template<typename T>
0206 struct serialized_data {
0207   serialized_data(communicator const& comm, T& value) : m_archive(comm), m_value(value) {}
0208 
0209   void* buffer() { return m_archive.address(); }
0210   void  resize(std::size_t sz) { m_archive.resize(sz); }
0211   void  deserialize() { m_archive >> m_value; }
0212   MPI_Datatype datatype() { return MPI_PACKED; }
0213 
0214   packed_iarchive m_archive;
0215   T& m_value;
0216 };
0217 
0218 template<>
0219 struct serialized_data<packed_iarchive> {
0220   serialized_data(communicator const& comm, packed_iarchive& ar) : m_archive(ar) {}
0221   
0222   void* buffer() { return m_archive.address(); }
0223   void  resize(std::size_t sz) { m_archive.resize(sz); }
0224   void  deserialize() {}
0225   MPI_Datatype datatype() { return MPI_PACKED; }
0226 
0227   packed_iarchive& m_archive;
0228 };
0229 
0230 template<typename T>
0231 struct serialized_data<const skeleton_proxy<T> > {
0232   serialized_data(communicator const& comm, skeleton_proxy<T> skel)
0233     : m_proxy(skel),
0234       m_archive(comm) {}
0235   
0236   void* buffer() { return m_archive.get_skeleton().address(); }
0237   void  resize(std::size_t sz) { m_archive.get_skeleton().resize(sz); }
0238   void  deserialize() { m_archive >> m_proxy.object; }
0239   MPI_Datatype datatype() { return MPI_PACKED; }
0240 
0241   skeleton_proxy<T> m_proxy;
0242   packed_skeleton_iarchive m_archive;
0243 };
0244 
0245 template<typename T>
0246 struct serialized_data<skeleton_proxy<T> >
0247   : public serialized_data<const skeleton_proxy<T> > {
0248   typedef serialized_data<const skeleton_proxy<T> > super;
0249   serialized_data(communicator const& comm, skeleton_proxy<T> skel)
0250     : super(comm, skel) {}
0251 };
0252 
0253 template<typename T>
0254 struct serialized_array_data {
0255   serialized_array_data(communicator const& comm, T* values, int nb)
0256     : m_archive(comm), m_values(values), m_nb(nb) {}
0257 
0258   void* buffer() { return m_archive.address(); }
0259   void  resize(std::size_t sz) { m_archive.resize(sz); }
0260   void  deserialize() {
0261     T* end = m_values + m_nb;
0262     T* v = m_values;
0263     while (v != end) {
0264       m_archive >> *v++;
0265     }
0266   }
0267   MPI_Datatype datatype() { return MPI_PACKED; }
0268 
0269   packed_iarchive m_archive;
0270   T*  m_values;
0271   int m_nb;
0272 };
0273 
0274 }
0275 
0276 class BOOST_MPI_DECL request::legacy_handler : public request::handler {
0277 public:
0278   legacy_handler(communicator const& comm, int source, int tag);
0279   
0280   void cancel() {
0281     for (int i = 0; i < 2; ++i) {
0282       if (m_requests[i] != MPI_REQUEST_NULL) {
0283         BOOST_MPI_CHECK_RESULT(MPI_Cancel, (m_requests+i));
0284       }
0285     }
0286   }
0287   
0288   bool active() const;
0289   optional<MPI_Request&> trivial();
0290   
0291   MPI_Request      m_requests[2];
0292   communicator     m_comm;
0293   int              m_source;
0294   int              m_tag;
0295 };
0296 
0297 template<typename T>
0298 class request::legacy_serialized_handler 
0299   : public request::legacy_handler, 
0300     protected detail::serialized_irecv_data<T> {
0301 public:
0302   typedef detail::serialized_irecv_data<T> extra;
0303   legacy_serialized_handler(communicator const& comm, int source, int tag, T& value)
0304     : legacy_handler(comm, source, tag),
0305       extra(comm, value)  {
0306     BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0307                (&this->extra::m_count, 1, 
0308                 get_mpi_datatype(this->extra::m_count),
0309                 source, tag, comm, m_requests+0));
0310     
0311   }
0312 
0313   status wait() {
0314     status stat;
0315     if (m_requests[1] == MPI_REQUEST_NULL) {
0316       // Wait for the count message to complete
0317       BOOST_MPI_CHECK_RESULT(MPI_Wait,
0318                              (m_requests, &stat.m_status));
0319       // Resize our buffer and get ready to receive its data
0320       this->extra::m_ia.resize(this->extra::m_count);
0321       BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0322                              (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
0323                               stat.source(), stat.tag(), 
0324                               MPI_Comm(m_comm), m_requests + 1));
0325     }
0326 
0327     // Wait until we have received the entire message
0328     BOOST_MPI_CHECK_RESULT(MPI_Wait,
0329                            (m_requests + 1, &stat.m_status));
0330 
0331     this->deserialize(stat);
0332     return stat;    
0333   }
0334   
0335   optional<status> test() {
0336     status stat;
0337     int flag = 0;
0338     
0339     if (m_requests[1] == MPI_REQUEST_NULL) {
0340       // Check if the count message has completed
0341       BOOST_MPI_CHECK_RESULT(MPI_Test,
0342                              (m_requests, &flag, &stat.m_status));
0343       if (flag) {
0344         // Resize our buffer and get ready to receive its data
0345         this->extra::m_ia.resize(this->extra::m_count);
0346         BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0347                                (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
0348                                 stat.source(), stat.tag(), 
0349                                 MPI_Comm(m_comm), m_requests + 1));
0350       } else
0351         return optional<status>(); // We have not finished yet
0352     } 
0353 
0354     // Check if we have received the message data
0355     BOOST_MPI_CHECK_RESULT(MPI_Test,
0356                            (m_requests + 1, &flag, &stat.m_status));
0357     if (flag) {
0358       this->deserialize(stat);
0359       return stat;
0360     } else 
0361       return optional<status>();
0362   }
0363 };
0364 
0365 template<typename T>
0366 class request::legacy_serialized_array_handler 
0367   : public    request::legacy_handler,
0368     protected detail::serialized_array_irecv_data<T> {
0369   typedef detail::serialized_array_irecv_data<T> extra;
0370 
0371 public:
0372   legacy_serialized_array_handler(communicator const& comm, int source, int tag, T* values, int n)
0373     : legacy_handler(comm, source, tag),
0374       extra(comm, values, n) {
0375     BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0376                            (&this->extra::m_count, 1, 
0377                             get_mpi_datatype(this->extra::m_count),
0378                             source, tag, comm, m_requests+0));
0379   }
0380 
0381   status wait() {
0382     status stat;
0383     if (m_requests[1] == MPI_REQUEST_NULL) {
0384       // Wait for the count message to complete
0385       BOOST_MPI_CHECK_RESULT(MPI_Wait,
0386                              (m_requests, &stat.m_status));
0387       // Resize our buffer and get ready to receive its data
0388       this->extra::m_ia.resize(this->extra::m_count);
0389       BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0390                              (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
0391                               stat.source(), stat.tag(), 
0392                               MPI_Comm(m_comm), m_requests + 1));
0393     }
0394 
0395     // Wait until we have received the entire message
0396     BOOST_MPI_CHECK_RESULT(MPI_Wait,
0397                            (m_requests + 1, &stat.m_status));
0398 
0399     this->deserialize(stat);
0400     return stat;
0401   }
0402   
0403   optional<status> test() {
0404     status stat;
0405     int flag = 0;
0406     
0407     if (m_requests[1] == MPI_REQUEST_NULL) {
0408       // Check if the count message has completed
0409       BOOST_MPI_CHECK_RESULT(MPI_Test,
0410                              (m_requests, &flag, &stat.m_status));
0411       if (flag) {
0412         // Resize our buffer and get ready to receive its data
0413         this->extra::m_ia.resize(this->extra::m_count);
0414         BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0415                                (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
0416                                 stat.source(), stat.tag(), 
0417                                 MPI_Comm(m_comm), m_requests + 1));
0418       } else
0419         return optional<status>(); // We have not finished yet
0420     } 
0421 
0422     // Check if we have received the message data
0423     BOOST_MPI_CHECK_RESULT(MPI_Test,
0424                            (m_requests + 1, &flag, &stat.m_status));
0425     if (flag) {
0426       this->deserialize(stat);
0427       return stat;
0428     } else 
0429       return optional<status>();
0430   }
0431 };
0432 
0433 template<typename T, class A>
0434 class request::legacy_dynamic_primitive_array_handler 
0435   : public request::legacy_handler,
0436     protected detail::dynamic_array_irecv_data<T,A>
0437 {
0438   typedef detail::dynamic_array_irecv_data<T,A> extra;
0439 
0440 public:
0441   legacy_dynamic_primitive_array_handler(communicator const& comm, int source, int tag, std::vector<T,A>& values)
0442     : legacy_handler(comm, source, tag),
0443       extra(values) {
0444     BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0445                            (&this->extra::m_count, 1, 
0446                             get_mpi_datatype(this->extra::m_count),
0447                             source, tag, comm, m_requests+0));
0448   }
0449 
0450   status wait() {
0451     status stat;
0452     if (m_requests[1] == MPI_REQUEST_NULL) {
0453       // Wait for the count message to complete
0454       BOOST_MPI_CHECK_RESULT(MPI_Wait,
0455                              (m_requests, &stat.m_status));
0456       // Resize our buffer and get ready to receive its data
0457       this->extra::m_values.resize(this->extra::m_count);
0458       BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0459                              (detail::c_data(this->extra::m_values), this->extra::m_values.size(), get_mpi_datatype<T>(),
0460                               stat.source(), stat.tag(), 
0461                               MPI_Comm(m_comm), m_requests + 1));
0462     }
0463     // Wait until we have received the entire message
0464     BOOST_MPI_CHECK_RESULT(MPI_Wait,
0465                            (m_requests + 1, &stat.m_status));
0466     return stat;    
0467   }
0468 
0469   optional<status> test() {
0470     status stat;
0471     int flag = 0;
0472     
0473     if (m_requests[1] == MPI_REQUEST_NULL) {
0474       // Check if the count message has completed
0475       BOOST_MPI_CHECK_RESULT(MPI_Test,
0476                              (m_requests, &flag, &stat.m_status));
0477       if (flag) {
0478         // Resize our buffer and get ready to receive its data
0479         this->extra::m_values.resize(this->extra::m_count);
0480         BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0481                                (detail::c_data(this->extra::m_values), this->extra::m_values.size(), get_mpi_datatype<T>(),
0482                                 stat.source(), stat.tag(), 
0483                                 MPI_Comm(m_comm), m_requests + 1));
0484       } else
0485         return optional<status>(); // We have not finished yet
0486     } 
0487 
0488     // Check if we have received the message data
0489     BOOST_MPI_CHECK_RESULT(MPI_Test,
0490                            (m_requests + 1, &flag, &stat.m_status));
0491     if (flag) {
0492       return stat;
0493     } else 
0494       return optional<status>();
0495   }
0496 };
0497 
0498 class BOOST_MPI_DECL request::trivial_handler : public request::handler {
0499 
0500 public:
0501   trivial_handler();
0502   
0503   status wait();
0504   optional<status> test();
0505   void cancel();
0506   
0507   bool active() const;
0508   optional<MPI_Request&> trivial();
0509 
0510 private:
0511   friend class request;
0512   MPI_Request      m_request;
0513 };
0514 
0515 class request::dynamic_handler : public request::handler {
0516   dynamic_handler();
0517   
0518   status wait();
0519   optional<status> test();
0520   void cancel();
0521   
0522   bool active() const;
0523   optional<MPI_Request&> trivial();
0524 
0525 private:
0526   friend class request;
0527   MPI_Request      m_requests[2];
0528 };
0529 
0530 template<typename T> 
0531 request request::make_serialized(communicator const& comm, int source, int tag, T& value) {
0532 #if defined(BOOST_MPI_USE_IMPROBE)
0533   return request(new probe_handler<detail::serialized_data<T> >(comm, source, tag, value));
0534 #else
0535   return request(new legacy_serialized_handler<T>(comm, source, tag, value));
0536 #endif
0537 }
0538 
0539 template<typename T>
0540 request request::make_serialized_array(communicator const& comm, int source, int tag, T* values, int n) {
0541 #if defined(BOOST_MPI_USE_IMPROBE)
0542   return request(new probe_handler<detail::serialized_array_data<T> >(comm, source, tag, values, n));
0543 #else
0544   return request(new legacy_serialized_array_handler<T>(comm, source, tag, values, n));
0545 #endif
0546 }
0547 
0548 template<typename T, class A>
0549 request request::make_dynamic_primitive_array_recv(communicator const& comm, int source, int tag, 
0550                                                    std::vector<T,A>& values) {
0551 #if defined(BOOST_MPI_USE_IMPROBE)
0552   return request(new probe_handler<detail::dynamic_primitive_array_data<std::vector<T,A> > >(comm,source,tag,values));
0553 #else
0554   return request(new legacy_dynamic_primitive_array_handler<T,A>(comm, source, tag, values));
0555 #endif
0556 }
0557 
0558 template<typename T>
0559 request
0560 request::make_trivial_send(communicator const& comm, int dest, int tag, T const* values, int n) {
0561   trivial_handler* handler = new trivial_handler;
0562   BOOST_MPI_CHECK_RESULT(MPI_Isend,
0563                          (const_cast<T*>(values), n, 
0564                           get_mpi_datatype<T>(),
0565                           dest, tag, comm, &handler->m_request));
0566   return request(handler);
0567 }
0568 
0569 template<typename T>
0570 request
0571 request::make_trivial_send(communicator const& comm, int dest, int tag, T const& value) {
0572   return make_trivial_send(comm, dest, tag, &value, 1);
0573 }
0574 
0575 template<typename T>
0576 request
0577 request::make_trivial_recv(communicator const& comm, int dest, int tag, T* values, int n) {
0578   trivial_handler* handler = new trivial_handler;
0579   BOOST_MPI_CHECK_RESULT(MPI_Irecv,
0580                          (values, n, 
0581                           get_mpi_datatype<T>(),
0582                           dest, tag, comm, &handler->m_request));
0583   return request(handler);
0584 }
0585 
0586 template<typename T>
0587 request
0588 request::make_trivial_recv(communicator const& comm, int dest, int tag, T& value) {
0589   return make_trivial_recv(comm, dest, tag, &value, 1);
0590 }
0591 
0592 template<typename T, class A>
0593 request request::make_dynamic_primitive_array_send(communicator const& comm, int dest, int tag, 
0594                                                    std::vector<T,A> const& values) {
0595 #if defined(BOOST_MPI_USE_IMPROBE)
0596   return make_trivial_send(comm, dest, tag, values.data(), values.size());
0597 #else
0598   {
0599     // non blocking recv by legacy_dynamic_primitive_array_handler
0600     // blocking recv by status recv_vector(source,tag,value,primitive)
0601     boost::shared_ptr<std::size_t> size(new std::size_t(values.size()));
0602     dynamic_handler* handler = new dynamic_handler;
0603     request req(handler);
0604     req.preserve(size);
0605     
0606     BOOST_MPI_CHECK_RESULT(MPI_Isend,
0607                            (size.get(), 1,
0608                             get_mpi_datatype(*size),
0609                             dest, tag, comm, handler->m_requests+0));
0610     BOOST_MPI_CHECK_RESULT(MPI_Isend,
0611                            (const_cast<T*>(values.data()), *size, 
0612                             get_mpi_datatype<T>(),
0613                             dest, tag, comm, handler->m_requests+1));
0614     return req;
0615   }
0616 #endif
0617 }
0618 
0619 inline
0620 request::legacy_handler::legacy_handler(communicator const& comm, int source, int tag)
0621   : m_comm(comm),
0622     m_source(source),
0623     m_tag(tag)
0624 {
0625   m_requests[0] = MPI_REQUEST_NULL;
0626   m_requests[1] = MPI_REQUEST_NULL;
0627 }
0628     
0629 }}
0630 
0631 #endif // BOOST_MPI_REQUEST_HANDLERS_HPP