File indexing completed on 2025-07-12 08:17:09
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0012 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0013
0014 #ifndef BOOST_CONFIG_HPP
0015 # include <boost/config.hpp>
0016 #endif
0017 #
0018 #if defined(BOOST_HAS_PRAGMA_ONCE)
0019 # pragma once
0020 #endif
0021
0022 #include <boost/interprocess/detail/config_begin.hpp>
0023 #include <boost/interprocess/detail/workaround.hpp>
0024
0025 #include <boost/interprocess/shared_memory_object.hpp>
0026 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
0027 #include <boost/interprocess/sync/interprocess_condition.hpp>
0028 #include <boost/interprocess/sync/interprocess_mutex.hpp>
0029 #include <boost/interprocess/sync/scoped_lock.hpp>
0030 #include <boost/interprocess/detail/utilities.hpp>
0031 #include <boost/interprocess/timed_utils.hpp>
0032 #include <boost/interprocess/offset_ptr.hpp>
0033 #include <boost/interprocess/creation_tags.hpp>
0034 #include <boost/interprocess/exceptions.hpp>
0035 #include <boost/interprocess/permissions.hpp>
0036 #include <boost/core/no_exceptions_support.hpp>
0037 #include <boost/interprocess/detail/type_traits.hpp>
0038 #include <boost/intrusive/pointer_traits.hpp>
0039 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
0040 #include <boost/intrusive/pointer_traits.hpp>
0041 #include <boost/move/detail/force_ptr.hpp>
0042 #include <boost/assert.hpp>
0043 #include <algorithm> //std::lower_bound
0044 #include <cstddef> //std::size_t
0045 #include <cstring> //memcpy
0046
0047
0048
0049
0050
0051
0052
0053 namespace boost{ namespace interprocess{
0054
0055 namespace ipcdetail
0056 {
0057 template<class VoidPointer>
0058 class msg_queue_initialization_func_t;
0059
0060 }
0061
0062
0063 enum mqblock_types { blocking, timed, non_blocking };
0064
0065
0066
0067 template<class VoidPointer>
0068 class message_queue_t
0069 {
0070 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0071
0072 message_queue_t();
0073 #endif
0074
0075 public:
0076 typedef VoidPointer void_pointer;
0077 typedef typename boost::intrusive::
0078 pointer_traits<void_pointer>::template
0079 rebind_pointer<char>::type char_ptr;
0080 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0081 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0082
0083
0084
0085
0086 message_queue_t(create_only_t,
0087 const char *name,
0088 size_type max_num_msg,
0089 size_type max_msg_size,
0090 const permissions &perm = permissions());
0091
0092
0093
0094
0095
0096
0097 message_queue_t(open_or_create_t,
0098 const char *name,
0099 size_type max_num_msg,
0100 size_type max_msg_size,
0101 const permissions &perm = permissions());
0102
0103
0104
0105
0106 message_queue_t(open_only_t, const char *name);
0107
0108 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0109
0110
0111
0112
0113
0114
0115
0116 message_queue_t(create_only_t,
0117 const wchar_t *name,
0118 size_type max_num_msg,
0119 size_type max_msg_size,
0120 const permissions &perm = permissions());
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130 message_queue_t(open_or_create_t,
0131 const wchar_t *name,
0132 size_type max_num_msg,
0133 size_type max_msg_size,
0134 const permissions &perm = permissions());
0135
0136
0137
0138
0139
0140
0141
0142 message_queue_t(open_only_t, const wchar_t *name);
0143
0144 #endif
0145
0146
0147
0148
0149 message_queue_t(size_type max_num_msg,
0150 size_type max_msg_size);
0151
0152
0153
0154
0155
0156
0157
0158
0159 ~message_queue_t();
0160
0161
0162
0163
0164 void send (const void *buffer, size_type buffer_size,
0165 unsigned int priority);
0166
0167
0168
0169
0170
0171 bool try_send (const void *buffer, size_type buffer_size,
0172 unsigned int priority);
0173
0174
0175
0176
0177
0178
0179 template<class TimePoint>
0180 bool timed_send (const void *buffer, size_type buffer_size,
0181 unsigned int priority, const TimePoint& abs_time);
0182
0183
0184
0185
0186
0187 void receive (void *buffer, size_type buffer_size,
0188 size_type &recvd_size,unsigned int &priority);
0189
0190
0191
0192
0193
0194
0195 bool try_receive (void *buffer, size_type buffer_size,
0196 size_type &recvd_size,unsigned int &priority);
0197
0198
0199
0200
0201
0202
0203
0204 template<class TimePoint>
0205 bool timed_receive (void *buffer, size_type buffer_size,
0206 size_type &recvd_size,unsigned int &priority,
0207 const TimePoint &abs_time);
0208
0209
0210
0211
0212 size_type get_max_msg() const;
0213
0214
0215
0216
0217 size_type get_max_msg_size() const;
0218
0219
0220
0221 size_type get_num_msg() const;
0222
0223
0224
0225 static bool remove(const char *name);
0226
0227 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0228
0229
0230
0231
0232
0233
0234 static bool remove(const wchar_t *name);
0235
0236 #endif
0237
0238 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0239 private:
0240
0241 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
0242
0243 template<mqblock_types Block, class TimePoint>
0244 bool do_receive(void *buffer, size_type buffer_size,
0245 size_type &recvd_size, unsigned int &priority,
0246 const TimePoint &abs_time);
0247
0248 template<mqblock_types Block, class TimePoint>
0249 bool do_send(const void *buffer, size_type buffer_size,
0250 unsigned int priority, const TimePoint &abs_time);
0251
0252
0253
0254 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
0255 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0256 open_create_impl_t m_shmem;
0257
0258 template<class Lock, class TimePoint>
0259 static bool do_cond_wait(ipcdetail::bool_<true>, interprocess_condition &cond, Lock &lock, const TimePoint &abs_time)
0260 { return cond.timed_wait(lock, abs_time); }
0261
0262 template<class Lock, class TimePoint>
0263 static bool do_cond_wait(ipcdetail::bool_<false>, interprocess_condition &cond, Lock &lock, const TimePoint &)
0264 { cond.wait(lock); return true; }
0265
0266 #endif
0267 };
0268
0269 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0270
0271 namespace ipcdetail {
0272
0273
0274 template<class VoidPointer>
0275 class msg_hdr_t
0276 {
0277 typedef VoidPointer void_pointer;
0278 typedef typename boost::intrusive::
0279 pointer_traits<void_pointer>::template
0280 rebind_pointer<char>::type char_ptr;
0281 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0282 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0283
0284 public:
0285 size_type len;
0286 unsigned int priority;
0287
0288 void * data(){ return this+1; }
0289 };
0290
0291
0292 template<class VoidPointer>
0293 class priority_functor
0294 {
0295 typedef typename boost::intrusive::
0296 pointer_traits<VoidPointer>::template
0297 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
0298
0299 public:
0300 bool operator()(const msg_hdr_ptr_t &msg1,
0301 const msg_hdr_ptr_t &msg2) const
0302 { return msg1->priority < msg2->priority; }
0303 };
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313
0314
0315
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342
0343
0344
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
0356 template<class VoidPointer>
0357 class mq_hdr_t
0358 : public ipcdetail::priority_functor<VoidPointer>
0359 {
0360 typedef VoidPointer void_pointer;
0361 typedef msg_hdr_t<void_pointer> msg_header;
0362 typedef typename boost::intrusive::
0363 pointer_traits<void_pointer>::template
0364 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
0365 typedef typename boost::intrusive::pointer_traits
0366 <msg_hdr_ptr_t>::difference_type difference_type;
0367 typedef typename boost::container::
0368 dtl::make_unsigned<difference_type>::type size_type;
0369 typedef typename boost::intrusive::
0370 pointer_traits<void_pointer>::template
0371 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
0372 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0373
0374 public:
0375
0376
0377
0378
0379 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
0380 : m_max_num_msg(max_num_msg),
0381 m_max_msg_size(max_msg_size),
0382 m_cur_num_msg(0)
0383 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0384 ,m_cur_first_msg(0u)
0385 ,m_blocked_senders(0u)
0386 ,m_blocked_receivers(0u)
0387 #endif
0388 { this->initialize_memory(); }
0389
0390
0391 bool is_full() const
0392 { return m_cur_num_msg == m_max_num_msg; }
0393
0394
0395 bool is_empty() const
0396 { return !m_cur_num_msg; }
0397
0398
0399 void free_top_msg()
0400 { --m_cur_num_msg; }
0401
0402 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0403
0404 typedef msg_hdr_ptr_t *iterator;
0405
0406 size_type end_pos() const
0407 {
0408 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
0409 return space_until_bufend > m_cur_num_msg
0410 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
0411 }
0412
0413
0414 msg_header &top_msg()
0415 {
0416 size_type pos = this->end_pos();
0417 return *mp_index[difference_type(pos ? --pos : m_max_num_msg - 1)];
0418 }
0419
0420
0421 msg_header &bottom_msg()
0422 { return *mp_index[difference_type(m_cur_first_msg)]; }
0423
0424 iterator inserted_ptr_begin() const
0425 { return &mp_index[difference_type(m_cur_first_msg)]; }
0426
0427 iterator inserted_ptr_end() const
0428 { return &mp_index[difference_type(this->end_pos())]; }
0429
0430 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0431 {
0432 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
0433 if(end < begin){
0434 iterator idx_end = &mp_index[difference_type(m_max_num_msg)];
0435 iterator ret = std::lower_bound(begin, idx_end, value, func);
0436 if(idx_end == ret){
0437 iterator idx_beg = &mp_index[0];
0438 ret = std::lower_bound(idx_beg, end, value, func);
0439
0440 BOOST_ASSERT(ret != end);
0441 BOOST_ASSERT(ret != begin);
0442 return ret;
0443 }
0444 else{
0445 return ret;
0446 }
0447 }
0448 else{
0449 return std::lower_bound(begin, end, value, func);
0450 }
0451 }
0452
0453 msg_header & insert_at(iterator where)
0454 {
0455 iterator it_inserted_ptr_end = this->inserted_ptr_end();
0456 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
0457 if(where == it_inserted_ptr_beg){
0458
0459 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0460 --m_cur_first_msg;
0461 ++m_cur_num_msg;
0462 return *mp_index[difference_type(m_cur_first_msg)];
0463 }
0464 else if(where == it_inserted_ptr_end){
0465 ++m_cur_num_msg;
0466 return **it_inserted_ptr_end;
0467 }
0468 else{
0469 size_type pos = size_type(where - &mp_index[0]);
0470 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
0471
0472 if(circ_pos < m_cur_num_msg/2){
0473
0474
0475 if(!pos){
0476 pos = m_max_num_msg;
0477 where = &mp_index[difference_type(m_max_num_msg-1u)];
0478 }
0479 else{
0480 --where;
0481 }
0482 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
0483 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
0484 const size_type first_segment_end = pos;
0485 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
0486 const size_type second_segment_end = m_max_num_msg;
0487 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
0488
0489
0490 if(!unique_segment){
0491 std::copy( &mp_index[0] + second_segment_beg
0492 , &mp_index[0] + second_segment_end
0493 , &mp_index[0] + second_segment_beg - 1);
0494 mp_index[difference_type(m_max_num_msg-1u)] = mp_index[0];
0495 }
0496 std::copy( &mp_index[0] + first_segment_beg
0497 , &mp_index[0] + first_segment_end
0498 , &mp_index[0] + first_segment_beg - 1);
0499 *where = backup;
0500 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0501 --m_cur_first_msg;
0502 ++m_cur_num_msg;
0503 return **where;
0504 }
0505 else{
0506
0507
0508 const size_type pos_end = this->end_pos();
0509 const bool unique_segment = pos < pos_end;
0510 const size_type first_segment_beg = pos;
0511 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
0512 const size_type second_segment_beg = 0u;
0513 const size_type second_segment_end = unique_segment ? 0u : pos_end;
0514 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
0515
0516
0517 if(!unique_segment){
0518 std::copy_backward( &mp_index[0] + second_segment_beg
0519 , &mp_index[0] + second_segment_end
0520 , &mp_index[0] + second_segment_end + 1u);
0521 mp_index[0] = mp_index[difference_type(m_max_num_msg-1u)];
0522 }
0523 std::copy_backward( &mp_index[0] + first_segment_beg
0524 , &mp_index[0] + first_segment_end
0525 , &mp_index[0] + first_segment_end + 1u);
0526 *where = backup;
0527 ++m_cur_num_msg;
0528 return **where;
0529 }
0530 }
0531 }
0532
0533 #else
0534
0535 typedef msg_hdr_ptr_t *iterator;
0536
0537
0538 msg_header &top_msg()
0539 { return *mp_index[difference_type(m_cur_num_msg-1u)]; }
0540
0541
0542 msg_header &bottom_msg()
0543 { return *mp_index[0]; }
0544
0545 iterator inserted_ptr_begin() const
0546 { return &mp_index[0]; }
0547
0548 iterator inserted_ptr_end() const
0549 { return &mp_index[difference_type(m_cur_num_msg)]; }
0550
0551 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0552 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
0553
0554 msg_header & insert_at(iterator pos)
0555 {
0556 const msg_hdr_ptr_t backup = *inserted_ptr_end();
0557 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
0558 *pos = backup;
0559 ++m_cur_num_msg;
0560 return **pos;
0561 }
0562
0563 #endif
0564
0565
0566 msg_header & queue_free_msg(unsigned int priority)
0567 {
0568
0569 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
0570
0571 if(m_cur_num_msg && priority > this->bottom_msg().priority){
0572
0573 if(priority > this->top_msg().priority){
0574 it = it_end;
0575 }
0576 else{
0577
0578
0579 msg_header dummy_hdr;
0580 dummy_hdr.priority = priority;
0581
0582
0583 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
0584
0585
0586 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
0587 }
0588 }
0589
0590 return this->insert_at(it);
0591 }
0592
0593
0594
0595
0596 static size_type get_mem_size
0597 (size_type max_msg_size, size_type max_num_msg)
0598 {
0599 const size_type
0600 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0601 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0602 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0603 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0604 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
0605 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
0606 open_create_impl_t::ManagedOpenOrCreateUserOffset;
0607 }
0608
0609
0610
0611 void initialize_memory()
0612 {
0613 const size_type
0614 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0615 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0616 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0617 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0618 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
0619
0620
0621 msg_hdr_ptr_t *index = move_detail::force_ptr<msg_hdr_ptr_t*>
0622 (reinterpret_cast<char*>(this)+r_hdr_size);
0623
0624
0625 msg_header *msg_hdr = move_detail::force_ptr<msg_header*>
0626 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
0627
0628
0629 mp_index = index;
0630
0631
0632 for(size_type i = 0; i < m_max_num_msg; ++i){
0633 index[i] = msg_hdr;
0634 msg_hdr = move_detail::force_ptr<msg_header*>
0635 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
0636 }
0637 }
0638
0639 public:
0640
0641 msg_hdr_ptr_ptr_t mp_index;
0642
0643 const size_type m_max_num_msg;
0644
0645 const size_type m_max_msg_size;
0646
0647 size_type m_cur_num_msg;
0648
0649 interprocess_mutex m_mutex;
0650
0651 interprocess_condition m_cond_recv;
0652
0653 interprocess_condition m_cond_send;
0654 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0655
0656 size_type m_cur_first_msg;
0657 size_type m_blocked_senders;
0658 size_type m_blocked_receivers;
0659 #endif
0660 };
0661
0662
0663
0664
0665 template<class VoidPointer>
0666 class msg_queue_initialization_func_t
0667 {
0668 public:
0669 typedef typename boost::intrusive::
0670 pointer_traits<VoidPointer>::template
0671 rebind_pointer<char>::type char_ptr;
0672 typedef typename boost::intrusive::pointer_traits<char_ptr>::
0673 difference_type difference_type;
0674 typedef typename boost::container::dtl::
0675 make_unsigned<difference_type>::type size_type;
0676
0677 msg_queue_initialization_func_t(size_type maxmsg = 0,
0678 size_type maxmsgsize = 0)
0679 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
0680
0681 bool operator()(void *address, size_type, bool created)
0682 {
0683 char *mptr;
0684
0685 if(created){
0686 mptr = reinterpret_cast<char*>(address);
0687
0688 BOOST_TRY{
0689 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
0690 }
0691 BOOST_CATCH(...){
0692 return false;
0693 } BOOST_CATCH_END
0694 }
0695 return true;
0696 }
0697
0698 std::size_t get_min_size() const
0699 {
0700 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
0701 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
0702 }
0703
0704 const size_type m_maxmsg;
0705 const size_type m_maxmsgsize;
0706 };
0707
0708 }
0709
0710 template<class VoidPointer>
0711 inline message_queue_t<VoidPointer>::~message_queue_t()
0712 {}
0713
0714 template<class VoidPointer>
0715 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
0716 (size_type max_msg_size, size_type max_num_msg)
0717 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
0718
0719 template<class VoidPointer>
0720 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0721 const char *name,
0722 size_type max_num_msg,
0723 size_type max_msg_size,
0724 const permissions &perm)
0725
0726 : m_shmem(create_only,
0727 name,
0728 get_mem_size(max_msg_size, max_num_msg),
0729 read_write,
0730 static_cast<void*>(0),
0731
0732 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0733 perm)
0734 {}
0735
0736 template<class VoidPointer>
0737 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0738 const char *name,
0739 size_type max_num_msg,
0740 size_type max_msg_size,
0741 const permissions &perm)
0742
0743 : m_shmem(open_or_create,
0744 name,
0745 get_mem_size(max_msg_size, max_num_msg),
0746 read_write,
0747 static_cast<void*>(0),
0748
0749 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0750 perm)
0751 {}
0752
0753 template<class VoidPointer>
0754 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
0755
0756 : m_shmem(open_only,
0757 name,
0758 read_write,
0759 static_cast<void*>(0),
0760
0761 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0762 {}
0763
0764 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0765
0766 template<class VoidPointer>
0767 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0768 const wchar_t *name,
0769 size_type max_num_msg,
0770 size_type max_msg_size,
0771 const permissions &perm)
0772
0773 : m_shmem(create_only,
0774 name,
0775 get_mem_size(max_msg_size, max_num_msg),
0776 read_write,
0777 static_cast<void*>(0),
0778
0779 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0780 perm)
0781 {}
0782
0783 template<class VoidPointer>
0784 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0785 const wchar_t *name,
0786 size_type max_num_msg,
0787 size_type max_msg_size,
0788 const permissions &perm)
0789
0790 : m_shmem(open_or_create,
0791 name,
0792 get_mem_size(max_msg_size, max_num_msg),
0793 read_write,
0794 static_cast<void*>(0),
0795
0796 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0797 perm)
0798 {}
0799
0800 template<class VoidPointer>
0801 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const wchar_t *name)
0802
0803 : m_shmem(open_only,
0804 name,
0805 read_write,
0806 static_cast<void*>(0),
0807
0808 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0809 {}
0810
0811 #endif
0812
0813 template <class VoidPointer>
0814 inline message_queue_t<VoidPointer>::message_queue_t(size_type max_num_msg,
0815 size_type max_msg_size)
0816 : m_shmem(get_mem_size(max_msg_size, max_num_msg),
0817 static_cast<void*>(0),
0818
0819 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size))
0820 {}
0821
0822 template<class VoidPointer>
0823 inline void message_queue_t<VoidPointer>::send
0824 (const void *buffer, size_type buffer_size, unsigned int priority)
0825 { this->do_send<blocking>(buffer, buffer_size, priority, 0); }
0826
0827 template<class VoidPointer>
0828 inline bool message_queue_t<VoidPointer>::try_send
0829 (const void *buffer, size_type buffer_size, unsigned int priority)
0830 { return this->do_send<non_blocking>(buffer, buffer_size, priority, 0); }
0831
0832 template<class VoidPointer>
0833 template<class TimePoint>
0834 inline bool message_queue_t<VoidPointer>::timed_send
0835 (const void *buffer, size_type buffer_size
0836 ,unsigned int priority, const TimePoint &abs_time)
0837 {
0838 if(ipcdetail::is_pos_infinity(abs_time)){
0839 this->send(buffer, buffer_size, priority);
0840 return true;
0841 }
0842 return this->do_send<timed>(buffer, buffer_size, priority, abs_time);
0843 }
0844
0845 template<class VoidPointer>
0846 template<mqblock_types Block, class TimePoint>
0847 inline bool message_queue_t<VoidPointer>::do_send(
0848 const void *buffer, size_type buffer_size,
0849 unsigned int priority, const TimePoint &abs_time)
0850 {
0851 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0852
0853 if (buffer_size > p_hdr->m_max_msg_size) {
0854 throw interprocess_exception(size_error);
0855 }
0856
0857 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0858 bool notify_blocked_receivers = false;
0859 #endif
0860
0861 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0862
0863 {
0864
0865 if (p_hdr->is_full()) {
0866 BOOST_TRY{
0867 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0868 ++p_hdr->m_blocked_senders;
0869 #endif
0870 switch(Block){
0871 case non_blocking :
0872 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0873 --p_hdr->m_blocked_senders;
0874 #endif
0875 return false;
0876 break;
0877
0878 case blocking :
0879 do{
0880 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_send, lock, abs_time);
0881 }
0882 while (p_hdr->is_full());
0883 break;
0884
0885 case timed :
0886 do{
0887 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_send, lock, abs_time)) {
0888 if(p_hdr->is_full()){
0889 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0890 --p_hdr->m_blocked_senders;
0891 #endif
0892 return false;
0893 }
0894 break;
0895 }
0896 }
0897 while (p_hdr->is_full());
0898 break;
0899 default:
0900 break;
0901 }
0902 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0903 --p_hdr->m_blocked_senders;
0904 #endif
0905 }
0906 BOOST_CATCH(...){
0907 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0908 --p_hdr->m_blocked_senders;
0909 #endif
0910 BOOST_RETHROW;
0911 } BOOST_CATCH_END
0912 }
0913
0914 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0915 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
0916 #endif
0917
0918 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
0919
0920
0921 BOOST_ASSERT(free_msg_hdr.priority == 0);
0922 BOOST_ASSERT(free_msg_hdr.len == 0);
0923
0924
0925 free_msg_hdr.priority = priority;
0926 free_msg_hdr.len = buffer_size;
0927
0928
0929 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
0930 }
0931
0932
0933
0934
0935 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0936 if (notify_blocked_receivers){
0937 p_hdr->m_cond_recv.notify_one();
0938 }
0939 #else
0940 p_hdr->m_cond_recv.notify_one();
0941 #endif
0942
0943 return true;
0944 }
0945
0946 template<class VoidPointer>
0947 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
0948 size_type &recvd_size, unsigned int &priority)
0949 { this->do_receive<blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0950
0951 template<class VoidPointer>
0952 inline bool
0953 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
0954 size_type &recvd_size, unsigned int &priority)
0955 { return this->do_receive<non_blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0956
0957 template<class VoidPointer>
0958 template<class TimePoint>
0959 inline bool
0960 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
0961 size_type &recvd_size, unsigned int &priority,
0962 const TimePoint &abs_time)
0963 {
0964 if(ipcdetail::is_pos_infinity(abs_time)){
0965 this->receive(buffer, buffer_size, recvd_size, priority);
0966 return true;
0967 }
0968 return this->do_receive<timed>(buffer, buffer_size, recvd_size, priority, abs_time);
0969 }
0970
0971 template<class VoidPointer>
0972 template<mqblock_types Block, class TimePoint>
0973 inline bool
0974 message_queue_t<VoidPointer>::do_receive(
0975 void *buffer, size_type buffer_size,
0976 size_type &recvd_size, unsigned int &priority,
0977 const TimePoint &abs_time)
0978 {
0979 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0980
0981 if (buffer_size < p_hdr->m_max_msg_size) {
0982 throw interprocess_exception(size_error);
0983 }
0984
0985 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0986 bool notify_blocked_senders = false;
0987 #endif
0988
0989 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0990
0991 {
0992
0993 if (p_hdr->is_empty()) {
0994 BOOST_TRY{
0995 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0996 ++p_hdr->m_blocked_receivers;
0997 #endif
0998 switch(Block){
0999 case non_blocking :
1000 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1001 --p_hdr->m_blocked_receivers;
1002 #endif
1003 return false;
1004 break;
1005
1006 case blocking :
1007 do{
1008 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_recv, lock, abs_time);
1009 }
1010 while (p_hdr->is_empty());
1011 break;
1012
1013 case timed :
1014 do{
1015 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_recv, lock, abs_time)) {
1016 if(p_hdr->is_empty()){
1017 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1018 --p_hdr->m_blocked_receivers;
1019 #endif
1020 return false;
1021 }
1022 break;
1023 }
1024 }
1025 while (p_hdr->is_empty());
1026 break;
1027
1028
1029 default:
1030 break;
1031 }
1032 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1033 --p_hdr->m_blocked_receivers;
1034 #endif
1035 }
1036 BOOST_CATCH(...){
1037 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1038 --p_hdr->m_blocked_receivers;
1039 #endif
1040 BOOST_RETHROW;
1041 } BOOST_CATCH_END
1042 }
1043
1044 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1045 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
1046 #endif
1047
1048
1049 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
1050
1051
1052 recvd_size = top_msg.len;
1053 priority = top_msg.priority;
1054
1055
1056 top_msg.len = 0;
1057 top_msg.priority = 0;
1058
1059
1060 std::memcpy(buffer, top_msg.data(), recvd_size);
1061
1062
1063 p_hdr->free_top_msg();
1064 }
1065
1066
1067
1068
1069 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1070 if (notify_blocked_senders){
1071 p_hdr->m_cond_send.notify_one();
1072 }
1073 #else
1074 p_hdr->m_cond_send.notify_one();
1075 #endif
1076
1077 return true;
1078 }
1079
1080 template<class VoidPointer>
1081 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
1082 {
1083 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1084 return p_hdr ? p_hdr->m_max_num_msg : 0; }
1085
1086 template<class VoidPointer>
1087 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
1088 {
1089 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1090 return p_hdr ? p_hdr->m_max_msg_size : 0;
1091 }
1092
1093 template<class VoidPointer>
1094 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
1095 {
1096 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1097 if(p_hdr){
1098
1099 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
1100
1101 return p_hdr->m_cur_num_msg;
1102 }
1103
1104 return 0;
1105 }
1106
1107 template<class VoidPointer>
1108 inline bool message_queue_t<VoidPointer>::remove(const char *name)
1109 { return shared_memory_object::remove(name); }
1110
1111 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
1112
1113 template<class VoidPointer>
1114 inline bool message_queue_t<VoidPointer>::remove(const wchar_t *name)
1115 { return shared_memory_object::remove(name); }
1116
1117 #endif
1118
1119 #else
1120
1121
1122
1123 typedef message_queue_t<offset_ptr<void> > message_queue;
1124
1125 #endif
1126
1127 }}
1128
1129 #include <boost/interprocess/detail/config_end.hpp>
1130
1131 #endif