File indexing completed on 2025-01-18 09:38:29
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0012 #define BOOST_INTERPROCESS_MESSAGE_QUEUE_HPP
0013
0014 #ifndef BOOST_CONFIG_HPP
0015 # include <boost/config.hpp>
0016 #endif
0017 #
0018 #if defined(BOOST_HAS_PRAGMA_ONCE)
0019 # pragma once
0020 #endif
0021
0022 #include <boost/interprocess/detail/config_begin.hpp>
0023 #include <boost/interprocess/detail/workaround.hpp>
0024
0025 #include <boost/interprocess/shared_memory_object.hpp>
0026 #include <boost/interprocess/detail/managed_open_or_create_impl.hpp>
0027 #include <boost/interprocess/sync/interprocess_condition.hpp>
0028 #include <boost/interprocess/sync/interprocess_mutex.hpp>
0029 #include <boost/interprocess/sync/scoped_lock.hpp>
0030 #include <boost/interprocess/detail/utilities.hpp>
0031 #include <boost/interprocess/detail/timed_utils.hpp>
0032 #include <boost/interprocess/offset_ptr.hpp>
0033 #include <boost/interprocess/creation_tags.hpp>
0034 #include <boost/interprocess/exceptions.hpp>
0035 #include <boost/interprocess/permissions.hpp>
0036 #include <boost/core/no_exceptions_support.hpp>
0037 #include <boost/interprocess/detail/type_traits.hpp>
0038 #include <boost/intrusive/pointer_traits.hpp>
0039 #include <boost/move/detail/type_traits.hpp> //make_unsigned, alignment_of
0040 #include <boost/intrusive/pointer_traits.hpp>
0041 #include <boost/move/detail/force_ptr.hpp>
0042 #include <boost/assert.hpp>
0043 #include <algorithm> //std::lower_bound
0044 #include <cstddef> //std::size_t
0045 #include <cstring> //memcpy
0046
0047
0048
0049
0050
0051
0052
0053 namespace boost{ namespace interprocess{
0054
0055 namespace ipcdetail
0056 {
0057 template<class VoidPointer>
0058 class msg_queue_initialization_func_t;
0059
0060 }
0061
0062
0063 enum mqblock_types { blocking, timed, non_blocking };
0064
0065
0066
0067 template<class VoidPointer>
0068 class message_queue_t
0069 {
0070 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0071
0072 message_queue_t();
0073 #endif
0074
0075 public:
0076 typedef VoidPointer void_pointer;
0077 typedef typename boost::intrusive::
0078 pointer_traits<void_pointer>::template
0079 rebind_pointer<char>::type char_ptr;
0080 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0081 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0082
0083
0084
0085
0086 message_queue_t(create_only_t,
0087 const char *name,
0088 size_type max_num_msg,
0089 size_type max_msg_size,
0090 const permissions &perm = permissions());
0091
0092
0093
0094
0095
0096
0097 message_queue_t(open_or_create_t,
0098 const char *name,
0099 size_type max_num_msg,
0100 size_type max_msg_size,
0101 const permissions &perm = permissions());
0102
0103
0104
0105
0106 message_queue_t(open_only_t, const char *name);
0107
0108 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0109
0110
0111
0112
0113
0114
0115
0116 message_queue_t(create_only_t,
0117 const wchar_t *name,
0118 size_type max_num_msg,
0119 size_type max_msg_size,
0120 const permissions &perm = permissions());
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130 message_queue_t(open_or_create_t,
0131 const wchar_t *name,
0132 size_type max_num_msg,
0133 size_type max_msg_size,
0134 const permissions &perm = permissions());
0135
0136
0137
0138
0139
0140
0141
0142 message_queue_t(open_only_t, const wchar_t *name);
0143
0144 #endif
0145
0146
0147
0148
0149
0150
0151
0152
0153 ~message_queue_t();
0154
0155
0156
0157
0158 void send (const void *buffer, size_type buffer_size,
0159 unsigned int priority);
0160
0161
0162
0163
0164
0165 bool try_send (const void *buffer, size_type buffer_size,
0166 unsigned int priority);
0167
0168
0169
0170
0171
0172
0173 template<class TimePoint>
0174 bool timed_send (const void *buffer, size_type buffer_size,
0175 unsigned int priority, const TimePoint& abs_time);
0176
0177
0178
0179
0180
0181 void receive (void *buffer, size_type buffer_size,
0182 size_type &recvd_size,unsigned int &priority);
0183
0184
0185
0186
0187
0188
0189 bool try_receive (void *buffer, size_type buffer_size,
0190 size_type &recvd_size,unsigned int &priority);
0191
0192
0193
0194
0195
0196
0197
0198 template<class TimePoint>
0199 bool timed_receive (void *buffer, size_type buffer_size,
0200 size_type &recvd_size,unsigned int &priority,
0201 const TimePoint &abs_time);
0202
0203
0204
0205
0206 size_type get_max_msg() const;
0207
0208
0209
0210
0211 size_type get_max_msg_size() const;
0212
0213
0214
0215 size_type get_num_msg() const;
0216
0217
0218
0219 static bool remove(const char *name);
0220
0221 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0222
0223
0224
0225
0226
0227
0228 static bool remove(const wchar_t *name);
0229
0230 #endif
0231
0232 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0233 private:
0234
0235 friend class ipcdetail::msg_queue_initialization_func_t<VoidPointer>;
0236
0237 template<mqblock_types Block, class TimePoint>
0238 bool do_receive(void *buffer, size_type buffer_size,
0239 size_type &recvd_size, unsigned int &priority,
0240 const TimePoint &abs_time);
0241
0242 template<mqblock_types Block, class TimePoint>
0243 bool do_send(const void *buffer, size_type buffer_size,
0244 unsigned int priority, const TimePoint &abs_time);
0245
0246
0247
0248 static size_type get_mem_size(size_type max_msg_size, size_type max_num_msg);
0249 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0250 open_create_impl_t m_shmem;
0251
0252 template<class Lock, class TimePoint>
0253 static bool do_cond_wait(ipcdetail::bool_<true>, interprocess_condition &cond, Lock &lock, const TimePoint &abs_time)
0254 { return cond.timed_wait(lock, abs_time); }
0255
0256 template<class Lock, class TimePoint>
0257 static bool do_cond_wait(ipcdetail::bool_<false>, interprocess_condition &cond, Lock &lock, const TimePoint &)
0258 { cond.wait(lock); return true; }
0259
0260 #endif
0261 };
0262
0263 #if !defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0264
0265 namespace ipcdetail {
0266
0267
0268 template<class VoidPointer>
0269 class msg_hdr_t
0270 {
0271 typedef VoidPointer void_pointer;
0272 typedef typename boost::intrusive::
0273 pointer_traits<void_pointer>::template
0274 rebind_pointer<char>::type char_ptr;
0275 typedef typename boost::intrusive::pointer_traits<char_ptr>::difference_type difference_type;
0276 typedef typename boost::container::dtl::make_unsigned<difference_type>::type size_type;
0277
0278 public:
0279 size_type len;
0280 unsigned int priority;
0281
0282 void * data(){ return this+1; }
0283 };
0284
0285
0286 template<class VoidPointer>
0287 class priority_functor
0288 {
0289 typedef typename boost::intrusive::
0290 pointer_traits<VoidPointer>::template
0291 rebind_pointer<msg_hdr_t<VoidPointer> >::type msg_hdr_ptr_t;
0292
0293 public:
0294 bool operator()(const msg_hdr_ptr_t &msg1,
0295 const msg_hdr_ptr_t &msg2) const
0296 { return msg1->priority < msg2->priority; }
0297 };
0298
0299
0300
0301
0302
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313
0314
0315
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331
0332
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342
0343
0344
0345
0346
0347
0348
0349
0350 template<class VoidPointer>
0351 class mq_hdr_t
0352 : public ipcdetail::priority_functor<VoidPointer>
0353 {
0354 typedef VoidPointer void_pointer;
0355 typedef msg_hdr_t<void_pointer> msg_header;
0356 typedef typename boost::intrusive::
0357 pointer_traits<void_pointer>::template
0358 rebind_pointer<msg_header>::type msg_hdr_ptr_t;
0359 typedef typename boost::intrusive::pointer_traits
0360 <msg_hdr_ptr_t>::difference_type difference_type;
0361 typedef typename boost::container::
0362 dtl::make_unsigned<difference_type>::type size_type;
0363 typedef typename boost::intrusive::
0364 pointer_traits<void_pointer>::template
0365 rebind_pointer<msg_hdr_ptr_t>::type msg_hdr_ptr_ptr_t;
0366 typedef ipcdetail::managed_open_or_create_impl<shared_memory_object, 0, true, false> open_create_impl_t;
0367
0368 public:
0369
0370
0371
0372
0373 mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
0374 : m_max_num_msg(max_num_msg),
0375 m_max_msg_size(max_msg_size),
0376 m_cur_num_msg(0)
0377 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0378 ,m_cur_first_msg(0u)
0379 ,m_blocked_senders(0u)
0380 ,m_blocked_receivers(0u)
0381 #endif
0382 { this->initialize_memory(); }
0383
0384
0385 bool is_full() const
0386 { return m_cur_num_msg == m_max_num_msg; }
0387
0388
0389 bool is_empty() const
0390 { return !m_cur_num_msg; }
0391
0392
0393 void free_top_msg()
0394 { --m_cur_num_msg; }
0395
0396 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0397
0398 typedef msg_hdr_ptr_t *iterator;
0399
0400 size_type end_pos() const
0401 {
0402 const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
0403 return space_until_bufend > m_cur_num_msg
0404 ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
0405 }
0406
0407
0408 msg_header &top_msg()
0409 {
0410 size_type pos = this->end_pos();
0411 return *mp_index[difference_type(pos ? --pos : m_max_num_msg - 1)];
0412 }
0413
0414
0415 msg_header &bottom_msg()
0416 { return *mp_index[difference_type(m_cur_first_msg)]; }
0417
0418 iterator inserted_ptr_begin() const
0419 { return &mp_index[difference_type(m_cur_first_msg)]; }
0420
0421 iterator inserted_ptr_end() const
0422 { return &mp_index[difference_type(this->end_pos())]; }
0423
0424 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0425 {
0426 iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
0427 if(end < begin){
0428 iterator idx_end = &mp_index[difference_type(m_max_num_msg)];
0429 iterator ret = std::lower_bound(begin, idx_end, value, func);
0430 if(idx_end == ret){
0431 iterator idx_beg = &mp_index[0];
0432 ret = std::lower_bound(idx_beg, end, value, func);
0433
0434 BOOST_ASSERT(ret != end);
0435 BOOST_ASSERT(ret != begin);
0436 return ret;
0437 }
0438 else{
0439 return ret;
0440 }
0441 }
0442 else{
0443 return std::lower_bound(begin, end, value, func);
0444 }
0445 }
0446
0447 msg_header & insert_at(iterator where)
0448 {
0449 iterator it_inserted_ptr_end = this->inserted_ptr_end();
0450 iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
0451 if(where == it_inserted_ptr_beg){
0452
0453 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0454 --m_cur_first_msg;
0455 ++m_cur_num_msg;
0456 return *mp_index[difference_type(m_cur_first_msg)];
0457 }
0458 else if(where == it_inserted_ptr_end){
0459 ++m_cur_num_msg;
0460 return **it_inserted_ptr_end;
0461 }
0462 else{
0463 size_type pos = size_type(where - &mp_index[0]);
0464 size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
0465
0466 if(circ_pos < m_cur_num_msg/2){
0467
0468
0469 if(!pos){
0470 pos = m_max_num_msg;
0471 where = &mp_index[difference_type(m_max_num_msg-1u)];
0472 }
0473 else{
0474 --where;
0475 }
0476 const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
0477 const size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
0478 const size_type first_segment_end = pos;
0479 const size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
0480 const size_type second_segment_end = m_max_num_msg;
0481 const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
0482
0483
0484 if(!unique_segment){
0485 std::copy( &mp_index[0] + second_segment_beg
0486 , &mp_index[0] + second_segment_end
0487 , &mp_index[0] + second_segment_beg - 1);
0488 mp_index[difference_type(m_max_num_msg-1u)] = mp_index[0];
0489 }
0490 std::copy( &mp_index[0] + first_segment_beg
0491 , &mp_index[0] + first_segment_end
0492 , &mp_index[0] + first_segment_beg - 1);
0493 *where = backup;
0494 m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
0495 --m_cur_first_msg;
0496 ++m_cur_num_msg;
0497 return **where;
0498 }
0499 else{
0500
0501
0502 const size_type pos_end = this->end_pos();
0503 const bool unique_segment = pos < pos_end;
0504 const size_type first_segment_beg = pos;
0505 const size_type first_segment_end = unique_segment ? pos_end : m_max_num_msg-1;
0506 const size_type second_segment_beg = 0u;
0507 const size_type second_segment_end = unique_segment ? 0u : pos_end;
0508 const msg_hdr_ptr_t backup = *it_inserted_ptr_end;
0509
0510
0511 if(!unique_segment){
0512 std::copy_backward( &mp_index[0] + second_segment_beg
0513 , &mp_index[0] + second_segment_end
0514 , &mp_index[0] + second_segment_end + 1u);
0515 mp_index[0] = mp_index[difference_type(m_max_num_msg-1u)];
0516 }
0517 std::copy_backward( &mp_index[0] + first_segment_beg
0518 , &mp_index[0] + first_segment_end
0519 , &mp_index[0] + first_segment_end + 1u);
0520 *where = backup;
0521 ++m_cur_num_msg;
0522 return **where;
0523 }
0524 }
0525 }
0526
0527 #else
0528
0529 typedef msg_hdr_ptr_t *iterator;
0530
0531
0532 msg_header &top_msg()
0533 { return *mp_index[difference_type(m_cur_num_msg-1u)]; }
0534
0535
0536 msg_header &bottom_msg()
0537 { return *mp_index[0]; }
0538
0539 iterator inserted_ptr_begin() const
0540 { return &mp_index[0]; }
0541
0542 iterator inserted_ptr_end() const
0543 { return &mp_index[difference_type(m_cur_num_msg)]; }
0544
0545 iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
0546 { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
0547
0548 msg_header & insert_at(iterator pos)
0549 {
0550 const msg_hdr_ptr_t backup = *inserted_ptr_end();
0551 std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
0552 *pos = backup;
0553 ++m_cur_num_msg;
0554 return **pos;
0555 }
0556
0557 #endif
0558
0559
0560 msg_header & queue_free_msg(unsigned int priority)
0561 {
0562
0563 iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
0564
0565 if(m_cur_num_msg && priority > this->bottom_msg().priority){
0566
0567 if(priority > this->top_msg().priority){
0568 it = it_end;
0569 }
0570 else{
0571
0572
0573 msg_header dummy_hdr;
0574 dummy_hdr.priority = priority;
0575
0576
0577 msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
0578
0579
0580 it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
0581 }
0582 }
0583
0584 return this->insert_at(it);
0585 }
0586
0587
0588
0589
0590 static size_type get_mem_size
0591 (size_type max_msg_size, size_type max_num_msg)
0592 {
0593 const size_type
0594 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0595 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0596 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0597 r_index_size = ipcdetail::get_rounded_size<size_type>(max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0598 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(max_msg_size, msg_hdr_align) + sizeof(msg_header);
0599 return r_hdr_size + r_index_size + (max_num_msg*r_max_msg_size) +
0600 open_create_impl_t::ManagedOpenOrCreateUserOffset;
0601 }
0602
0603
0604
0605 void initialize_memory()
0606 {
0607 const size_type
0608 msg_hdr_align = ::boost::container::dtl::alignment_of<msg_header>::value,
0609 index_align = ::boost::container::dtl::alignment_of<msg_hdr_ptr_t>::value,
0610 r_hdr_size = ipcdetail::ct_rounded_size<sizeof(mq_hdr_t), index_align>::value,
0611 r_index_size = ipcdetail::get_rounded_size<size_type>(m_max_num_msg*sizeof(msg_hdr_ptr_t), msg_hdr_align),
0612 r_max_msg_size = ipcdetail::get_rounded_size<size_type>(m_max_msg_size, msg_hdr_align) + sizeof(msg_header);
0613
0614
0615 msg_hdr_ptr_t *index = move_detail::force_ptr<msg_hdr_ptr_t*>
0616 (reinterpret_cast<char*>(this)+r_hdr_size);
0617
0618
0619 msg_header *msg_hdr = move_detail::force_ptr<msg_header*>
0620 (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
0621
0622
0623 mp_index = index;
0624
0625
0626 for(size_type i = 0; i < m_max_num_msg; ++i){
0627 index[i] = msg_hdr;
0628 msg_hdr = move_detail::force_ptr<msg_header*>
0629 (reinterpret_cast<char*>(msg_hdr)+r_max_msg_size);
0630 }
0631 }
0632
0633 public:
0634
0635 msg_hdr_ptr_ptr_t mp_index;
0636
0637 const size_type m_max_num_msg;
0638
0639 const size_type m_max_msg_size;
0640
0641 size_type m_cur_num_msg;
0642
0643 interprocess_mutex m_mutex;
0644
0645 interprocess_condition m_cond_recv;
0646
0647 interprocess_condition m_cond_send;
0648 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0649
0650 size_type m_cur_first_msg;
0651 size_type m_blocked_senders;
0652 size_type m_blocked_receivers;
0653 #endif
0654 };
0655
0656
0657
0658
0659 template<class VoidPointer>
0660 class msg_queue_initialization_func_t
0661 {
0662 public:
0663 typedef typename boost::intrusive::
0664 pointer_traits<VoidPointer>::template
0665 rebind_pointer<char>::type char_ptr;
0666 typedef typename boost::intrusive::pointer_traits<char_ptr>::
0667 difference_type difference_type;
0668 typedef typename boost::container::dtl::
0669 make_unsigned<difference_type>::type size_type;
0670
0671 msg_queue_initialization_func_t(size_type maxmsg = 0,
0672 size_type maxmsgsize = 0)
0673 : m_maxmsg (maxmsg), m_maxmsgsize(maxmsgsize) {}
0674
0675 bool operator()(void *address, size_type, bool created)
0676 {
0677 char *mptr;
0678
0679 if(created){
0680 mptr = reinterpret_cast<char*>(address);
0681
0682 BOOST_TRY{
0683 new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
0684 }
0685 BOOST_CATCH(...){
0686 return false;
0687 } BOOST_CATCH_END
0688 }
0689 return true;
0690 }
0691
0692 std::size_t get_min_size() const
0693 {
0694 return mq_hdr_t<VoidPointer>::get_mem_size(m_maxmsgsize, m_maxmsg)
0695 - message_queue_t<VoidPointer>::open_create_impl_t::ManagedOpenOrCreateUserOffset;
0696 }
0697
0698 const size_type m_maxmsg;
0699 const size_type m_maxmsgsize;
0700 };
0701
0702 }
0703
0704 template<class VoidPointer>
0705 inline message_queue_t<VoidPointer>::~message_queue_t()
0706 {}
0707
0708 template<class VoidPointer>
0709 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_mem_size
0710 (size_type max_msg_size, size_type max_num_msg)
0711 { return ipcdetail::mq_hdr_t<VoidPointer>::get_mem_size(max_msg_size, max_num_msg); }
0712
0713 template<class VoidPointer>
0714 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0715 const char *name,
0716 size_type max_num_msg,
0717 size_type max_msg_size,
0718 const permissions &perm)
0719
0720 : m_shmem(create_only,
0721 name,
0722 get_mem_size(max_msg_size, max_num_msg),
0723 read_write,
0724 static_cast<void*>(0),
0725
0726 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0727 perm)
0728 {}
0729
0730 template<class VoidPointer>
0731 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0732 const char *name,
0733 size_type max_num_msg,
0734 size_type max_msg_size,
0735 const permissions &perm)
0736
0737 : m_shmem(open_or_create,
0738 name,
0739 get_mem_size(max_msg_size, max_num_msg),
0740 read_write,
0741 static_cast<void*>(0),
0742
0743 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0744 perm)
0745 {}
0746
0747 template<class VoidPointer>
0748 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const char *name)
0749
0750 : m_shmem(open_only,
0751 name,
0752 read_write,
0753 static_cast<void*>(0),
0754
0755 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0756 {}
0757
0758 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
0759
0760 template<class VoidPointer>
0761 inline message_queue_t<VoidPointer>::message_queue_t(create_only_t,
0762 const wchar_t *name,
0763 size_type max_num_msg,
0764 size_type max_msg_size,
0765 const permissions &perm)
0766
0767 : m_shmem(create_only,
0768 name,
0769 get_mem_size(max_msg_size, max_num_msg),
0770 read_write,
0771 static_cast<void*>(0),
0772
0773 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0774 perm)
0775 {}
0776
0777 template<class VoidPointer>
0778 inline message_queue_t<VoidPointer>::message_queue_t(open_or_create_t,
0779 const wchar_t *name,
0780 size_type max_num_msg,
0781 size_type max_msg_size,
0782 const permissions &perm)
0783
0784 : m_shmem(open_or_create,
0785 name,
0786 get_mem_size(max_msg_size, max_num_msg),
0787 read_write,
0788 static_cast<void*>(0),
0789
0790 ipcdetail::msg_queue_initialization_func_t<VoidPointer> (max_num_msg, max_msg_size),
0791 perm)
0792 {}
0793
0794 template<class VoidPointer>
0795 inline message_queue_t<VoidPointer>::message_queue_t(open_only_t, const wchar_t *name)
0796
0797 : m_shmem(open_only,
0798 name,
0799 read_write,
0800 static_cast<void*>(0),
0801
0802 ipcdetail::msg_queue_initialization_func_t<VoidPointer> ())
0803 {}
0804
0805 #endif
0806
0807 template<class VoidPointer>
0808 inline void message_queue_t<VoidPointer>::send
0809 (const void *buffer, size_type buffer_size, unsigned int priority)
0810 { this->do_send<blocking>(buffer, buffer_size, priority, 0); }
0811
0812 template<class VoidPointer>
0813 inline bool message_queue_t<VoidPointer>::try_send
0814 (const void *buffer, size_type buffer_size, unsigned int priority)
0815 { return this->do_send<non_blocking>(buffer, buffer_size, priority, 0); }
0816
0817 template<class VoidPointer>
0818 template<class TimePoint>
0819 inline bool message_queue_t<VoidPointer>::timed_send
0820 (const void *buffer, size_type buffer_size
0821 ,unsigned int priority, const TimePoint &abs_time)
0822 {
0823 if(ipcdetail::is_pos_infinity(abs_time)){
0824 this->send(buffer, buffer_size, priority);
0825 return true;
0826 }
0827 return this->do_send<timed>(buffer, buffer_size, priority, abs_time);
0828 }
0829
0830 template<class VoidPointer>
0831 template<mqblock_types Block, class TimePoint>
0832 inline bool message_queue_t<VoidPointer>::do_send(
0833 const void *buffer, size_type buffer_size,
0834 unsigned int priority, const TimePoint &abs_time)
0835 {
0836 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0837
0838 if (buffer_size > p_hdr->m_max_msg_size) {
0839 throw interprocess_exception(size_error);
0840 }
0841
0842 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0843 bool notify_blocked_receivers = false;
0844 #endif
0845
0846 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0847
0848 {
0849
0850 if (p_hdr->is_full()) {
0851 BOOST_TRY{
0852 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0853 ++p_hdr->m_blocked_senders;
0854 #endif
0855 switch(Block){
0856 case non_blocking :
0857 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0858 --p_hdr->m_blocked_senders;
0859 #endif
0860 return false;
0861 break;
0862
0863 case blocking :
0864 do{
0865 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_send, lock, abs_time);
0866 }
0867 while (p_hdr->is_full());
0868 break;
0869
0870 case timed :
0871 do{
0872 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_send, lock, abs_time)) {
0873 if(p_hdr->is_full()){
0874 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0875 --p_hdr->m_blocked_senders;
0876 #endif
0877 return false;
0878 }
0879 break;
0880 }
0881 }
0882 while (p_hdr->is_full());
0883 break;
0884 default:
0885 break;
0886 }
0887 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0888 --p_hdr->m_blocked_senders;
0889 #endif
0890 }
0891 BOOST_CATCH(...){
0892 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
0893 --p_hdr->m_blocked_senders;
0894 #endif
0895 BOOST_RETHROW;
0896 } BOOST_CATCH_END
0897 }
0898
0899 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0900 notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
0901 #endif
0902
0903 ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
0904
0905
0906 BOOST_ASSERT(free_msg_hdr.priority == 0);
0907 BOOST_ASSERT(free_msg_hdr.len == 0);
0908
0909
0910 free_msg_hdr.priority = priority;
0911 free_msg_hdr.len = buffer_size;
0912
0913
0914 std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
0915 }
0916
0917
0918
0919
0920 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0921 if (notify_blocked_receivers){
0922 p_hdr->m_cond_recv.notify_one();
0923 }
0924 #else
0925 p_hdr->m_cond_recv.notify_one();
0926 #endif
0927
0928 return true;
0929 }
0930
0931 template<class VoidPointer>
0932 inline void message_queue_t<VoidPointer>::receive(void *buffer, size_type buffer_size,
0933 size_type &recvd_size, unsigned int &priority)
0934 { this->do_receive<blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0935
0936 template<class VoidPointer>
0937 inline bool
0938 message_queue_t<VoidPointer>::try_receive(void *buffer, size_type buffer_size,
0939 size_type &recvd_size, unsigned int &priority)
0940 { return this->do_receive<non_blocking>(buffer, buffer_size, recvd_size, priority, 0); }
0941
0942 template<class VoidPointer>
0943 template<class TimePoint>
0944 inline bool
0945 message_queue_t<VoidPointer>::timed_receive(void *buffer, size_type buffer_size,
0946 size_type &recvd_size, unsigned int &priority,
0947 const TimePoint &abs_time)
0948 {
0949 if(ipcdetail::is_pos_infinity(abs_time)){
0950 this->receive(buffer, buffer_size, recvd_size, priority);
0951 return true;
0952 }
0953 return this->do_receive<timed>(buffer, buffer_size, recvd_size, priority, abs_time);
0954 }
0955
0956 template<class VoidPointer>
0957 template<mqblock_types Block, class TimePoint>
0958 inline bool
0959 message_queue_t<VoidPointer>::do_receive(
0960 void *buffer, size_type buffer_size,
0961 size_type &recvd_size, unsigned int &priority,
0962 const TimePoint &abs_time)
0963 {
0964 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
0965
0966 if (buffer_size < p_hdr->m_max_msg_size) {
0967 throw interprocess_exception(size_error);
0968 }
0969
0970 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0971 bool notify_blocked_senders = false;
0972 #endif
0973
0974 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
0975
0976 {
0977
0978 if (p_hdr->is_empty()) {
0979 BOOST_TRY{
0980 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0981 ++p_hdr->m_blocked_receivers;
0982 #endif
0983 switch(Block){
0984 case non_blocking :
0985 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
0986 --p_hdr->m_blocked_receivers;
0987 #endif
0988 return false;
0989 break;
0990
0991 case blocking :
0992 do{
0993 (void)do_cond_wait(ipcdetail::bool_<false>(), p_hdr->m_cond_recv, lock, abs_time);
0994 }
0995 while (p_hdr->is_empty());
0996 break;
0997
0998 case timed :
0999 do{
1000 if(!do_cond_wait(ipcdetail::bool_<Block == timed>(), p_hdr->m_cond_recv, lock, abs_time)) {
1001 if(p_hdr->is_empty()){
1002 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1003 --p_hdr->m_blocked_receivers;
1004 #endif
1005 return false;
1006 }
1007 break;
1008 }
1009 }
1010 while (p_hdr->is_empty());
1011 break;
1012
1013
1014 default:
1015 break;
1016 }
1017 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1018 --p_hdr->m_blocked_receivers;
1019 #endif
1020 }
1021 BOOST_CATCH(...){
1022 #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
1023 --p_hdr->m_blocked_receivers;
1024 #endif
1025 BOOST_RETHROW;
1026 } BOOST_CATCH_END
1027 }
1028
1029 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1030 notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
1031 #endif
1032
1033
1034 ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
1035
1036
1037 recvd_size = top_msg.len;
1038 priority = top_msg.priority;
1039
1040
1041 top_msg.len = 0;
1042 top_msg.priority = 0;
1043
1044
1045 std::memcpy(buffer, top_msg.data(), recvd_size);
1046
1047
1048 p_hdr->free_top_msg();
1049 }
1050
1051
1052
1053
1054 #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
1055 if (notify_blocked_senders){
1056 p_hdr->m_cond_send.notify_one();
1057 }
1058 #else
1059 p_hdr->m_cond_send.notify_one();
1060 #endif
1061
1062 return true;
1063 }
1064
1065 template<class VoidPointer>
1066 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
1067 {
1068 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1069 return p_hdr ? p_hdr->m_max_num_msg : 0; }
1070
1071 template<class VoidPointer>
1072 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
1073 {
1074 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1075 return p_hdr ? p_hdr->m_max_msg_size : 0;
1076 }
1077
1078 template<class VoidPointer>
1079 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg() const
1080 {
1081 ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
1082 if(p_hdr){
1083
1084 scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
1085
1086 return p_hdr->m_cur_num_msg;
1087 }
1088
1089 return 0;
1090 }
1091
1092 template<class VoidPointer>
1093 inline bool message_queue_t<VoidPointer>::remove(const char *name)
1094 { return shared_memory_object::remove(name); }
1095
1096 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) || defined(BOOST_INTERPROCESS_DOXYGEN_INVOKED)
1097
1098 template<class VoidPointer>
1099 inline bool message_queue_t<VoidPointer>::remove(const wchar_t *name)
1100 { return shared_memory_object::remove(name); }
1101
1102 #endif
1103
1104 #else
1105
1106
1107
1108 typedef message_queue_t<offset_ptr<void> > message_queue;
1109
1110 #endif
1111
1112 }}
1113
1114 #include <boost/interprocess/detail/config_end.hpp>
1115
1116 #endif