File indexing completed on 2025-07-05 08:28:04
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/buffers_prefix.hpp>
0019 #include <boost/beast/core/buffers_suffix.hpp>
0020 #include <boost/beast/core/flat_static_buffer.hpp>
0021 #include <boost/beast/core/read_size.hpp>
0022 #include <boost/beast/core/stream_traits.hpp>
0023 #include <boost/beast/core/detail/bind_continuation.hpp>
0024 #include <boost/beast/core/detail/buffer.hpp>
0025 #include <boost/beast/core/detail/clamp.hpp>
0026 #include <boost/beast/core/detail/config.hpp>
0027 #include <boost/asio/coroutine.hpp>
0028 #include <boost/assert.hpp>
0029 #include <boost/config.hpp>
0030 #include <boost/optional.hpp>
0031 #include <boost/throw_exception.hpp>
0032 #include <algorithm>
0033 #include <limits>
0034 #include <memory>
0035
0036 namespace boost {
0037 namespace beast {
0038 namespace websocket {
0039
0040
0041
0042
0043
0044 template<class NextLayer, bool deflateSupported>
0045 template<class Handler, class MutableBufferSequence>
0046 class stream<NextLayer, deflateSupported>::read_some_op
0047 : public beast::async_base<
0048 Handler, beast::executor_type<stream>>
0049 , public asio::coroutine
0050 {
0051 boost::weak_ptr<impl_type> wp_;
0052 MutableBufferSequence bs_;
0053 buffers_suffix<MutableBufferSequence> cb_;
0054 std::size_t bytes_written_ = 0;
0055 error_code result_;
0056 close_code code_;
0057 bool did_read_ = false;
0058
0059 public:
0060 static constexpr int id = 1;
0061
0062 template<class Handler_>
0063 read_some_op(
0064 Handler_&& h,
0065 boost::shared_ptr<impl_type> const& sp,
0066 MutableBufferSequence const& bs)
0067 : async_base<
0068 Handler, beast::executor_type<stream>>(
0069 std::forward<Handler_>(h),
0070 sp->stream().get_executor())
0071 , wp_(sp)
0072 , bs_(bs)
0073 , cb_(bs)
0074 , code_(close_code::none)
0075 {
0076 (*this)({}, 0, false);
0077 }
0078
0079 void operator()(
0080 error_code ec = {},
0081 std::size_t bytes_transferred = 0,
0082 bool cont = true)
0083 {
0084 using beast::detail::clamp;
0085 auto sp = wp_.lock();
0086 if(! sp)
0087 {
0088 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0089 bytes_written_ = 0;
0090 return this->complete(cont, ec, bytes_written_);
0091 }
0092 auto& impl = *sp;
0093 BOOST_ASIO_CORO_REENTER(*this)
0094 {
0095 impl.update_timer(this->get_executor());
0096
0097 acquire_read_lock:
0098
0099 if(! impl.rd_block.try_lock(this))
0100 {
0101 do_suspend:
0102 BOOST_ASIO_CORO_YIELD
0103 {
0104 BOOST_ASIO_HANDLER_LOCATION((
0105 __FILE__, __LINE__,
0106 "websocket::async_read_some"));
0107
0108 this->set_allowed_cancellation(net::cancellation_type::all);
0109 impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0110 }
0111 if (ec)
0112 return this->complete(cont, ec, bytes_written_);
0113
0114 this->set_allowed_cancellation(net::cancellation_type::terminal);
0115
0116 impl.rd_block.lock(this);
0117 BOOST_ASIO_CORO_YIELD
0118 {
0119 BOOST_ASIO_HANDLER_LOCATION((
0120 __FILE__, __LINE__,
0121 "websocket::async_read_some"));
0122
0123 const auto ex = this->get_immediate_executor();
0124 net::dispatch(ex, std::move(*this));
0125 }
0126 BOOST_ASSERT(impl.rd_block.is_locked(this));
0127
0128 BOOST_ASSERT(!ec);
0129 if(impl.check_stop_now(ec))
0130 {
0131
0132
0133
0134
0135
0136 goto upcall;
0137 }
0138
0139
0140
0141
0142 BOOST_ASSERT(impl.wr_close);
0143 BOOST_ASSERT(impl.status_ != status::open);
0144 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0145 goto upcall;
0146 }
0147 else
0148 {
0149
0150 if( impl.status_ == status::closed ||
0151 impl.status_ == status::failed)
0152 {
0153 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0154 goto upcall;
0155 }
0156 }
0157
0158
0159
0160
0161
0162 loop:
0163 BOOST_ASSERT(impl.rd_block.is_locked(this));
0164
0165
0166
0167
0168 if(impl.rd_remain == 0 &&
0169 (! impl.rd_fh.fin || impl.rd_done))
0170 {
0171
0172 while(! impl.parse_fh(
0173 impl.rd_fh, impl.rd_buf, result_))
0174 {
0175 if(result_)
0176 {
0177
0178 if(result_ == error::message_too_big)
0179 code_ = close_code::too_big;
0180 else
0181 code_ = close_code::protocol_error;
0182 goto close;
0183 }
0184 BOOST_ASSERT(impl.rd_block.is_locked(this));
0185 BOOST_ASIO_CORO_YIELD
0186 {
0187 BOOST_ASIO_HANDLER_LOCATION((
0188 __FILE__, __LINE__,
0189 "websocket::async_read_some"));
0190
0191 impl.stream().async_read_some(
0192 impl.rd_buf.prepare(read_size(
0193 impl.rd_buf, impl.rd_buf.max_size())),
0194 std::move(*this));
0195 }
0196 BOOST_ASSERT(impl.rd_block.is_locked(this));
0197 impl.rd_buf.commit(bytes_transferred);
0198 if(impl.check_stop_now(ec))
0199 goto upcall;
0200 impl.reset_idle();
0201
0202
0203
0204 impl.rd_block.unlock(this);
0205 if( impl.op_r_close.maybe_invoke())
0206 {
0207
0208 BOOST_ASSERT(impl.rd_block.is_locked());
0209 goto do_suspend;
0210 }
0211
0212 impl.rd_block.lock(this);
0213 }
0214
0215
0216 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0217 detail::mask_inplace(buffers_prefix(
0218 clamp(impl.rd_fh.len),
0219 impl.rd_buf.data()),
0220 impl.rd_key);
0221 if(detail::is_control(impl.rd_fh.op))
0222 {
0223
0224
0225 impl.rd_fh.fin = false;
0226
0227
0228 if(impl.rd_fh.op == detail::opcode::ping)
0229 {
0230 if(impl.ctrl_cb)
0231 {
0232 if(! cont)
0233 {
0234 BOOST_ASIO_CORO_YIELD
0235 {
0236 BOOST_ASIO_HANDLER_LOCATION((
0237 __FILE__, __LINE__,
0238 "websocket::async_read_some"));
0239
0240 const auto ex = this->get_immediate_executor();
0241 net::dispatch(ex, std::move(*this));
0242 }
0243 BOOST_ASSERT(cont);
0244
0245 }
0246 }
0247 {
0248 auto const b = buffers_prefix(
0249 clamp(impl.rd_fh.len),
0250 impl.rd_buf.data());
0251 auto const len = buffer_bytes(b);
0252 BOOST_ASSERT(len == impl.rd_fh.len);
0253 ping_data payload;
0254 detail::read_ping(payload, b);
0255 impl.rd_buf.consume(len);
0256
0257 if(impl.status_ == status::closing)
0258 goto loop;
0259 if(impl.ctrl_cb)
0260 impl.ctrl_cb(
0261 frame_type::ping, to_string_view(payload));
0262 impl.rd_fb.clear();
0263 impl.template write_ping<
0264 flat_static_buffer_base>(impl.rd_fb,
0265 detail::opcode::pong, payload);
0266 }
0267
0268
0269
0270 impl.rd_block.unlock(this);
0271 impl.op_r_close.maybe_invoke();
0272
0273
0274 if(! impl.wr_block.try_lock(this))
0275 {
0276 BOOST_ASIO_CORO_YIELD
0277 {
0278 BOOST_ASIO_HANDLER_LOCATION((
0279 __FILE__, __LINE__,
0280 "websocket::async_read_some"));
0281
0282 impl.op_rd.emplace(std::move(*this));
0283 }
0284 if (ec)
0285 return this->complete(cont, ec, bytes_written_);
0286
0287 impl.wr_block.lock(this);
0288 BOOST_ASIO_CORO_YIELD
0289 {
0290 BOOST_ASIO_HANDLER_LOCATION((
0291 __FILE__, __LINE__,
0292 "websocket::async_read_some"));
0293
0294 const auto ex = this->get_immediate_executor();
0295 net::dispatch(ex, std::move(*this));
0296 }
0297 BOOST_ASSERT(impl.wr_block.is_locked(this));
0298 if(impl.check_stop_now(ec))
0299 goto upcall;
0300 }
0301
0302
0303 BOOST_ASSERT(impl.wr_block.is_locked(this));
0304 BOOST_ASIO_CORO_YIELD
0305 {
0306 BOOST_ASIO_HANDLER_LOCATION((
0307 __FILE__, __LINE__,
0308 "websocket::async_read_some"));
0309
0310 net::async_write(
0311 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0312 beast::detail::bind_continuation(std::move(*this)));
0313 }
0314 BOOST_ASSERT(impl.wr_block.is_locked(this));
0315 if(impl.check_stop_now(ec))
0316 goto upcall;
0317 impl.wr_block.unlock(this);
0318 impl.op_close.maybe_invoke()
0319 || impl.op_idle_ping.maybe_invoke()
0320 || impl.op_ping.maybe_invoke()
0321 || impl.op_wr.maybe_invoke();
0322 goto acquire_read_lock;
0323 }
0324
0325
0326 if(impl.rd_fh.op == detail::opcode::pong)
0327 {
0328
0329 if(! impl.wr_close && impl.ctrl_cb)
0330 {
0331 if(! cont)
0332 {
0333 BOOST_ASIO_CORO_YIELD
0334 {
0335 BOOST_ASIO_HANDLER_LOCATION((
0336 __FILE__, __LINE__,
0337 "websocket::async_read_some"));
0338
0339 const auto ex = this->get_immediate_executor();
0340 net::dispatch(ex, std::move(*this));
0341 }
0342 BOOST_ASSERT(cont);
0343 }
0344 }
0345 auto const cb = buffers_prefix(clamp(
0346 impl.rd_fh.len), impl.rd_buf.data());
0347 auto const len = buffer_bytes(cb);
0348 BOOST_ASSERT(len == impl.rd_fh.len);
0349 ping_data payload;
0350 detail::read_ping(payload, cb);
0351 impl.rd_buf.consume(len);
0352
0353 if(! impl.wr_close && impl.ctrl_cb)
0354 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0355 goto loop;
0356 }
0357
0358
0359 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0360 {
0361 if(impl.ctrl_cb)
0362 {
0363 if(! cont)
0364 {
0365 BOOST_ASIO_CORO_YIELD
0366 {
0367 BOOST_ASIO_HANDLER_LOCATION((
0368 __FILE__, __LINE__,
0369 "websocket::async_read_some"));
0370
0371 const auto ex = this->get_immediate_executor();
0372 net::dispatch(ex, std::move(*this));
0373 }
0374 BOOST_ASSERT(cont);
0375 }
0376 }
0377 auto const cb = buffers_prefix(clamp(
0378 impl.rd_fh.len), impl.rd_buf.data());
0379 auto const len = buffer_bytes(cb);
0380 BOOST_ASSERT(len == impl.rd_fh.len);
0381 BOOST_ASSERT(! impl.rd_close);
0382 impl.rd_close = true;
0383 close_reason cr;
0384 detail::read_close(cr, cb, result_);
0385 if(result_)
0386 {
0387
0388 code_ = close_code::protocol_error;
0389 goto close;
0390 }
0391 impl.cr = cr;
0392 impl.rd_buf.consume(len);
0393 if(impl.ctrl_cb)
0394 impl.ctrl_cb(frame_type::close,
0395 to_string_view(impl.cr.reason));
0396
0397 if(impl.status_ == status::closing)
0398 {
0399
0400 BOOST_ASSERT(impl.wr_close);
0401 code_ = close_code::none;
0402 result_ = error::closed;
0403 goto close;
0404 }
0405
0406 code_ = cr.code == close_code::none ?
0407 close_code::normal :
0408 static_cast<close_code>(cr.code);
0409 result_ = error::closed;
0410 goto close;
0411 }
0412 }
0413 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0414 {
0415
0416 goto loop;
0417 }
0418 impl.rd_done = false;
0419 }
0420 if(! impl.rd_deflated())
0421 {
0422 if(impl.rd_remain > 0)
0423 {
0424 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0425 (std::min)(clamp(impl.rd_remain),
0426 buffer_bytes(cb_)))
0427 {
0428
0429
0430 BOOST_ASIO_CORO_YIELD
0431 {
0432 BOOST_ASIO_HANDLER_LOCATION((
0433 __FILE__, __LINE__,
0434 "websocket::async_read_some"));
0435
0436 impl.stream().async_read_some(
0437 impl.rd_buf.prepare(read_size(
0438 impl.rd_buf, impl.rd_buf.max_size())),
0439 std::move(*this));
0440 }
0441 impl.rd_buf.commit(bytes_transferred);
0442 if(impl.check_stop_now(ec))
0443 goto upcall;
0444 impl.reset_idle();
0445 if(impl.rd_fh.mask)
0446 detail::mask_inplace(buffers_prefix(clamp(
0447 impl.rd_remain), impl.rd_buf.data()),
0448 impl.rd_key);
0449 }
0450 if(impl.rd_buf.size() > 0)
0451 {
0452
0453
0454 bytes_transferred = net::buffer_copy(cb_,
0455 impl.rd_buf.data(), clamp(impl.rd_remain));
0456 auto const mb = buffers_prefix(
0457 bytes_transferred, cb_);
0458 impl.rd_remain -= bytes_transferred;
0459 if(impl.rd_op == detail::opcode::text)
0460 {
0461 if(! impl.rd_utf8.write(mb) ||
0462 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0463 ! impl.rd_utf8.finish()))
0464 {
0465
0466 code_ = close_code::bad_payload;
0467 result_ = error::bad_frame_payload;
0468 goto close;
0469 }
0470 }
0471 bytes_written_ += bytes_transferred;
0472 impl.rd_size += bytes_transferred;
0473 impl.rd_buf.consume(bytes_transferred);
0474 }
0475 else
0476 {
0477
0478 BOOST_ASSERT(impl.rd_remain > 0);
0479 BOOST_ASSERT(buffer_bytes(cb_) > 0);
0480 BOOST_ASSERT(buffer_bytes(buffers_prefix(
0481 clamp(impl.rd_remain), cb_)) > 0);
0482 BOOST_ASIO_CORO_YIELD
0483 {
0484 BOOST_ASIO_HANDLER_LOCATION((
0485 __FILE__, __LINE__,
0486 "websocket::async_read_some"));
0487
0488 impl.stream().async_read_some(buffers_prefix(
0489 clamp(impl.rd_remain), cb_), std::move(*this));
0490 }
0491 if(impl.check_stop_now(ec))
0492 goto upcall;
0493 impl.reset_idle();
0494 BOOST_ASSERT(bytes_transferred > 0);
0495 auto const mb = buffers_prefix(
0496 bytes_transferred, cb_);
0497 impl.rd_remain -= bytes_transferred;
0498 if(impl.rd_fh.mask)
0499 detail::mask_inplace(mb, impl.rd_key);
0500 if(impl.rd_op == detail::opcode::text)
0501 {
0502 if(! impl.rd_utf8.write(mb) ||
0503 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0504 ! impl.rd_utf8.finish()))
0505 {
0506
0507 code_ = close_code::bad_payload;
0508 result_ = error::bad_frame_payload;
0509 goto close;
0510 }
0511 }
0512 bytes_written_ += bytes_transferred;
0513 impl.rd_size += bytes_transferred;
0514 }
0515 }
0516 BOOST_ASSERT( ! impl.rd_done );
0517 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0518 impl.rd_done = true;
0519 }
0520 else
0521 {
0522
0523
0524
0525 while(buffer_bytes(cb_) > 0)
0526 {
0527 if( impl.rd_remain > 0 &&
0528 impl.rd_buf.size() == 0 &&
0529 ! did_read_)
0530 {
0531
0532 BOOST_ASIO_CORO_YIELD
0533 {
0534 BOOST_ASIO_HANDLER_LOCATION((
0535 __FILE__, __LINE__,
0536 "websocket::async_read_some"));
0537
0538 impl.stream().async_read_some(
0539 impl.rd_buf.prepare(read_size(
0540 impl.rd_buf, impl.rd_buf.max_size())),
0541 std::move(*this));
0542 }
0543 if(impl.check_stop_now(ec))
0544 goto upcall;
0545 impl.reset_idle();
0546 BOOST_ASSERT(bytes_transferred > 0);
0547 impl.rd_buf.commit(bytes_transferred);
0548 if(impl.rd_fh.mask)
0549 detail::mask_inplace(
0550 buffers_prefix(clamp(impl.rd_remain),
0551 impl.rd_buf.data()), impl.rd_key);
0552 did_read_ = true;
0553 }
0554 zlib::z_params zs;
0555 {
0556 auto const out = buffers_front(cb_);
0557 zs.next_out = out.data();
0558 zs.avail_out = out.size();
0559 BOOST_ASSERT(zs.avail_out > 0);
0560 }
0561
0562 bool fin = false;
0563 if(impl.rd_remain > 0)
0564 {
0565 if(impl.rd_buf.size() > 0)
0566 {
0567
0568 auto const in = buffers_prefix(
0569 clamp(impl.rd_remain), buffers_front(
0570 impl.rd_buf.data()));
0571 zs.avail_in = in.size();
0572 zs.next_in = in.data();
0573 }
0574 else
0575 {
0576 break;
0577 }
0578 }
0579 else if(impl.rd_fh.fin)
0580 {
0581
0582 static std::uint8_t constexpr
0583 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0584 zs.next_in = empty_block;
0585 zs.avail_in = sizeof(empty_block);
0586 fin = true;
0587 }
0588 else
0589 {
0590 break;
0591 }
0592 impl.inflate(zs, zlib::Flush::sync, ec);
0593 if(impl.check_stop_now(ec))
0594 goto upcall;
0595 if(fin && zs.total_out == 0) {
0596 impl.do_context_takeover_read(impl.role);
0597 impl.rd_done = true;
0598 break;
0599 }
0600 if(impl.rd_msg_max && beast::detail::sum_exceeds(
0601 impl.rd_size, zs.total_out, impl.rd_msg_max))
0602 {
0603
0604 code_ = close_code::too_big;
0605 result_ = error::message_too_big;
0606 goto close;
0607 }
0608 cb_.consume(zs.total_out);
0609 impl.rd_size += zs.total_out;
0610 if (! fin) {
0611 impl.rd_remain -= zs.total_in;
0612 impl.rd_buf.consume(zs.total_in);
0613 }
0614 bytes_written_ += zs.total_out;
0615 }
0616 if(impl.rd_op == detail::opcode::text)
0617 {
0618
0619 if(! impl.rd_utf8.write(
0620 buffers_prefix(bytes_written_, bs_)) || (
0621 impl.rd_done && ! impl.rd_utf8.finish()))
0622 {
0623
0624 code_ = close_code::bad_payload;
0625 result_ = error::bad_frame_payload;
0626 goto close;
0627 }
0628 }
0629 }
0630 goto upcall;
0631
0632 close:
0633
0634 if(! impl.wr_block.try_lock(this))
0635 {
0636 BOOST_ASIO_CORO_YIELD
0637 {
0638 BOOST_ASIO_HANDLER_LOCATION((
0639 __FILE__, __LINE__,
0640 "websocket::async_read_some"));
0641
0642 impl.op_rd.emplace(std::move(*this));
0643 }
0644 if (ec)
0645 return this->complete(cont, ec, bytes_written_);
0646
0647 impl.wr_block.lock(this);
0648 BOOST_ASIO_CORO_YIELD
0649 {
0650 BOOST_ASIO_HANDLER_LOCATION((
0651 __FILE__, __LINE__,
0652 "websocket::async_read_some"));
0653
0654 const auto ex = this->get_immediate_executor();
0655 net::dispatch(ex, std::move(*this));
0656 }
0657 BOOST_ASSERT(impl.wr_block.is_locked(this));
0658 if(impl.check_stop_now(ec))
0659 goto upcall;
0660 }
0661
0662 impl.change_status(status::closing);
0663
0664 if(! impl.wr_close)
0665 {
0666 impl.wr_close = true;
0667
0668
0669 impl.rd_fb.clear();
0670 impl.template write_close<
0671 flat_static_buffer_base>(
0672 impl.rd_fb, code_);
0673
0674
0675 BOOST_ASSERT(impl.wr_block.is_locked(this));
0676 BOOST_ASIO_CORO_YIELD
0677 {
0678 BOOST_ASIO_HANDLER_LOCATION((
0679 __FILE__, __LINE__,
0680 "websocket::async_read_some"));
0681
0682 net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0683 beast::detail::bind_continuation(std::move(*this)));
0684 }
0685 BOOST_ASSERT(impl.wr_block.is_locked(this));
0686 if(impl.check_stop_now(ec))
0687 goto upcall;
0688 }
0689
0690
0691 using beast::websocket::async_teardown;
0692 BOOST_ASSERT(impl.wr_block.is_locked(this));
0693 BOOST_ASIO_CORO_YIELD
0694 {
0695 BOOST_ASIO_HANDLER_LOCATION((
0696 __FILE__, __LINE__,
0697 "websocket::async_read_some"));
0698
0699 async_teardown(impl.role, impl.stream(),
0700 beast::detail::bind_continuation(std::move(*this)));
0701 }
0702 BOOST_ASSERT(impl.wr_block.is_locked(this));
0703 if(ec == net::error::eof)
0704 {
0705
0706
0707 ec = {};
0708 }
0709 if(! ec)
0710 {
0711 BOOST_BEAST_ASSIGN_EC(ec, result_);
0712 }
0713 if(ec && ec != error::closed)
0714 impl.change_status(status::failed);
0715 else
0716 impl.change_status(status::closed);
0717 impl.close();
0718
0719 upcall:
0720 impl.rd_block.try_unlock(this);
0721 impl.op_r_close.maybe_invoke();
0722 if(impl.wr_block.try_unlock(this))
0723 impl.op_close.maybe_invoke()
0724 || impl.op_idle_ping.maybe_invoke()
0725 || impl.op_ping.maybe_invoke()
0726 || impl.op_wr.maybe_invoke();
0727 this->complete(cont, ec, bytes_written_);
0728 }
0729 }
0730 };
0731
0732
0733
0734 template<class NextLayer, bool deflateSupported>
0735 template<class Handler, class DynamicBuffer>
0736 class stream<NextLayer, deflateSupported>::read_op
0737 : public beast::async_base<
0738 Handler, beast::executor_type<stream>>
0739 , public asio::coroutine
0740 {
0741 boost::weak_ptr<impl_type> wp_;
0742 DynamicBuffer& b_;
0743 std::size_t limit_;
0744 std::size_t bytes_written_ = 0;
0745 bool some_;
0746
0747 public:
0748 template<class Handler_>
0749 read_op(
0750 Handler_&& h,
0751 boost::shared_ptr<impl_type> const& sp,
0752 DynamicBuffer& b,
0753 std::size_t limit,
0754 bool some)
0755 : async_base<Handler,
0756 beast::executor_type<stream>>(
0757 std::forward<Handler_>(h),
0758 sp->stream().get_executor())
0759 , wp_(sp)
0760 , b_(b)
0761 , limit_(limit ? limit : (
0762 std::numeric_limits<std::size_t>::max)())
0763 , some_(some)
0764 {
0765 (*this)({}, 0, false);
0766 }
0767
0768 void operator()(
0769 error_code ec = {},
0770 std::size_t bytes_transferred = 0,
0771 bool cont = true)
0772 {
0773 using beast::detail::clamp;
0774 auto sp = wp_.lock();
0775 if(! sp)
0776 {
0777 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0778 bytes_written_ = 0;
0779 return this->complete(cont, ec, bytes_written_);
0780 }
0781 auto& impl = *sp;
0782 using mutable_buffers_type = typename
0783 DynamicBuffer::mutable_buffers_type;
0784 BOOST_ASIO_CORO_REENTER(*this)
0785 {
0786 do
0787 {
0788
0789 BOOST_ASIO_CORO_YIELD
0790 {
0791 auto mb = beast::detail::dynamic_buffer_prepare(b_,
0792 clamp(impl.read_size_hint_db(b_), limit_),
0793 ec, error::buffer_overflow);
0794 if(impl.check_stop_now(ec))
0795 goto upcall;
0796
0797 BOOST_ASIO_HANDLER_LOCATION((
0798 __FILE__, __LINE__,
0799 "websocket::async_read"));
0800
0801 read_some_op<read_op, mutable_buffers_type>(
0802 std::move(*this), sp, *mb);
0803 }
0804
0805 b_.commit(bytes_transferred);
0806 bytes_written_ += bytes_transferred;
0807 if(ec)
0808 goto upcall;
0809 }
0810 while(! some_ && ! impl.rd_done);
0811
0812 upcall:
0813 this->complete(cont, ec, bytes_written_);
0814 }
0815 }
0816 };
0817
0818 template<class NextLayer, bool deflateSupported>
0819 struct stream<NextLayer, deflateSupported>::
0820 run_read_some_op
0821 {
0822 boost::shared_ptr<impl_type> const& self;
0823
0824 using executor_type = typename stream::executor_type;
0825
0826 executor_type
0827 get_executor() const noexcept
0828 {
0829 return self->stream().get_executor();
0830 }
0831
0832 template<
0833 class ReadHandler,
0834 class MutableBufferSequence>
0835 void
0836 operator()(
0837 ReadHandler&& h,
0838 MutableBufferSequence const& b)
0839 {
0840
0841
0842
0843
0844 static_assert(
0845 beast::detail::is_invocable<ReadHandler,
0846 void(error_code, std::size_t)>::value,
0847 "ReadHandler type requirements not met");
0848
0849 read_some_op<
0850 typename std::decay<ReadHandler>::type,
0851 MutableBufferSequence>(
0852 std::forward<ReadHandler>(h),
0853 self,
0854 b);
0855 }
0856 };
0857
0858 template<class NextLayer, bool deflateSupported>
0859 struct stream<NextLayer, deflateSupported>::
0860 run_read_op
0861 {
0862 boost::shared_ptr<impl_type> const& self;
0863
0864 using executor_type = typename stream::executor_type;
0865
0866 executor_type
0867 get_executor() const noexcept
0868 {
0869 return self->stream().get_executor();
0870 }
0871
0872 template<
0873 class ReadHandler,
0874 class DynamicBuffer>
0875 void
0876 operator()(
0877 ReadHandler&& h,
0878 DynamicBuffer* b,
0879 std::size_t limit,
0880 bool some)
0881 {
0882
0883
0884
0885
0886 static_assert(
0887 beast::detail::is_invocable<ReadHandler,
0888 void(error_code, std::size_t)>::value,
0889 "ReadHandler type requirements not met");
0890
0891 read_op<
0892 typename std::decay<ReadHandler>::type,
0893 DynamicBuffer>(
0894 std::forward<ReadHandler>(h),
0895 self,
0896 *b,
0897 limit,
0898 some);
0899 }
0900 };
0901
0902
0903
0904 template<class NextLayer, bool deflateSupported>
0905 template<class DynamicBuffer>
0906 std::size_t
0907 stream<NextLayer, deflateSupported>::
0908 read(DynamicBuffer& buffer)
0909 {
0910 static_assert(is_sync_stream<next_layer_type>::value,
0911 "SyncStream type requirements not met");
0912 static_assert(
0913 net::is_dynamic_buffer<DynamicBuffer>::value,
0914 "DynamicBuffer type requirements not met");
0915 error_code ec;
0916 auto const bytes_written = read(buffer, ec);
0917 if(ec)
0918 BOOST_THROW_EXCEPTION(system_error{ec});
0919 return bytes_written;
0920 }
0921
0922 template<class NextLayer, bool deflateSupported>
0923 template<class DynamicBuffer>
0924 std::size_t
0925 stream<NextLayer, deflateSupported>::
0926 read(DynamicBuffer& buffer, error_code& ec)
0927 {
0928 static_assert(is_sync_stream<next_layer_type>::value,
0929 "SyncStream type requirements not met");
0930 static_assert(
0931 net::is_dynamic_buffer<DynamicBuffer>::value,
0932 "DynamicBuffer type requirements not met");
0933 std::size_t bytes_written = 0;
0934 do
0935 {
0936 bytes_written += read_some(buffer, 0, ec);
0937 if(ec)
0938 return bytes_written;
0939 }
0940 while(! is_message_done());
0941 return bytes_written;
0942 }
0943
0944 template<class NextLayer, bool deflateSupported>
0945 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0946 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0947 stream<NextLayer, deflateSupported>::
0948 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0949 {
0950 static_assert(is_async_stream<next_layer_type>::value,
0951 "AsyncStream type requirements not met");
0952 static_assert(
0953 net::is_dynamic_buffer<DynamicBuffer>::value,
0954 "DynamicBuffer type requirements not met");
0955 return net::async_initiate<
0956 ReadHandler,
0957 void(error_code, std::size_t)>(
0958 run_read_op{impl_},
0959 handler,
0960 &buffer,
0961 0,
0962 false);
0963 }
0964
0965
0966
0967 template<class NextLayer, bool deflateSupported>
0968 template<class DynamicBuffer>
0969 std::size_t
0970 stream<NextLayer, deflateSupported>::
0971 read_some(
0972 DynamicBuffer& buffer,
0973 std::size_t limit)
0974 {
0975 static_assert(is_sync_stream<next_layer_type>::value,
0976 "SyncStream type requirements not met");
0977 static_assert(
0978 net::is_dynamic_buffer<DynamicBuffer>::value,
0979 "DynamicBuffer type requirements not met");
0980 error_code ec;
0981 auto const bytes_written =
0982 read_some(buffer, limit, ec);
0983 if(ec)
0984 BOOST_THROW_EXCEPTION(system_error{ec});
0985 return bytes_written;
0986 }
0987
0988 template<class NextLayer, bool deflateSupported>
0989 template<class DynamicBuffer>
0990 std::size_t
0991 stream<NextLayer, deflateSupported>::
0992 read_some(
0993 DynamicBuffer& buffer,
0994 std::size_t limit,
0995 error_code& ec)
0996 {
0997 static_assert(is_sync_stream<next_layer_type>::value,
0998 "SyncStream type requirements not met");
0999 static_assert(
1000 net::is_dynamic_buffer<DynamicBuffer>::value,
1001 "DynamicBuffer type requirements not met");
1002 using beast::detail::clamp;
1003 if(! limit)
1004 limit = (std::numeric_limits<std::size_t>::max)();
1005 auto const size =
1006 clamp(impl_->read_size_hint_db(buffer), limit);
1007 BOOST_ASSERT(size > 0);
1008 auto mb = beast::detail::dynamic_buffer_prepare(
1009 buffer, size, ec, error::buffer_overflow);
1010 if(impl_->check_stop_now(ec))
1011 return 0;
1012 auto const bytes_written = read_some(*mb, ec);
1013 buffer.commit(bytes_written);
1014 return bytes_written;
1015 }
1016
1017 template<class NextLayer, bool deflateSupported>
1018 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1019 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1020 stream<NextLayer, deflateSupported>::
1021 async_read_some(
1022 DynamicBuffer& buffer,
1023 std::size_t limit,
1024 ReadHandler&& handler)
1025 {
1026 static_assert(is_async_stream<next_layer_type>::value,
1027 "AsyncStream type requirements not met");
1028 static_assert(
1029 net::is_dynamic_buffer<DynamicBuffer>::value,
1030 "DynamicBuffer type requirements not met");
1031 return net::async_initiate<
1032 ReadHandler,
1033 void(error_code, std::size_t)>(
1034 run_read_op{impl_},
1035 handler,
1036 &buffer,
1037 limit,
1038 true);
1039 }
1040
1041
1042
1043 template<class NextLayer, bool deflateSupported>
1044 template<class MutableBufferSequence>
1045 std::size_t
1046 stream<NextLayer, deflateSupported>::
1047 read_some(
1048 MutableBufferSequence const& buffers)
1049 {
1050 static_assert(is_sync_stream<next_layer_type>::value,
1051 "SyncStream type requirements not met");
1052 static_assert(net::is_mutable_buffer_sequence<
1053 MutableBufferSequence>::value,
1054 "MutableBufferSequence type requirements not met");
1055 error_code ec;
1056 auto const bytes_written = read_some(buffers, ec);
1057 if(ec)
1058 BOOST_THROW_EXCEPTION(system_error{ec});
1059 return bytes_written;
1060 }
1061
1062 template<class NextLayer, bool deflateSupported>
1063 template<class MutableBufferSequence>
1064 std::size_t
1065 stream<NextLayer, deflateSupported>::
1066 read_some(
1067 MutableBufferSequence const& buffers,
1068 error_code& ec)
1069 {
1070 static_assert(is_sync_stream<next_layer_type>::value,
1071 "SyncStream type requirements not met");
1072 static_assert(net::is_mutable_buffer_sequence<
1073 MutableBufferSequence>::value,
1074 "MutableBufferSequence type requirements not met");
1075 using beast::detail::clamp;
1076 auto& impl = *impl_;
1077 close_code code{};
1078 std::size_t bytes_written = 0;
1079 ec = {};
1080
1081 if(impl.check_stop_now(ec))
1082 return bytes_written;
1083 loop:
1084
1085
1086
1087
1088 if(impl.rd_remain == 0 && (
1089 ! impl.rd_fh.fin || impl.rd_done))
1090 {
1091
1092 error_code result;
1093 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1094 {
1095 if(result)
1096 {
1097
1098 if(result == error::message_too_big)
1099 code = close_code::too_big;
1100 else
1101 code = close_code::protocol_error;
1102 do_fail(code, result, ec);
1103 return bytes_written;
1104 }
1105 auto const bytes_transferred =
1106 impl.stream().read_some(
1107 impl.rd_buf.prepare(read_size(
1108 impl.rd_buf, impl.rd_buf.max_size())),
1109 ec);
1110 impl.rd_buf.commit(bytes_transferred);
1111 if(impl.check_stop_now(ec))
1112 return bytes_written;
1113 }
1114
1115
1116 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1117 detail::mask_inplace(buffers_prefix(
1118 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1119 impl.rd_key);
1120 if(detail::is_control(impl.rd_fh.op))
1121 {
1122
1123 auto const b = buffers_prefix(
1124 clamp(impl.rd_fh.len), impl.rd_buf.data());
1125 auto const len = buffer_bytes(b);
1126 BOOST_ASSERT(len == impl.rd_fh.len);
1127
1128
1129
1130 impl.rd_fh.fin = false;
1131
1132
1133 if(impl.rd_fh.op == detail::opcode::ping)
1134 {
1135 ping_data payload;
1136 detail::read_ping(payload, b);
1137 impl.rd_buf.consume(len);
1138 if(impl.wr_close)
1139 {
1140
1141 goto loop;
1142 }
1143 if(impl.ctrl_cb)
1144 impl.ctrl_cb(frame_type::ping, to_string_view(payload));
1145 detail::frame_buffer fb;
1146 impl.template write_ping<flat_static_buffer_base>(fb,
1147 detail::opcode::pong, payload);
1148 net::write(impl.stream(), fb.data(), ec);
1149 if(impl.check_stop_now(ec))
1150 return bytes_written;
1151 goto loop;
1152 }
1153
1154 if(impl.rd_fh.op == detail::opcode::pong)
1155 {
1156 ping_data payload;
1157 detail::read_ping(payload, b);
1158 impl.rd_buf.consume(len);
1159 if(impl.ctrl_cb)
1160 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1161 goto loop;
1162 }
1163
1164 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1165 {
1166 BOOST_ASSERT(! impl.rd_close);
1167 impl.rd_close = true;
1168 close_reason cr;
1169 detail::read_close(cr, b, result);
1170 if(result)
1171 {
1172
1173 do_fail(close_code::protocol_error,
1174 result, ec);
1175 return bytes_written;
1176 }
1177 impl.cr = cr;
1178 impl.rd_buf.consume(len);
1179 if(impl.ctrl_cb)
1180 impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1181 BOOST_ASSERT(! impl.wr_close);
1182
1183 do_fail(
1184 cr.code == close_code::none ?
1185 close_code::normal :
1186 static_cast<close_code>(cr.code),
1187 error::closed, ec);
1188 return bytes_written;
1189 }
1190 }
1191 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1192 {
1193
1194 goto loop;
1195 }
1196 impl.rd_done = false;
1197 }
1198 else
1199 {
1200 ec = {};
1201 }
1202 if(! impl.rd_deflated())
1203 {
1204 if(impl.rd_remain > 0)
1205 {
1206 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1207 (std::min)(clamp(impl.rd_remain),
1208 buffer_bytes(buffers)))
1209 {
1210
1211
1212 impl.rd_buf.commit(impl.stream().read_some(
1213 impl.rd_buf.prepare(read_size(impl.rd_buf,
1214 impl.rd_buf.max_size())), ec));
1215 if(impl.check_stop_now(ec))
1216 return bytes_written;
1217 if(impl.rd_fh.mask)
1218 detail::mask_inplace(
1219 buffers_prefix(clamp(impl.rd_remain),
1220 impl.rd_buf.data()), impl.rd_key);
1221 }
1222 if(impl.rd_buf.size() > 0)
1223 {
1224
1225
1226 auto const bytes_transferred = net::buffer_copy(
1227 buffers, impl.rd_buf.data(),
1228 clamp(impl.rd_remain));
1229 auto const mb = buffers_prefix(
1230 bytes_transferred, buffers);
1231 impl.rd_remain -= bytes_transferred;
1232 if(impl.rd_op == detail::opcode::text)
1233 {
1234 if(! impl.rd_utf8.write(mb) ||
1235 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1236 ! impl.rd_utf8.finish()))
1237 {
1238
1239 do_fail(close_code::bad_payload,
1240 error::bad_frame_payload, ec);
1241 return bytes_written;
1242 }
1243 }
1244 bytes_written += bytes_transferred;
1245 impl.rd_size += bytes_transferred;
1246 impl.rd_buf.consume(bytes_transferred);
1247 }
1248 else
1249 {
1250
1251 BOOST_ASSERT(impl.rd_remain > 0);
1252 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1253 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1254 clamp(impl.rd_remain), buffers)) > 0);
1255 auto const bytes_transferred =
1256 impl.stream().read_some(buffers_prefix(
1257 clamp(impl.rd_remain), buffers), ec);
1258
1259 if(impl.check_stop_now(ec))
1260 return bytes_written;
1261 BOOST_ASSERT(bytes_transferred > 0);
1262 auto const mb = buffers_prefix(
1263 bytes_transferred, buffers);
1264 impl.rd_remain -= bytes_transferred;
1265 if(impl.rd_fh.mask)
1266 detail::mask_inplace(mb, impl.rd_key);
1267 if(impl.rd_op == detail::opcode::text)
1268 {
1269 if(! impl.rd_utf8.write(mb) ||
1270 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1271 ! impl.rd_utf8.finish()))
1272 {
1273
1274 do_fail(close_code::bad_payload,
1275 error::bad_frame_payload, ec);
1276 return bytes_written;
1277 }
1278 }
1279 bytes_written += bytes_transferred;
1280 impl.rd_size += bytes_transferred;
1281 }
1282 }
1283 BOOST_ASSERT( ! impl.rd_done );
1284 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1285 impl.rd_done = true;
1286 }
1287 else
1288 {
1289
1290
1291
1292
1293 bool did_read = false;
1294 buffers_suffix<MutableBufferSequence> cb(buffers);
1295 while(buffer_bytes(cb) > 0)
1296 {
1297 zlib::z_params zs;
1298 {
1299 auto const out = beast::buffers_front(cb);
1300 zs.next_out = out.data();
1301 zs.avail_out = out.size();
1302 BOOST_ASSERT(zs.avail_out > 0);
1303 }
1304
1305 bool fin = false;
1306 if(impl.rd_remain > 0)
1307 {
1308 if(impl.rd_buf.size() > 0)
1309 {
1310
1311 auto const in = buffers_prefix(
1312 clamp(impl.rd_remain), beast::buffers_front(
1313 impl.rd_buf.data()));
1314 zs.avail_in = in.size();
1315 zs.next_in = in.data();
1316 }
1317 else if(! did_read)
1318 {
1319
1320 auto const bytes_transferred =
1321 impl.stream().read_some(
1322 impl.rd_buf.prepare(read_size(
1323 impl.rd_buf, impl.rd_buf.max_size())),
1324 ec);
1325 if(impl.check_stop_now(ec))
1326 return bytes_written;
1327 BOOST_ASSERT(bytes_transferred > 0);
1328 impl.rd_buf.commit(bytes_transferred);
1329 if(impl.rd_fh.mask)
1330 detail::mask_inplace(
1331 buffers_prefix(clamp(impl.rd_remain),
1332 impl.rd_buf.data()), impl.rd_key);
1333 auto const in = buffers_prefix(
1334 clamp(impl.rd_remain), buffers_front(
1335 impl.rd_buf.data()));
1336 zs.avail_in = in.size();
1337 zs.next_in = in.data();
1338 did_read = true;
1339 }
1340 else
1341 {
1342 break;
1343 }
1344 }
1345 else if(impl.rd_fh.fin)
1346 {
1347
1348 static std::uint8_t constexpr
1349 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1350 zs.next_in = empty_block;
1351 zs.avail_in = sizeof(empty_block);
1352 fin = true;
1353 }
1354 else
1355 {
1356 break;
1357 }
1358 impl.inflate(zs, zlib::Flush::sync, ec);
1359 if(impl.check_stop_now(ec))
1360 return bytes_written;
1361 if (fin && zs.total_out == 0) {
1362 impl.do_context_takeover_read(impl.role);
1363 impl.rd_done = true;
1364 break;
1365 }
1366 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1367 impl.rd_size, zs.total_out, impl.rd_msg_max))
1368 {
1369 do_fail(close_code::too_big,
1370 error::message_too_big, ec);
1371 return bytes_written;
1372 }
1373 cb.consume(zs.total_out);
1374 impl.rd_size += zs.total_out;
1375 if (! fin) {
1376 impl.rd_remain -= zs.total_in;
1377 impl.rd_buf.consume(zs.total_in);
1378 }
1379 bytes_written += zs.total_out;
1380 }
1381 if(impl.rd_op == detail::opcode::text)
1382 {
1383
1384 if(! impl.rd_utf8.write(beast::buffers_prefix(
1385 bytes_written, buffers)) || (
1386 impl.rd_done && ! impl.rd_utf8.finish()))
1387 {
1388
1389 do_fail(close_code::bad_payload,
1390 error::bad_frame_payload, ec);
1391 return bytes_written;
1392 }
1393 }
1394 }
1395 return bytes_written;
1396 }
1397
1398 template<class NextLayer, bool deflateSupported>
1399 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1400 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1401 stream<NextLayer, deflateSupported>::
1402 async_read_some(
1403 MutableBufferSequence const& buffers,
1404 ReadHandler&& handler)
1405 {
1406 static_assert(is_async_stream<next_layer_type>::value,
1407 "AsyncStream type requirements not met");
1408 static_assert(net::is_mutable_buffer_sequence<
1409 MutableBufferSequence>::value,
1410 "MutableBufferSequence type requirements not met");
1411 return net::async_initiate<
1412 ReadHandler,
1413 void(error_code, std::size_t)>(
1414 run_read_some_op{impl_},
1415 handler,
1416 buffers);
1417 }
1418
1419 }
1420 }
1421 }
1422
1423 #endif