File indexing completed on 2025-07-06 08:08:49
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
0011 #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
0012
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/core/detail/service_base.hpp>
0015 #include <boost/beast/core/detail/is_invocable.hpp>
0016 #include <boost/asio/any_io_executor.hpp>
0017 #include <boost/asio/append.hpp>
0018 #include <boost/asio/associated_cancellation_slot.hpp>
0019 #include <boost/asio/dispatch.hpp>
0020 #include <boost/asio/post.hpp>
0021 #include <mutex>
0022 #include <stdexcept>
0023 #include <vector>
0024
0025 namespace boost {
0026 namespace beast {
0027 namespace test {
0028
0029 namespace detail
0030 {
0031 template<class To>
0032 struct extract_executor_op
0033 {
0034 To operator()(net::any_io_executor& ex) const
0035 {
0036 assert(ex.template target<To>());
0037 return *ex.template target<To>();
0038 }
0039 };
0040
0041 template<>
0042 struct extract_executor_op<net::any_io_executor>
0043 {
0044 net::any_io_executor operator()(net::any_io_executor& ex) const
0045 {
0046 return ex;
0047 }
0048 };
0049 }
0050
0051 template<class Executor>
0052 template<class Handler, class Buffers>
0053 class basic_stream<Executor>::read_op : public detail::stream_read_op_base
0054 {
0055 struct lambda
0056 {
0057 Handler h_;
0058 boost::weak_ptr<detail::stream_state> wp_;
0059 Buffers b_;
0060 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0061 net::any_io_executor wg2_;
0062 #else
0063 net::executor_work_guard<
0064 net::associated_executor_t<Handler, net::any_io_executor>> wg2_;
0065 #endif
0066
0067 lambda(lambda&&) = default;
0068 lambda(lambda const&) = default;
0069
0070 template<class Handler_>
0071 lambda(
0072 Handler_&& h,
0073 boost::shared_ptr<detail::stream_state> const& s,
0074 Buffers const& b)
0075 : h_(std::forward<Handler_>(h))
0076 , wp_(s)
0077 , b_(b)
0078 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0079 , wg2_(net::prefer(
0080 net::get_associated_executor(
0081 h_, s->exec),
0082 net::execution::outstanding_work.tracked))
0083 #else
0084 , wg2_(net::get_associated_executor(
0085 h_, s->exec))
0086 #endif
0087 {
0088 }
0089
0090 using allocator_type = net::associated_allocator_t<Handler>;
0091
0092 allocator_type get_allocator() const noexcept
0093 {
0094 return net::get_associated_allocator(h_);
0095 }
0096
0097 using cancellation_slot_type =
0098 net::associated_cancellation_slot_t<Handler>;
0099
0100 cancellation_slot_type
0101 get_cancellation_slot() const noexcept
0102 {
0103 return net::get_associated_cancellation_slot(h_,
0104 net::cancellation_slot());
0105 }
0106
0107 void
0108 operator()(error_code ec)
0109 {
0110 std::size_t bytes_transferred = 0;
0111 auto sp = wp_.lock();
0112 if(! sp)
0113 {
0114 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0115 }
0116 if(! ec)
0117 {
0118 std::lock_guard<std::mutex> lock(sp->m);
0119 BOOST_ASSERT(! sp->op);
0120 if(sp->b.size() > 0)
0121 {
0122 bytes_transferred =
0123 net::buffer_copy(
0124 b_, sp->b.data(), sp->read_max);
0125 sp->b.consume(bytes_transferred);
0126 sp->nread_bytes += bytes_transferred;
0127 }
0128 else if (buffer_bytes(b_) > 0)
0129 {
0130 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0131 }
0132 }
0133
0134 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0135 net::dispatch(wg2_,
0136 net::append(std::move(h_), ec, bytes_transferred));
0137 wg2_ = net::any_io_executor();
0138 #else
0139 net::dispatch(wg2_.get_executor(),
0140 net::append(std::move(h_), ec, bytes_transferred));
0141 wg2_.reset();
0142 #endif
0143 }
0144 };
0145
0146 lambda fn_;
0147 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0148 net::executor_work_guard<net::any_io_executor> wg1_;
0149 #else
0150 net::any_io_executor wg1_;
0151 #endif
0152
0153 public:
0154 template<class Handler_>
0155 read_op(
0156 Handler_&& h,
0157 boost::shared_ptr<detail::stream_state> const& s,
0158 Buffers const& b)
0159 : fn_(std::forward<Handler_>(h), s, b)
0160 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0161 , wg1_(s->exec)
0162 #else
0163 , wg1_(net::prefer(s->exec,
0164 net::execution::outstanding_work.tracked))
0165 #endif
0166 {
0167 }
0168
0169 void
0170 operator()(error_code ec) override
0171 {
0172 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0173 net::post(wg1_.get_executor(), net::append(std::move(fn_), ec));
0174 wg1_.reset();
0175 #else
0176 net::post(wg1_, net::append(std::move(fn_), ec));
0177 wg1_ = net::any_io_executor();
0178 #endif
0179 }
0180 };
0181
0182 template<class Executor>
0183 struct basic_stream<Executor>::run_read_op
0184 {
0185 boost::shared_ptr<detail::stream_state> const& in;
0186
0187 using executor_type = typename basic_stream::executor_type;
0188
0189 executor_type
0190 get_executor() const noexcept
0191 {
0192 return detail::extract_executor_op<Executor>()(in->exec);
0193 }
0194
0195 template<
0196 class ReadHandler,
0197 class MutableBufferSequence>
0198 void
0199 operator()(
0200 ReadHandler&& h,
0201 MutableBufferSequence const& buffers)
0202 {
0203
0204
0205
0206
0207 static_assert(
0208 beast::detail::is_invocable<ReadHandler,
0209 void(error_code, std::size_t)>::value,
0210 "ReadHandler type requirements not met");
0211
0212 initiate_read(
0213 in,
0214 std::unique_ptr<detail::stream_read_op_base>{
0215 new read_op<
0216 typename std::decay<ReadHandler>::type,
0217 MutableBufferSequence>(
0218 std::move(h),
0219 in,
0220 buffers)},
0221 buffer_bytes(buffers));
0222 }
0223 };
0224
0225 template<class Executor>
0226 struct basic_stream<Executor>::run_write_op
0227 {
0228 boost::shared_ptr<detail::stream_state> const& in_;
0229
0230 using executor_type = typename basic_stream::executor_type;
0231
0232 executor_type
0233 get_executor() const noexcept
0234 {
0235 return detail::extract_executor_op<Executor>()(in_->exec);
0236 }
0237
0238 template<
0239 class WriteHandler,
0240 class ConstBufferSequence>
0241 void
0242 operator()(
0243 WriteHandler&& h,
0244 boost::weak_ptr<detail::stream_state> out_,
0245 ConstBufferSequence const& buffers)
0246 {
0247
0248
0249
0250
0251 static_assert(
0252 beast::detail::is_invocable<WriteHandler,
0253 void(error_code, std::size_t)>::value,
0254 "WriteHandler type requirements not met");
0255
0256 ++in_->nwrite;
0257 auto const upcall = [&](error_code ec, std::size_t n)
0258 {
0259 net::post(in_->exec, net::append(std::move(h), ec, n));
0260 };
0261
0262
0263 error_code ec;
0264 std::size_t n = 0;
0265 if(in_->fc && in_->fc->fail(ec))
0266 return upcall(ec, n);
0267
0268
0269 if(buffer_bytes(buffers) == 0)
0270 return upcall(ec, n);
0271
0272
0273 auto out = out_.lock();
0274 if(! out)
0275 return upcall(net::error::connection_reset, n);
0276
0277
0278 n = std::min<std::size_t>(
0279 buffer_bytes(buffers), in_->write_max);
0280 {
0281 std::lock_guard<std::mutex> lock(out->m);
0282 n = net::buffer_copy(out->b.prepare(n), buffers);
0283 out->b.commit(n);
0284 out->nwrite_bytes += n;
0285 out->notify_read();
0286 }
0287 BOOST_ASSERT(! ec);
0288 upcall(ec, n);
0289 }
0290 };
0291
0292
0293
0294 template<class Executor>
0295 template<class MutableBufferSequence>
0296 std::size_t
0297 basic_stream<Executor>::
0298 read_some(MutableBufferSequence const& buffers)
0299 {
0300 static_assert(net::is_mutable_buffer_sequence<
0301 MutableBufferSequence>::value,
0302 "MutableBufferSequence type requirements not met");
0303 error_code ec;
0304 auto const n = read_some(buffers, ec);
0305 if(ec)
0306 BOOST_THROW_EXCEPTION(system_error{ec});
0307 return n;
0308 }
0309
0310 template<class Executor>
0311 template<class MutableBufferSequence>
0312 std::size_t
0313 basic_stream<Executor>::
0314 read_some(MutableBufferSequence const& buffers,
0315 error_code& ec)
0316 {
0317 static_assert(net::is_mutable_buffer_sequence<
0318 MutableBufferSequence>::value,
0319 "MutableBufferSequence type requirements not met");
0320
0321 ++in_->nread;
0322
0323
0324 if(in_->fc && in_->fc->fail(ec))
0325 return 0;
0326
0327
0328 if(buffer_bytes(buffers) == 0)
0329 {
0330 ec = {};
0331 return 0;
0332 }
0333
0334 std::unique_lock<std::mutex> lock{in_->m};
0335 BOOST_ASSERT(! in_->op);
0336 in_->cv.wait(lock,
0337 [&]()
0338 {
0339 return
0340 in_->b.size() > 0 ||
0341 in_->code != detail::stream_status::ok;
0342 });
0343
0344
0345 if(in_->b.size() > 0)
0346 {
0347 auto const n = net::buffer_copy(
0348 buffers, in_->b.data(), in_->read_max);
0349 in_->b.consume(n);
0350 in_->nread_bytes += n;
0351 return n;
0352 }
0353
0354
0355 BOOST_ASSERT(in_->code != detail::stream_status::ok);
0356 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0357 return 0;
0358 }
0359
0360 template<class Executor>
0361 template<class MutableBufferSequence,
0362 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
0363 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
0364 basic_stream<Executor>::
0365 async_read_some(
0366 MutableBufferSequence const& buffers,
0367 ReadHandler&& handler)
0368 {
0369 static_assert(net::is_mutable_buffer_sequence<
0370 MutableBufferSequence>::value,
0371 "MutableBufferSequence type requirements not met");
0372
0373 return net::async_initiate<
0374 ReadHandler,
0375 void(error_code, std::size_t)>(
0376 run_read_op{in_},
0377 handler,
0378 buffers);
0379 }
0380
0381 template<class Executor>
0382 template<class ConstBufferSequence>
0383 std::size_t
0384 basic_stream<Executor>::
0385 write_some(ConstBufferSequence const& buffers)
0386 {
0387 static_assert(net::is_const_buffer_sequence<
0388 ConstBufferSequence>::value,
0389 "ConstBufferSequence type requirements not met");
0390 error_code ec;
0391 auto const bytes_transferred =
0392 write_some(buffers, ec);
0393 if(ec)
0394 BOOST_THROW_EXCEPTION(system_error{ec});
0395 return bytes_transferred;
0396 }
0397
0398 template<class Executor>
0399 template<class ConstBufferSequence>
0400 std::size_t
0401 basic_stream<Executor>::
0402 write_some(
0403 ConstBufferSequence const& buffers, error_code& ec)
0404 {
0405 static_assert(net::is_const_buffer_sequence<
0406 ConstBufferSequence>::value,
0407 "ConstBufferSequence type requirements not met");
0408
0409 ++in_->nwrite;
0410
0411
0412 if(in_->fc && in_->fc->fail(ec))
0413 return 0;
0414
0415
0416 if(buffer_bytes(buffers) == 0)
0417 {
0418 ec = {};
0419 return 0;
0420 }
0421
0422
0423 auto out = out_.lock();
0424 if(! out)
0425 {
0426 BOOST_BEAST_ASSIGN_EC(ec, net::error::connection_reset);
0427 return 0;
0428 }
0429
0430
0431 auto n = std::min<std::size_t>(
0432 buffer_bytes(buffers), in_->write_max);
0433 {
0434 std::lock_guard<std::mutex> lock(out->m);
0435 n = net::buffer_copy(out->b.prepare(n), buffers);
0436 out->b.commit(n);
0437 out->nwrite_bytes += n;
0438 out->notify_read();
0439 }
0440 return n;
0441 }
0442
0443 template<class Executor>
0444 template<class ConstBufferSequence,
0445 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
0446 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
0447 basic_stream<Executor>::
0448 async_write_some(
0449 ConstBufferSequence const& buffers,
0450 WriteHandler&& handler)
0451 {
0452 static_assert(net::is_const_buffer_sequence<
0453 ConstBufferSequence>::value,
0454 "ConstBufferSequence type requirements not met");
0455
0456 return net::async_initiate<
0457 WriteHandler,
0458 void(error_code, std::size_t)>(
0459 run_write_op{in_},
0460 handler,
0461 out_,
0462 buffers);
0463 }
0464
0465
0466
0467 template<class Executor, class TeardownHandler>
0468 void
0469 async_teardown(
0470 role_type,
0471 basic_stream<Executor>& s,
0472 TeardownHandler&& handler)
0473 {
0474 error_code ec;
0475 if( s.in_->fc &&
0476 s.in_->fc->fail(ec))
0477 return net::post(
0478 s.get_executor(),
0479 net::append(std::move(handler), ec));
0480 s.close();
0481 if( s.in_->fc &&
0482 s.in_->fc->fail(ec))
0483 {
0484 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0485 }
0486 else
0487 ec = {};
0488
0489 net::post(s.get_executor(), net::append(std::move(handler), ec));
0490 }
0491
0492
0493
0494 template<class Executor, class Arg1, class... ArgN>
0495 basic_stream<Executor>
0496 connect(stream& to, Arg1&& arg1, ArgN&&... argn)
0497 {
0498 stream from{
0499 std::forward<Arg1>(arg1),
0500 std::forward<ArgN>(argn)...};
0501 from.connect(to);
0502 return from;
0503 }
0504
0505 template<class Executor>
0506 auto basic_stream<Executor>::get_executor() noexcept -> executor_type
0507 {
0508 return detail::extract_executor_op<Executor>()(in_->exec);
0509 }
0510
0511 }
0512 }
0513 }
0514
0515 #endif