File indexing completed on 2025-01-18 09:29:22
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_TEST_IMPL_STREAM_HPP
0011 #define BOOST_BEAST_TEST_IMPL_STREAM_HPP
0012
0013 #include <boost/beast/core/bind_handler.hpp>
0014 #include <boost/beast/core/buffer_traits.hpp>
0015 #include <boost/beast/core/detail/service_base.hpp>
0016 #include <boost/beast/core/detail/is_invocable.hpp>
0017 #include <boost/asio/any_io_executor.hpp>
0018 #include <boost/asio/dispatch.hpp>
0019 #include <boost/asio/post.hpp>
0020 #include <mutex>
0021 #include <stdexcept>
0022 #include <vector>
0023
0024 namespace boost {
0025 namespace beast {
0026 namespace test {
0027
0028
0029
0030 template<class Executor>
0031 template<class Handler, class Buffers>
0032 class basic_stream<Executor>::read_op : public detail::stream_read_op_base
0033 {
0034 struct lambda
0035 {
0036 Handler h_;
0037 boost::weak_ptr<detail::stream_state> wp_;
0038 Buffers b_;
0039 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0040 net::any_io_executor wg2_;
0041 #else
0042 net::executor_work_guard<
0043 net::associated_executor_t<Handler, net::any_io_executor>> wg2_;
0044 #endif
0045
0046 lambda(lambda&&) = default;
0047 lambda(lambda const&) = default;
0048
0049 template<class Handler_>
0050 lambda(
0051 Handler_&& h,
0052 boost::shared_ptr<detail::stream_state> const& s,
0053 Buffers const& b)
0054 : h_(std::forward<Handler_>(h))
0055 , wp_(s)
0056 , b_(b)
0057 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0058 , wg2_(net::prefer(
0059 net::get_associated_executor(
0060 h_, s->exec),
0061 net::execution::outstanding_work.tracked))
0062 #else
0063 , wg2_(net::get_associated_executor(
0064 h_, s->exec))
0065 #endif
0066 {
0067 }
0068
0069 using allocator_type = net::associated_allocator_t<Handler>;
0070
0071 allocator_type get_allocator() const noexcept
0072 {
0073 return net::get_associated_allocator(h_);
0074 }
0075
0076 using cancellation_slot_type =
0077 net::associated_cancellation_slot_t<Handler>;
0078
0079 cancellation_slot_type
0080 get_cancellation_slot() const noexcept
0081 {
0082 return net::get_associated_cancellation_slot(h_,
0083 net::cancellation_slot());
0084 }
0085
0086 void
0087 operator()(error_code ec)
0088 {
0089 std::size_t bytes_transferred = 0;
0090 auto sp = wp_.lock();
0091 if(! sp)
0092 {
0093 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0094 }
0095 if(! ec)
0096 {
0097 std::lock_guard<std::mutex> lock(sp->m);
0098 BOOST_ASSERT(! sp->op);
0099 if(sp->b.size() > 0)
0100 {
0101 bytes_transferred =
0102 net::buffer_copy(
0103 b_, sp->b.data(), sp->read_max);
0104 sp->b.consume(bytes_transferred);
0105 sp->nread_bytes += bytes_transferred;
0106 }
0107 else if (buffer_bytes(b_) > 0)
0108 {
0109 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0110 }
0111 }
0112
0113 #if defined(BOOST_ASIO_NO_TS_EXECUTORS)
0114 net::dispatch(wg2_,
0115 beast::bind_front_handler(std::move(h_),
0116 ec, bytes_transferred));
0117 wg2_ = net::any_io_executor();
0118 #else
0119 net::dispatch(wg2_.get_executor(),
0120 beast::bind_front_handler(std::move(h_),
0121 ec, bytes_transferred));
0122 wg2_.reset();
0123 #endif
0124 }
0125 };
0126
0127 lambda fn_;
0128 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0129 net::executor_work_guard<net::any_io_executor> wg1_;
0130 #else
0131 net::any_io_executor wg1_;
0132 #endif
0133
0134 public:
0135 template<class Handler_>
0136 read_op(
0137 Handler_&& h,
0138 boost::shared_ptr<detail::stream_state> const& s,
0139 Buffers const& b)
0140 : fn_(std::forward<Handler_>(h), s, b)
0141 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0142 , wg1_(s->exec)
0143 #else
0144 , wg1_(net::prefer(s->exec,
0145 net::execution::outstanding_work.tracked))
0146 #endif
0147 {
0148 }
0149
0150 void
0151 operator()(error_code ec) override
0152 {
0153 #if defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
0154 net::post(wg1_.get_executor(),
0155 beast::bind_front_handler(std::move(fn_), ec));
0156 wg1_.reset();
0157 #else
0158 net::post(wg1_, beast::bind_front_handler(std::move(fn_), ec));
0159 wg1_ = net::any_io_executor();
0160 #endif
0161 }
0162 };
0163
0164 template<class Executor>
0165 struct basic_stream<Executor>::run_read_op
0166 {
0167 template<
0168 class ReadHandler,
0169 class MutableBufferSequence>
0170 void
0171 operator()(
0172 ReadHandler&& h,
0173 boost::shared_ptr<detail::stream_state> const& in,
0174 MutableBufferSequence const& buffers)
0175 {
0176
0177
0178
0179
0180 static_assert(
0181 beast::detail::is_invocable<ReadHandler,
0182 void(error_code, std::size_t)>::value,
0183 "ReadHandler type requirements not met");
0184
0185 initiate_read(
0186 in,
0187 std::unique_ptr<detail::stream_read_op_base>{
0188 new read_op<
0189 typename std::decay<ReadHandler>::type,
0190 MutableBufferSequence>(
0191 std::move(h),
0192 in,
0193 buffers)},
0194 buffer_bytes(buffers));
0195 }
0196 };
0197
0198 template<class Executor>
0199 struct basic_stream<Executor>::run_write_op
0200 {
0201 template<
0202 class WriteHandler,
0203 class ConstBufferSequence>
0204 void
0205 operator()(
0206 WriteHandler&& h,
0207 boost::shared_ptr<detail::stream_state> in_,
0208 boost::weak_ptr<detail::stream_state> out_,
0209 ConstBufferSequence const& buffers)
0210 {
0211
0212
0213
0214
0215 static_assert(
0216 beast::detail::is_invocable<WriteHandler,
0217 void(error_code, std::size_t)>::value,
0218 "WriteHandler type requirements not met");
0219
0220 ++in_->nwrite;
0221 auto const upcall = [&](error_code ec, std::size_t n)
0222 {
0223 net::post(
0224 in_->exec,
0225 beast::bind_front_handler(std::move(h), ec, n));
0226 };
0227
0228
0229 error_code ec;
0230 std::size_t n = 0;
0231 if(in_->fc && in_->fc->fail(ec))
0232 return upcall(ec, n);
0233
0234
0235 if(buffer_bytes(buffers) == 0)
0236 return upcall(ec, n);
0237
0238
0239 auto out = out_.lock();
0240 if(! out)
0241 return upcall(net::error::connection_reset, n);
0242
0243
0244 n = std::min<std::size_t>(
0245 buffer_bytes(buffers), in_->write_max);
0246 {
0247 std::lock_guard<std::mutex> lock(out->m);
0248 n = net::buffer_copy(out->b.prepare(n), buffers);
0249 out->b.commit(n);
0250 out->nwrite_bytes += n;
0251 out->notify_read();
0252 }
0253 BOOST_ASSERT(! ec);
0254 upcall(ec, n);
0255 }
0256 };
0257
0258
0259
0260 template<class Executor>
0261 template<class MutableBufferSequence>
0262 std::size_t
0263 basic_stream<Executor>::
0264 read_some(MutableBufferSequence const& buffers)
0265 {
0266 static_assert(net::is_mutable_buffer_sequence<
0267 MutableBufferSequence>::value,
0268 "MutableBufferSequence type requirements not met");
0269 error_code ec;
0270 auto const n = read_some(buffers, ec);
0271 if(ec)
0272 BOOST_THROW_EXCEPTION(system_error{ec});
0273 return n;
0274 }
0275
0276 template<class Executor>
0277 template<class MutableBufferSequence>
0278 std::size_t
0279 basic_stream<Executor>::
0280 read_some(MutableBufferSequence const& buffers,
0281 error_code& ec)
0282 {
0283 static_assert(net::is_mutable_buffer_sequence<
0284 MutableBufferSequence>::value,
0285 "MutableBufferSequence type requirements not met");
0286
0287 ++in_->nread;
0288
0289
0290 if(in_->fc && in_->fc->fail(ec))
0291 return 0;
0292
0293
0294 if(buffer_bytes(buffers) == 0)
0295 {
0296 ec = {};
0297 return 0;
0298 }
0299
0300 std::unique_lock<std::mutex> lock{in_->m};
0301 BOOST_ASSERT(! in_->op);
0302 in_->cv.wait(lock,
0303 [&]()
0304 {
0305 return
0306 in_->b.size() > 0 ||
0307 in_->code != detail::stream_status::ok;
0308 });
0309
0310
0311 if(in_->b.size() > 0)
0312 {
0313 auto const n = net::buffer_copy(
0314 buffers, in_->b.data(), in_->read_max);
0315 in_->b.consume(n);
0316 in_->nread_bytes += n;
0317 return n;
0318 }
0319
0320
0321 BOOST_ASSERT(in_->code != detail::stream_status::ok);
0322 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0323 return 0;
0324 }
0325
0326 template<class Executor>
0327 template<class MutableBufferSequence,
0328 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) ReadHandler>
0329 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler, void(error_code, std::size_t))
0330 basic_stream<Executor>::
0331 async_read_some(
0332 MutableBufferSequence const& buffers,
0333 ReadHandler&& handler)
0334 {
0335 static_assert(net::is_mutable_buffer_sequence<
0336 MutableBufferSequence>::value,
0337 "MutableBufferSequence type requirements not met");
0338
0339 return net::async_initiate<
0340 ReadHandler,
0341 void(error_code, std::size_t)>(
0342 run_read_op{},
0343 handler,
0344 in_,
0345 buffers);
0346 }
0347
0348 template<class Executor>
0349 template<class ConstBufferSequence>
0350 std::size_t
0351 basic_stream<Executor>::
0352 write_some(ConstBufferSequence const& buffers)
0353 {
0354 static_assert(net::is_const_buffer_sequence<
0355 ConstBufferSequence>::value,
0356 "ConstBufferSequence type requirements not met");
0357 error_code ec;
0358 auto const bytes_transferred =
0359 write_some(buffers, ec);
0360 if(ec)
0361 BOOST_THROW_EXCEPTION(system_error{ec});
0362 return bytes_transferred;
0363 }
0364
0365 template<class Executor>
0366 template<class ConstBufferSequence>
0367 std::size_t
0368 basic_stream<Executor>::
0369 write_some(
0370 ConstBufferSequence const& buffers, error_code& ec)
0371 {
0372 static_assert(net::is_const_buffer_sequence<
0373 ConstBufferSequence>::value,
0374 "ConstBufferSequence type requirements not met");
0375
0376 ++in_->nwrite;
0377
0378
0379 if(in_->fc && in_->fc->fail(ec))
0380 return 0;
0381
0382
0383 if(buffer_bytes(buffers) == 0)
0384 {
0385 ec = {};
0386 return 0;
0387 }
0388
0389
0390 auto out = out_.lock();
0391 if(! out)
0392 {
0393 BOOST_BEAST_ASSIGN_EC(ec, net::error::connection_reset);
0394 return 0;
0395 }
0396
0397
0398 auto n = std::min<std::size_t>(
0399 buffer_bytes(buffers), in_->write_max);
0400 {
0401 std::lock_guard<std::mutex> lock(out->m);
0402 n = net::buffer_copy(out->b.prepare(n), buffers);
0403 out->b.commit(n);
0404 out->nwrite_bytes += n;
0405 out->notify_read();
0406 }
0407 return n;
0408 }
0409
0410 template<class Executor>
0411 template<class ConstBufferSequence,
0412 BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, std::size_t)) WriteHandler>
0413 BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(WriteHandler, void(error_code, std::size_t))
0414 basic_stream<Executor>::
0415 async_write_some(
0416 ConstBufferSequence const& buffers,
0417 WriteHandler&& handler)
0418 {
0419 static_assert(net::is_const_buffer_sequence<
0420 ConstBufferSequence>::value,
0421 "ConstBufferSequence type requirements not met");
0422
0423 return net::async_initiate<
0424 WriteHandler,
0425 void(error_code, std::size_t)>(
0426 run_write_op{},
0427 handler,
0428 in_,
0429 out_,
0430 buffers);
0431 }
0432
0433
0434
0435 template<class Executor, class TeardownHandler>
0436 void
0437 async_teardown(
0438 role_type,
0439 basic_stream<Executor>& s,
0440 TeardownHandler&& handler)
0441 {
0442 error_code ec;
0443 if( s.in_->fc &&
0444 s.in_->fc->fail(ec))
0445 return net::post(
0446 s.get_executor(),
0447 beast::bind_front_handler(
0448 std::move(handler), ec));
0449 s.close();
0450 if( s.in_->fc &&
0451 s.in_->fc->fail(ec))
0452 {
0453 BOOST_BEAST_ASSIGN_EC(ec, net::error::eof);
0454 }
0455 else
0456 ec = {};
0457
0458 net::post(
0459 s.get_executor(),
0460 beast::bind_front_handler(
0461 std::move(handler), ec));
0462 }
0463
0464
0465
0466 template<class Executor, class Arg1, class... ArgN>
0467 basic_stream<Executor>
0468 connect(stream& to, Arg1&& arg1, ArgN&&... argn)
0469 {
0470 stream from{
0471 std::forward<Arg1>(arg1),
0472 std::forward<ArgN>(argn)...};
0473 from.connect(to);
0474 return from;
0475 }
0476
0477 namespace detail
0478 {
0479 template<class To>
0480 struct extract_executor_op
0481 {
0482 To operator()(net::any_io_executor& ex) const
0483 {
0484 assert(ex.template target<To>());
0485 return *ex.template target<To>();
0486 }
0487 };
0488
0489 template<>
0490 struct extract_executor_op<net::any_io_executor>
0491 {
0492 net::any_io_executor operator()(net::any_io_executor& ex) const
0493 {
0494 return ex;
0495 }
0496 };
0497 }
0498
0499 template<class Executor>
0500 auto basic_stream<Executor>::get_executor() noexcept -> executor_type
0501 {
0502 return detail::extract_executor_op<Executor>()(in_->exec);
0503 }
0504
0505 }
0506 }
0507 }
0508
0509 #endif