File indexing completed on 2025-09-15 08:38:23
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0012 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0013
0014 #ifndef BOOST_CONFIG_HPP
0015 # include <boost/config.hpp>
0016 #endif
0017 #
0018 #if defined(BOOST_HAS_PRAGMA_ONCE)
0019 # pragma once
0020 #endif
0021
0022 #include <boost/interprocess/detail/config_begin.hpp>
0023 #include <boost/interprocess/detail/workaround.hpp>
0024
0025 #include <boost/interprocess/shared_memory_object.hpp>
0026 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
0027 #include <boost/interprocess/sync/interprocess_condition.hpp>
0028 #include <boost/interprocess/sync/interprocess_mutex.hpp>
0029 #include <boost/interprocess/sync/scoped_lock.hpp>
0030 #include <boost/interprocess/detail/utilities.hpp>
0031 #include <boost/interprocess/timed_utils.hpp>
0032 #include <boost/interprocess/offset_ptr.hpp>
0033 #include <boost/interprocess/creation_tags.hpp>
0034 #include <boost/interprocess/exceptions.hpp>
0035 #include <boost/interprocess/permissions.hpp>
0036 #include <boost/interprocess/detail/type_traits.hpp>
0037 #include <boost/intrusive/pointer_traits.hpp>
0038 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
0039 #include <boost/intrusive/pointer_traits.hpp>
0040 #include <boost/move/detail/force_ptr.hpp>
0041 #include <boost/assert.hpp>
0042 #include <algorithm> //std::lower_bound
0043 #include <cstddef> //std::size_t
0044 #include <cstring> //memcpy
0045
0046
0047
0048
0049
0050
0051
0052 namespace boost{ namespace interprocess{
0053
0054 namespace ipcdetail
0055 {
0056 template<class VoidPointer>
0057 class msg_queue_initialization_func_t;
0058
0059 }
0060
0061
0062 enum mqblock_types { blocking, timed, non_blocking };
0063
0064
0065
0066 template<class VoidPointer>
0067 class message_queue_t
0068 {
0069 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0070
0071 message_queue_t();
0072 #endif
0073
0074 public:
0075 typedef VoidPointer void_pointer;
0076 typedef typename boost::intrusive::
0077 pointer_traits<void_pointer>::template
0078 rebind_pointer<char>::type char_ptr;
0079 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0080 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0081
0082
0083
0084
0085 message_queue_t(create_only_t,
0086 const char *name,
0087 size_type max_num_msg,
0088 size_type max_msg_size,
0089 const permissions &perm = permissions());
0090
0091
0092
0093
0094
0095
0096 message_queue_t(open_or_create_t,
0097 const char *name,
0098 size_type max_num_msg,
0099 size_type max_msg_size,
0100 const permissions &perm = permissions());
0101
0102
0103
0104
0105 message_queue_t(open_only_t, const char *name);
0106
0107 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0108
0109
0110
0111
0112
0113
0114
0115 message_queue_t(create_only_t,
0116 const wchar_t *name,
0117 size_type max_num_msg,
0118 size_type max_msg_size,
0119 const permissions &perm = permissions());
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129 message_queue_t(open_or_create_t,
0130 const wchar_t *name,
0131 size_type max_num_msg,
0132 size_type max_msg_size,
0133 const permissions &perm = permissions());
0134
0135
0136
0137
0138
0139
0140
0141 message_queue_t(open_only_t, const wchar_t *name);
0142
0143 #endif
0144
0145
0146
0147
0148 message_queue_t(size_type max_num_msg,
0149 size_type max_msg_size);
0150
0151
0152
0153
0154
0155
0156
0157
0158 ~message_queue_t();
0159
0160
0161
0162
0163 void send (const void *buffer, size_type buffer_size,
0164 unsigned int priority);
0165
0166
0167
0168
0169
0170 bool try_send (const void *buffer, size_type buffer_size,
0171 unsigned int priority);
0172
0173
0174
0175
0176
0177
0178 template<class TimePoint>
0179 bool timed_send (const void *buffer, size_type buffer_size,
0180 unsigned int priority, const TimePoint& abs_time);
0181
0182
0183
0184
0185
0186 void receive (void *buffer, size_type buffer_size,
0187 size_type &recvd_size,unsigned int &priority);
0188
0189
0190
0191
0192
0193
0194 bool try_receive (void *buffer, size_type buffer_size,
0195 size_type &recvd_size,unsigned int &priority);
0196
0197
0198
0199
0200
0201
0202
0203 template<class TimePoint>
0204 bool timed_receive (void *buffer, size_type buffer_size,
0205 size_type &recvd_size,unsigned int &priority,
0206 const TimePoint &abs_time);
0207
0208
0209
0210
0211 size_type get_max_msg() const;
0212
0213
0214
0215
0216 size_type get_max_msg_size() const;
0217
0218
0219
0220 size_type get_num_msg() const;
0221
0222
0223
0224 static bool remove(const char *name);
0225
0226 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0227
0228
0229
0230
0231
0232
0233 static bool remove(const wchar_t *name);
0234
0235 #endif
0236
0237 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0238 private:
0239
0240 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
0241
0242 template<mqblock_types Block, class TimePoint>
0243 bool do_receive(void *buffer, size_type buffer_size,
0244 size_type &recvd_size, unsigned int &priority,
0245 const TimePoint &abs_time);
0246
0247 template<mqblock_types Block, class TimePoint>
0248 bool do_send(const void *buffer, size_type buffer_size,
0249 unsigned int priority, const TimePoint &abs_time);
0250
0251
0252
0253 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
0254 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0255 open_create_impl_t m_shmem;
0256
0257 template<class Lock, class TimePoint>
0258 static bool do_cond_wait(ipcdetail::bool_<true>, interprocess_condition &cond, Lock &lock, const TimePoint &abs_time)
0259 { return cond.timed_wait(lock, abs_time); }
0260
0261 template<class Lock, class TimePoint>
0262 static bool do_cond_wait(ipcdetail::bool_<false>, interprocess_condition &cond, Lock &lock, const TimePoint &)
0263 { cond.wait(lock); return true; }
0264
0265 #endif
0266 };
0267
0268 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0269
0270 namespace ipcdetail {
0271
0272
0273 template<class VoidPointer>
0274 class msg_hdr_t
0275 {
0276 typedef VoidPointer void_pointer;
0277 typedef typename boost::intrusive::
0278 pointer_traits<void_pointer>::template
0279 rebind_pointer<char>::type char_ptr;
0280 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0281 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0282
0283 public:
0284 size_type len;
0285 unsigned int priority;
0286
0287 void * data(){ return this+1; }
0288 };
0289
0290
0291 template<class VoidPointer>
0292 class priority_functor
0293 {
0294 typedef typename boost::intrusive::
0295 pointer_traits<VoidPointer>::template
0296 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
0297
0298 public:
0299 bool operator()(const msg_hdr_ptr_t &msg1,
0300 const msg_hdr_ptr_t &msg2) const
0301 { return msg1->priority < msg2->priority; }
0302 };
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313
0314
0315
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342
0343
0344
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355 template<class VoidPointer>
0356 class mq_hdr_t
0357 : public ipcdetail::priority_functor<VoidPointer>
0358 {
0359 typedef VoidPointer void_pointer;
0360 typedef msg_hdr_t<void_pointer> msg_header;
0361 typedef typename boost::intrusive::
0362 pointer_traits<void_pointer>::template
0363 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
0364 typedef typename boost::intrusive::pointer_traits
0365 <msg_hdr_ptr_t>::difference_type difference_type;
0366 typedef typename boost::container::
0367 dtl::make_unsigned<difference_type>::type size_type;
0368 typedef typename boost::intrusive::
0369 pointer_traits<void_pointer>::template
0370 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
0371 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0372
0373 public:
0374
0375
0376
0377
0378 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
0379 : m_max_num_msg(max_num_msg),
0380 m_max_msg_size(max_msg_size),
0381 m_cur_num_msg(0)
0382 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0383 ,m_cur_first_msg(0u)
0384 ,m_blocked_senders(0u)
0385 ,m_blocked_receivers(0u)
0386 #endif
0387 { this->initialize_memory(); }
0388
0389
0390 bool is_full() const
0391 { return m_cur_num_msg == m_max_num_msg; }
0392
0393
0394 bool is_empty() const
0395 { return !m_cur_num_msg; }
0396
0397
0398 void free_top_msg()
0399 { --m_cur_num_msg; }
0400
0401 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0402
0403 typedef msg_hdr_ptr_t *iterator;
0404
0405 size_type end_pos() const
0406 {
0407 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
0408 return space_until_bufend > m_cur_num_msg
0409 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
0410 }
0411
0412
0413 msg_header &top_msg()
0414 {
0415 size_type pos = this->end_pos();
0416 return *mp_index[difference_type(pos ? --pos : m_max_num_msg - 1)];
0417 }
0418
0419
0420 msg_header &bottom_msg()
0421 { return *mp_index[difference_type(m_cur_first_msg)]; }
0422
0423 iterator inserted_ptr_begin() const
0424 { return &mp_index[difference_type(m_cur_first_msg)]; }
0425
0426 iterator inserted_ptr_end() const
0427 { return &mp_index[difference_type(this->end_pos())]; }
0428
0429 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0430 {
0431 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
0432 if(end < begin){
0433 iterator idx_end = &mp_index[difference_type(m_max_num_msg)];
0434 iterator ret = std::lower_bound(begin, idx_end, value, func);
0435 if(idx_end == ret){
0436 iterator idx_beg = &mp_index[0];
0437 ret = std::lower_bound(idx_beg, end, value, func);
0438
0439 BOOST_ASSERT(ret != end);
0440 BOOST_ASSERT(ret != begin);
0441 return ret;
0442 }
0443 else{
0444 return ret;
0445 }
0446 }
0447 else{
0448 return std::lower_bound(begin, end, value, func);
0449 }
0450 }
0451
0452 msg_header & insert_at(iterator where)
0453 {
0454 iterator it_inserted_ptr_end = this->inserted_ptr_end();
0455 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
0456 if(where == it_inserted_ptr_beg){
0457
0458 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0459 --m_cur_first_msg;
0460 ++m_cur_num_msg;
0461 return *mp_index[difference_type(m_cur_first_msg)];
0462 }
0463 else if(where == it_inserted_ptr_end){
0464 ++m_cur_num_msg;
0465 return **it_inserted_ptr_end;
0466 }
0467 else{
0468 size_type pos = size_type(where - &mp_index[0]);
0469 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
0470
0471 if(circ_pos < m_cur_num_msg/2){
0472
0473
0474 if(!pos){
0475 pos = m_max_num_msg;
0476 where = &mp_index[difference_type(m_max_num_msg-1u)];
0477 }
0478 else{
0479 --where;
0480 }
0481 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
0482 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
0483 const size_type first_segment_end = pos;
0484 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
0485 const size_type second_segment_end = m_max_num_msg;
0486 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
0487
0488
0489 if(!unique_segment){
0490 std::copy( &mp_index[0] + second_segment_beg
0491 , &mp_index[0] + second_segment_end
0492 , &mp_index[0] + second_segment_beg - 1);
0493 mp_index[difference_type(m_max_num_msg-1u)] = mp_index[0];
0494 }
0495 std::copy( &mp_index[0] + first_segment_beg
0496 , &mp_index[0] + first_segment_end
0497 , &mp_index[0] + first_segment_beg - 1);
0498 *where = backup;
0499 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0500 --m_cur_first_msg;
0501 ++m_cur_num_msg;
0502 return **where;
0503 }
0504 else{
0505
0506
0507 const size_type pos_end = this->end_pos();
0508 const bool unique_segment = pos < pos_end;
0509 const size_type first_segment_beg = pos;
0510 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
0511 const size_type second_segment_beg = 0u;
0512 const size_type second_segment_end = unique_segment ? 0u : pos_end;
0513 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
0514
0515
0516 if(!unique_segment){
0517 std::copy_backward( &mp_index[0] + second_segment_beg
0518 , &mp_index[0] + second_segment_end
0519 , &mp_index[0] + second_segment_end + 1u);
0520 mp_index[0] = mp_index[difference_type(m_max_num_msg-1u)];
0521 }
0522 std::copy_backward( &mp_index[0] + first_segment_beg
0523 , &mp_index[0] + first_segment_end
0524 , &mp_index[0] + first_segment_end + 1u);
0525 *where = backup;
0526 ++m_cur_num_msg;
0527 return **where;
0528 }
0529 }
0530 }
0531
0532 #else
0533
0534 typedef msg_hdr_ptr_t *iterator;
0535
0536
0537 msg_header &top_msg()
0538 { return *mp_index[difference_type(m_cur_num_msg-1u)]; }
0539
0540
0541 msg_header &bottom_msg()
0542 { return *mp_index[0]; }
0543
0544 iterator inserted_ptr_begin() const
0545 { return &mp_index[0]; }
0546
0547 iterator inserted_ptr_end() const
0548 { return &mp_index[difference_type(m_cur_num_msg)]; }
0549
0550 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0551 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
0552
0553 msg_header & insert_at(iterator pos)
0554 {
0555 const msg_hdr_ptr_t backup = *inserted_ptr_end();
0556 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
0557 *pos = backup;
0558 ++m_cur_num_msg;
0559 return **pos;
0560 }
0561
0562 #endif
0563
0564
0565 msg_header & queue_free_msg(unsigned int priority)
0566 {
0567
0568 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
0569
0570 if(m_cur_num_msg && priority > this->bottom_msg().priority){
0571
0572 if(priority > this->top_msg().priority){
0573 it = it_end;
0574 }
0575 else{
0576
0577
0578 msg_header dummy_hdr;
0579 dummy_hdr.priority = priority;
0580
0581
0582 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
0583
0584
0585 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
0586 }
0587 }
0588
0589 return this->insert_at(it);
0590 }
0591
0592
0593
0594
0595 static size_type get_mem_size
0596 (size_type max_msg_size, size_type max_num_msg)
0597 {
0598 const size_type
0599 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0600 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0601 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0602 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0603 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
0604 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
0605 open_create_impl_t::ManagedOpenOrCreateUserOffset;
0606 }
0607
0608
0609
0610 void initialize_memory()
0611 {
0612 const size_type
0613 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0614 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0615 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0616 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0617 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
0618
0619
0620 msg_hdr_ptr_t *index = move_detail::force_ptr<msg_hdr_ptr_t*>
0621 (reinterpret_cast<char*>(this)+r_hdr_size);
0622
0623
0624 msg_header *msg_hdr = move_detail::force_ptr<msg_header*>
0625 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
0626
0627
0628 mp_index = index;
0629
0630
0631 for(size_type i = 0; i < m_max_num_msg; ++i){
0632 index[i] = msg_hdr;
0633 msg_hdr = move_detail::force_ptr<msg_header*>
0634 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
0635 }
0636 }
0637
0638 public:
0639
0640 msg_hdr_ptr_ptr_t mp_index;
0641
0642 const size_type m_max_num_msg;
0643
0644 const size_type m_max_msg_size;
0645
0646 size_type m_cur_num_msg;
0647
0648 interprocess_mutex m_mutex;
0649
0650 interprocess_condition m_cond_recv;
0651
0652 interprocess_condition m_cond_send;
0653 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0654
0655 size_type m_cur_first_msg;
0656 size_type m_blocked_senders;
0657 size_type m_blocked_receivers;
0658 #endif
0659 };
0660
0661
0662
0663
0664 template<class VoidPointer>
0665 class msg_queue_initialization_func_t
0666 {
0667 public:
0668 typedef typename boost::intrusive::
0669 pointer_traits<VoidPointer>::template
0670 rebind_pointer<char>::type char_ptr;
0671 typedef typename boost::intrusive::pointer_traits<char_ptr>::
0672 difference_type difference_type;
0673 typedef typename boost::container::dtl::
0674 make_unsigned<difference_type>::type size_type;
0675
0676 msg_queue_initialization_func_t(size_type maxmsg = 0,
0677 size_type maxmsgsize = 0)
0678 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
0679
0680 bool operator()(void *address, size_type, bool created)
0681 {
0682 char *mptr;
0683
0684 if(created){
0685 mptr = reinterpret_cast<char*>(address);
0686
0687 BOOST_INTERPROCESS_TRY{
0688 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
0689 }
0690 BOOST_INTERPROCESS_CATCH(...){
0691 return false;
0692 } BOOST_INTERPROCESS_CATCH_END
0693 }
0694 return true;
0695 }
0696
0697 std::size_t get_min_size() const
0698 {
0699 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
0700 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
0701 }
0702
0703 const size_type m_maxmsg;
0704 const size_type m_maxmsgsize;
0705 };
0706
0707 }
0708
0709 template<class VoidPointer>
0710 inline message_queue_t<VoidPointer>::~message_queue_t()
0711 {}
0712
0713 template<class VoidPointer>
0714 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
0715 (size_type max_msg_size, size_type max_num_msg)
0716 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
0717
0718 template<class VoidPointer>
0719 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0720 const char *name,
0721 size_type max_num_msg,
0722 size_type max_msg_size,
0723 const permissions &perm)
0724
0725 : m_shmem(create_only,
0726 name,
0727 get_mem_size(max_msg_size, max_num_msg),
0728 read_write,
0729 static_cast<void*>(0),
0730
0731 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0732 perm)
0733 {}
0734
0735 template<class VoidPointer>
0736 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0737 const char *name,
0738 size_type max_num_msg,
0739 size_type max_msg_size,
0740 const permissions &perm)
0741
0742 : m_shmem(open_or_create,
0743 name,
0744 get_mem_size(max_msg_size, max_num_msg),
0745 read_write,
0746 static_cast<void*>(0),
0747
0748 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0749 perm)
0750 {}
0751
0752 template<class VoidPointer>
0753 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
0754
0755 : m_shmem(open_only,
0756 name,
0757 read_write,
0758 static_cast<void*>(0),
0759
0760 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0761 {}
0762
0763 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0764
0765 template<class VoidPointer>
0766 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0767 const wchar_t *name,
0768 size_type max_num_msg,
0769 size_type max_msg_size,
0770 const permissions &perm)
0771
0772 : m_shmem(create_only,
0773 name,
0774 get_mem_size(max_msg_size, max_num_msg),
0775 read_write,
0776 static_cast<void*>(0),
0777
0778 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0779 perm)
0780 {}
0781
0782 template<class VoidPointer>
0783 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0784 const wchar_t *name,
0785 size_type max_num_msg,
0786 size_type max_msg_size,
0787 const permissions &perm)
0788
0789 : m_shmem(open_or_create,
0790 name,
0791 get_mem_size(max_msg_size, max_num_msg),
0792 read_write,
0793 static_cast<void*>(0),
0794
0795 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0796 perm)
0797 {}
0798
0799 template<class VoidPointer>
0800 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const wchar_t *name)
0801
0802 : m_shmem(open_only,
0803 name,
0804 read_write,
0805 static_cast<void*>(0),
0806
0807 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0808 {}
0809
0810 #endif
0811
0812 template <class VoidPointer>
0813 inline message_queue_t<VoidPointer>::message_queue_t(size_type max_num_msg,
0814 size_type max_msg_size)
0815 : m_shmem(get_mem_size(max_msg_size, max_num_msg),
0816 static_cast<void*>(0),
0817
0818 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size))
0819 {}
0820
0821 template<class VoidPointer>
0822 inline void message_queue_t<VoidPointer>::send
0823 (const void *buffer, size_type buffer_size, unsigned int priority)
0824 { this->do_send<blocking>(buffer, buffer_size, priority, 0); }
0825
0826 template<class VoidPointer>
0827 inline bool message_queue_t<VoidPointer>::try_send
0828 (const void *buffer, size_type buffer_size, unsigned int priority)
0829 { return this->do_send<non_blocking>(buffer, buffer_size, priority, 0); }
0830
0831 template<class VoidPointer>
0832 template<class TimePoint>
0833 inline bool message_queue_t<VoidPointer>::timed_send
0834 (const void *buffer, size_type buffer_size
0835 ,unsigned int priority, const TimePoint &abs_time)
0836 {
0837 if(ipcdetail::is_pos_infinity(abs_time)){
0838 this->send(buffer, buffer_size, priority);
0839 return true;
0840 }
0841 return this->do_send<timed>(buffer, buffer_size, priority, abs_time);
0842 }
0843
0844 template<class VoidPointer>
0845 template<mqblock_types Block, class TimePoint>
0846 inline bool message_queue_t<VoidPointer>::do_send(
0847 const void *buffer, size_type buffer_size,
0848 unsigned int priority, const TimePoint &abs_time)
0849 {
0850 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0851
0852 if (buffer_size > p_hdr->m_max_msg_size) {
0853 throw interprocess_exception(size_error);
0854 }
0855
0856 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0857 bool notify_blocked_receivers = false;
0858 #endif
0859
0860 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0861
0862 {
0863
0864 if (p_hdr->is_full()) {
0865 BOOST_INTERPROCESS_TRY{
0866 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0867 ++p_hdr->m_blocked_senders;
0868 #endif
0869 switch(Block){
0870 case non_blocking :
0871 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0872 --p_hdr->m_blocked_senders;
0873 #endif
0874 return false;
0875 break;
0876
0877 case blocking :
0878 do{
0879 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_send, lock, abs_time);
0880 }
0881 while (p_hdr->is_full());
0882 break;
0883
0884 case timed :
0885 do{
0886 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_send, lock, abs_time)) {
0887 if(p_hdr->is_full()){
0888 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0889 --p_hdr->m_blocked_senders;
0890 #endif
0891 return false;
0892 }
0893 break;
0894 }
0895 }
0896 while (p_hdr->is_full());
0897 break;
0898 default:
0899 break;
0900 }
0901 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0902 --p_hdr->m_blocked_senders;
0903 #endif
0904 }
0905 BOOST_INTERPROCESS_CATCH(...){
0906 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0907 --p_hdr->m_blocked_senders;
0908 #endif
0909 BOOST_INTERPROCESS_RETHROW;
0910 } BOOST_INTERPROCESS_CATCH_END
0911 }
0912
0913 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0914 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
0915 #endif
0916
0917 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
0918
0919
0920 BOOST_ASSERT(free_msg_hdr.priority == 0);
0921 BOOST_ASSERT(free_msg_hdr.len == 0);
0922
0923
0924 free_msg_hdr.priority = priority;
0925 free_msg_hdr.len = buffer_size;
0926
0927
0928 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
0929 }
0930
0931
0932
0933
0934 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0935 if (notify_blocked_receivers){
0936 p_hdr->m_cond_recv.notify_one();
0937 }
0938 #else
0939 p_hdr->m_cond_recv.notify_one();
0940 #endif
0941
0942 return true;
0943 }
0944
0945 template<class VoidPointer>
0946 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
0947 size_type &recvd_size, unsigned int &priority)
0948 { this->do_receive<blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0949
0950 template<class VoidPointer>
0951 inline bool
0952 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
0953 size_type &recvd_size, unsigned int &priority)
0954 { return this->do_receive<non_blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0955
0956 template<class VoidPointer>
0957 template<class TimePoint>
0958 inline bool
0959 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
0960 size_type &recvd_size, unsigned int &priority,
0961 const TimePoint &abs_time)
0962 {
0963 if(ipcdetail::is_pos_infinity(abs_time)){
0964 this->receive(buffer, buffer_size, recvd_size, priority);
0965 return true;
0966 }
0967 return this->do_receive<timed>(buffer, buffer_size, recvd_size, priority, abs_time);
0968 }
0969
0970 template<class VoidPointer>
0971 template<mqblock_types Block, class TimePoint>
0972 inline bool
0973 message_queue_t<VoidPointer>::do_receive(
0974 void *buffer, size_type buffer_size,
0975 size_type &recvd_size, unsigned int &priority,
0976 const TimePoint &abs_time)
0977 {
0978 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0979
0980 if (buffer_size < p_hdr->m_max_msg_size) {
0981 throw interprocess_exception(size_error);
0982 }
0983
0984 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0985 bool notify_blocked_senders = false;
0986 #endif
0987
0988 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0989
0990 {
0991
0992 if (p_hdr->is_empty()) {
0993 BOOST_INTERPROCESS_TRY{
0994 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0995 ++p_hdr->m_blocked_receivers;
0996 #endif
0997 switch(Block){
0998 case non_blocking :
0999 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1000 --p_hdr->m_blocked_receivers;
1001 #endif
1002 return false;
1003 break;
1004
1005 case blocking :
1006 do{
1007 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_recv, lock, abs_time);
1008 }
1009 while (p_hdr->is_empty());
1010 break;
1011
1012 case timed :
1013 do{
1014 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_recv, lock, abs_time)) {
1015 if(p_hdr->is_empty()){
1016 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1017 --p_hdr->m_blocked_receivers;
1018 #endif
1019 return false;
1020 }
1021 break;
1022 }
1023 }
1024 while (p_hdr->is_empty());
1025 break;
1026
1027
1028 default:
1029 break;
1030 }
1031 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1032 --p_hdr->m_blocked_receivers;
1033 #endif
1034 }
1035 BOOST_INTERPROCESS_CATCH(...){
1036 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1037 --p_hdr->m_blocked_receivers;
1038 #endif
1039 BOOST_INTERPROCESS_RETHROW;
1040 } BOOST_INTERPROCESS_CATCH_END
1041 }
1042
1043 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1044 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
1045 #endif
1046
1047
1048 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
1049
1050
1051 recvd_size = top_msg.len;
1052 priority = top_msg.priority;
1053
1054
1055 top_msg.len = 0;
1056 top_msg.priority = 0;
1057
1058
1059 std::memcpy(buffer, top_msg.data(), recvd_size);
1060
1061
1062 p_hdr->free_top_msg();
1063 }
1064
1065
1066
1067
1068 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1069 if (notify_blocked_senders){
1070 p_hdr->m_cond_send.notify_one();
1071 }
1072 #else
1073 p_hdr->m_cond_send.notify_one();
1074 #endif
1075
1076 return true;
1077 }
1078
1079 template<class VoidPointer>
1080 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
1081 {
1082 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1083 return p_hdr ? p_hdr->m_max_num_msg : 0; }
1084
1085 template<class VoidPointer>
1086 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
1087 {
1088 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1089 return p_hdr ? p_hdr->m_max_msg_size : 0;
1090 }
1091
1092 template<class VoidPointer>
1093 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
1094 {
1095 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1096 if(p_hdr){
1097
1098 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
1099
1100 return p_hdr->m_cur_num_msg;
1101 }
1102
1103 return 0;
1104 }
1105
1106 template<class VoidPointer>
1107 inline bool message_queue_t<VoidPointer>::remove(const char *name)
1108 { return shared_memory_object::remove(name); }
1109
1110 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
1111
1112 template<class VoidPointer>
1113 inline bool message_queue_t<VoidPointer>::remove(const wchar_t *name)
1114 { return shared_memory_object::remove(name); }
1115
1116 #endif
1117
1118 #else
1119
1120
1121
1122 typedef message_queue_t<offset_ptr<void> > message_queue;
1123
1124 #endif
1125
1126 }}
1127
1128 #include <boost/interprocess/detail/config_end.hpp>
1129
1130 #endif