File indexing completed on 2025-12-16 09:43:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_BEAST_TEST_DETAIL_STREAM_STATE_IPP
0012 #define BOOST_BEAST_TEST_DETAIL_STREAM_STATE_IPP
0013
0014 #include <boost/beast/_experimental/test/error.hpp>
0015 #include <boost/asio/error.hpp>
0016 #include <boost/make_shared.hpp>
0017
0018 namespace boost {
0019 namespace beast {
0020 namespace test {
0021
0022 namespace detail {
0023
0024
0025
0026 stream_service::
0027 stream_service(net::execution_context& ctx)
0028 : beast::detail::service_base<stream_service>(ctx)
0029 , sp_(boost::make_shared<stream_service_impl>())
0030 {
0031 }
0032
0033 void
0034 stream_service::
0035 shutdown()
0036 {
0037 std::vector<std::unique_ptr<detail::stream_read_op_base>> v;
0038 std::lock_guard<std::mutex> g1(sp_->m_);
0039 v.reserve(sp_->v_.size());
0040 for(auto p : sp_->v_)
0041 {
0042 std::lock_guard<std::mutex> g2(p->m);
0043 v.emplace_back(std::move(p->op));
0044 p->code = detail::stream_status::eof;
0045 }
0046 }
0047
0048 auto
0049 stream_service::
0050 make_impl(
0051 net::any_io_executor exec,
0052 test::fail_count* fc) ->
0053 boost::shared_ptr<detail::stream_state>
0054 {
0055 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0056 auto& ctx = exec.context();
0057 #else
0058 auto& ctx = net::query(
0059 exec,
0060 net::execution::context);
0061 #endif
0062 auto& svc = net::use_service<stream_service>(ctx);
0063 auto sp = boost::make_shared<detail::stream_state>(exec, svc.sp_, fc);
0064 std::lock_guard<std::mutex> g(svc.sp_->m_);
0065 svc.sp_->v_.push_back(sp.get());
0066 return sp;
0067 }
0068
0069
0070
0071 void
0072 stream_service_impl::
0073 remove(stream_state& impl)
0074 {
0075 std::lock_guard<std::mutex> g(m_);
0076 *std::find(
0077 v_.begin(), v_.end(),
0078 &impl) = std::move(v_.back());
0079 v_.pop_back();
0080 }
0081
0082
0083
0084 stream_state::
0085 stream_state(
0086 net::any_io_executor exec_,
0087 boost::weak_ptr<stream_service_impl> wp_,
0088 fail_count* fc_)
0089 : exec(std::move(exec_))
0090 , wp(std::move(wp_))
0091 , fc(fc_)
0092 {
0093 }
0094
0095 stream_state::
0096 ~stream_state()
0097 {
0098
0099 if(op != nullptr)
0100 (*op)(net::error::operation_aborted);
0101 }
0102
0103 void
0104 stream_state::
0105 remove() noexcept
0106 {
0107 auto sp = wp.lock();
0108
0109
0110
0111 BOOST_ASSERT(sp);
0112
0113 sp->remove(*this);
0114 }
0115
0116 void
0117 stream_state::
0118 notify_read()
0119 {
0120 if(op)
0121 {
0122 auto op_ = std::move(op);
0123 op_->operator()(error_code{});
0124 }
0125 else
0126 {
0127 cv.notify_all();
0128 }
0129 }
0130
0131 void
0132 stream_state::
0133 cancel_read()
0134 {
0135 std::unique_ptr<stream_read_op_base> p;
0136 {
0137 std::lock_guard<std::mutex> lock(m);
0138 code = stream_status::eof;
0139 p = std::move(op);
0140 }
0141 if(p != nullptr)
0142 (*p)(net::error::operation_aborted);
0143 }
0144
0145 }
0146
0147 }
0148 }
0149 }
0150
0151 #endif