Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:28:35

0001 //
0002 // detail/impl/strand_executor_service.ipp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/config.hpp>
0019 #include <boost/asio/detail/strand_executor_service.hpp>
0020 
0021 #include <boost/asio/detail/push_options.hpp>
0022 
0023 namespace boost {
0024 namespace asio {
0025 namespace detail {
0026 
0027 strand_executor_service::strand_executor_service(execution_context& ctx)
0028   : execution_context_service_base<strand_executor_service>(ctx),
0029     mutex_(),
0030     salt_(0),
0031     impl_list_(0)
0032 {
0033 }
0034 
0035 void strand_executor_service::shutdown()
0036 {
0037   op_queue<scheduler_operation> ops;
0038 
0039   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0040 
0041   strand_impl* impl = impl_list_;
0042   while (impl)
0043   {
0044     impl->mutex_->lock();
0045     impl->shutdown_ = true;
0046     ops.push(impl->waiting_queue_);
0047     ops.push(impl->ready_queue_);
0048     impl->mutex_->unlock();
0049     impl = impl->next_;
0050   }
0051 }
0052 
0053 strand_executor_service::implementation_type
0054 strand_executor_service::create_implementation()
0055 {
0056   implementation_type new_impl(new strand_impl);
0057   new_impl->locked_ = false;
0058   new_impl->shutdown_ = false;
0059 
0060   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0061 
0062   // Select a mutex from the pool of shared mutexes.
0063   std::size_t salt = salt_++;
0064   std::size_t mutex_index = reinterpret_cast<std::size_t>(new_impl.get());
0065   mutex_index += (reinterpret_cast<std::size_t>(new_impl.get()) >> 3);
0066   mutex_index ^= salt + 0x9e3779b9 + (mutex_index << 6) + (mutex_index >> 2);
0067   mutex_index = mutex_index % num_mutexes;
0068   if (!mutexes_[mutex_index].get())
0069     mutexes_[mutex_index].reset(new mutex);
0070   new_impl->mutex_ = mutexes_[mutex_index].get();
0071 
0072   // Insert implementation into linked list of all implementations.
0073   new_impl->next_ = impl_list_;
0074   new_impl->prev_ = 0;
0075   if (impl_list_)
0076     impl_list_->prev_ = new_impl.get();
0077   impl_list_ = new_impl.get();
0078   new_impl->service_ = this;
0079 
0080   return new_impl;
0081 }
0082 
0083 strand_executor_service::strand_impl::~strand_impl()
0084 {
0085   boost::asio::detail::mutex::scoped_lock lock(service_->mutex_);
0086 
0087   // Remove implementation from linked list of all implementations.
0088   if (service_->impl_list_ == this)
0089     service_->impl_list_ = next_;
0090   if (prev_)
0091     prev_->next_ = next_;
0092   if (next_)
0093     next_->prev_= prev_;
0094 }
0095 
0096 bool strand_executor_service::enqueue(const implementation_type& impl,
0097     scheduler_operation* op)
0098 {
0099   impl->mutex_->lock();
0100   if (impl->shutdown_)
0101   {
0102     impl->mutex_->unlock();
0103     op->destroy();
0104     return false;
0105   }
0106   else if (impl->locked_)
0107   {
0108     // Some other function already holds the strand lock. Enqueue for later.
0109     impl->waiting_queue_.push(op);
0110     impl->mutex_->unlock();
0111     return false;
0112   }
0113   else
0114   {
0115     // The function is acquiring the strand lock and so is responsible for
0116     // scheduling the strand.
0117     impl->locked_ = true;
0118     impl->mutex_->unlock();
0119     impl->ready_queue_.push(op);
0120     return true;
0121   }
0122 }
0123 
0124 bool strand_executor_service::running_in_this_thread(
0125     const implementation_type& impl)
0126 {
0127   return !!call_stack<strand_impl>::contains(impl.get());
0128 }
0129 
0130 bool strand_executor_service::push_waiting_to_ready(implementation_type& impl)
0131 {
0132   impl->mutex_->lock();
0133   impl->ready_queue_.push(impl->waiting_queue_);
0134   bool more_handlers = impl->locked_ = !impl->ready_queue_.empty();
0135   impl->mutex_->unlock();
0136   return more_handlers;
0137 }
0138 
0139 void strand_executor_service::run_ready_handlers(implementation_type& impl)
0140 {
0141   // Indicate that this strand is executing on the current thread.
0142   call_stack<strand_impl>::context ctx(impl.get());
0143 
0144   // Run all ready handlers. No lock is required since the ready queue is
0145   // accessed only within the strand.
0146   boost::system::error_code ec;
0147   while (scheduler_operation* o = impl->ready_queue_.front())
0148   {
0149     impl->ready_queue_.pop();
0150     o->complete(impl.get(), ec, 0);
0151   }
0152 }
0153 
0154 } // namespace detail
0155 } // namespace asio
0156 } // namespace boost
0157 
0158 #include <boost/asio/detail/pop_options.hpp>
0159 
0160 #endif // BOOST_ASIO_DETAIL_IMPL_STRAND_EXECUTOR_SERVICE_IPP