File indexing completed on 2025-09-18 08:36:09
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/buffers_prefix.hpp>
0019 #include <boost/beast/core/buffers_suffix.hpp>
0020 #include <boost/beast/core/flat_static_buffer.hpp>
0021 #include <boost/beast/core/read_size.hpp>
0022 #include <boost/beast/core/stream_traits.hpp>
0023 #include <boost/beast/core/detail/bind_continuation.hpp>
0024 #include <boost/beast/core/detail/buffer.hpp>
0025 #include <boost/beast/core/detail/clamp.hpp>
0026 #include <boost/beast/core/detail/config.hpp>
0027 #include <boost/asio/coroutine.hpp>
0028 #include <boost/assert.hpp>
0029 #include <boost/config.hpp>
0030 #include <boost/optional.hpp>
0031 #include <boost/throw_exception.hpp>
0032 #include <algorithm>
0033 #include <limits>
0034 #include <memory>
0035
0036 namespace boost {
0037 namespace beast {
0038 namespace websocket {
0039
0040 // Composed asynchronous operation behind async_read_some for a mutable
0041 // buffer sequence. It delivers message payload into the caller's buffers
0042 // (copying, or inflating when impl.rd_deflated() is true) and handles any
0043 // ping, pong and close control frames it encounters along the way.
0044 template<class NextLayer, bool deflateSupported>
0045 template<class Handler, class MutableBufferSequence>
0046 class stream<NextLayer, deflateSupported>::read_some_op
0047 : public beast::async_base<
0048 Handler, beast::executor_type<stream>>
0049 , public asio::coroutine
0050 {
0051 boost::weak_ptr<impl_type> wp_; // weak: the op aborts if the impl has expired
0052 MutableBufferSequence bs_; // the caller's complete buffer sequence
0053 buffers_suffix<MutableBufferSequence> cb_; // output space; consumed as inflated data is produced
0054 std::size_t bytes_written_ = 0; // payload bytes delivered to the caller so far
0055 error_code result_; // error reported once the close sequence finishes
0056 close_code code_; // code placed in the close frame this op may send
0057 bool did_read_ = false; // set after the compressed path has read from the stream once
0058
0059 public:
0060 static constexpr int id = 1;
0061
0062 template<class Handler_>
0063 read_some_op(
0064 Handler_&& h,
0065 boost::shared_ptr<impl_type> const& sp,
0066 MutableBufferSequence const& bs)
0067 : async_base<
0068 Handler, beast::executor_type<stream>>(
0069 std::forward<Handler_>(h),
0070 sp->stream().get_executor())
0071 , wp_(sp)
0072 , bs_(bs)
0073 , cb_(bs)
0074 , code_(close_code::none)
0075 {
0076 (*this)({}, 0, false); // start the coroutine; cont == false marks the initiating call
0077 }
0078
0079 void operator()(
0080 error_code ec = {},
0081 std::size_t bytes_transferred = 0,
0082 bool cont = true)
0083 {
0084 using beast::detail::clamp;
0085 auto sp = wp_.lock();
0086 if(! sp)
0087 {
0088 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0089 bytes_written_ = 0;
0090 return this->complete(cont, ec, bytes_written_);
0091 }
0092 auto& impl = *sp;
0093 BOOST_ASIO_CORO_REENTER(*this)
0094 {
0095 impl.update_timer(this->get_executor());
0096
0097 acquire_read_lock:
0098 // If the read block is already held, park this op until it is resumed.
0099 if(! impl.rd_block.try_lock(this))
0100 {
0101 do_suspend:
0102 BOOST_ASIO_CORO_YIELD
0103 {
0104 BOOST_ASIO_HANDLER_LOCATION((
0105 __FILE__, __LINE__,
0106 "websocket::async_read_some"));
0107 // While parked, every cancellation type is accepted.
0108 this->set_allowed_cancellation(net::cancellation_type::all);
0109 impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0110 }
0111 if (ec)
0112 return this->complete(cont, ec, bytes_written_);
0113 // Once resumed, only terminal cancellation is accepted.
0114 this->set_allowed_cancellation(net::cancellation_type::terminal);
0115
0116 impl.rd_block.lock(this);
0117 BOOST_ASIO_CORO_YIELD
0118 {
0119 BOOST_ASIO_HANDLER_LOCATION((
0120 __FILE__, __LINE__,
0121 "websocket::async_read_some"));
0122
0123 const auto ex = this->get_immediate_executor();
0124 net::dispatch(ex, std::move(*this));
0125 }
0126 BOOST_ASSERT(impl.rd_block.is_locked(this));
0127
0128 BOOST_ASSERT(!ec);
0129 if(impl.check_stop_now(ec))
0130 {
0131 // Whatever error check_stop_now stored in ec is delivered
0132 // to the handler unchanged; no particular error value is
0133 // assumed here.
0134
0135
0136 goto upcall;
0137 }
0138 // Reaching this point means check_stop_now did not stop us. The
0139 // asserts below record the expectation that a close frame was
0140 // already written (wr_close) and the stream is no longer open;
0141 // the read then completes with operation_aborted.
0142 BOOST_ASSERT(impl.wr_close);
0143 BOOST_ASSERT(impl.status_ != status::open);
0144 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0145 goto upcall;
0146 }
0147 else
0148 {
0149 // Lock acquired immediately: refuse to read from a closed or failed stream.
0150 if( impl.status_ == status::closed ||
0151 impl.status_ == status::failed)
0152 {
0153 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0154 goto upcall;
0155 }
0156 }
0157
0158 // Main read loop. Each pass either parses a new frame header (and
0159 // handles the frame if it is a control frame) or falls through to
0160 // deliver payload of the current data frame.
0161
0162 loop:
0163 BOOST_ASSERT(impl.rd_block.is_locked(this));
0164 // A new header is needed when the current frame has no payload left
0165 // and either it was not final or the message was fully delivered
0166 // (rd_done). A final frame with rd_done still false skips this so the
0167 // compressed path below can run once more with rd_remain == 0.
0168 if(impl.rd_remain == 0 &&
0169 (! impl.rd_fh.fin || impl.rd_done))
0170 {
0171 // Read from the stream until a complete frame header has been parsed.
0172 while(! impl.parse_fh(
0173 impl.rd_fh, impl.rd_buf, result_))
0174 {
0175 if(result_)
0176 {
0177 // Header parse failed: choose the close code and fail the connection.
0178 if(result_ == error::message_too_big)
0179 code_ = close_code::too_big;
0180 else
0181 code_ = close_code::protocol_error;
0182 goto close;
0183 }
0184 BOOST_ASSERT(impl.rd_block.is_locked(this));
0185 BOOST_ASIO_CORO_YIELD
0186 {
0187 BOOST_ASIO_HANDLER_LOCATION((
0188 __FILE__, __LINE__,
0189 "websocket::async_read_some"));
0190
0191 impl.stream().async_read_some(
0192 impl.rd_buf.prepare(read_size(
0193 impl.rd_buf, impl.rd_buf.max_size())),
0194 std::move(*this));
0195 }
0196 BOOST_ASSERT(impl.rd_block.is_locked(this));
0197 impl.rd_buf.commit(bytes_transferred);
0198 if(impl.check_stop_now(ec))
0199 goto upcall;
0200 impl.reset_idle();
0201
0202 // Release the read block briefly so that a parked close
0203 // operation (op_r_close), if any, gets a chance to run.
0204 impl.rd_block.unlock(this);
0205 if( impl.op_r_close.maybe_invoke())
0206 {
0207 // Something was resumed and the read block is held again: park this op.
0208 BOOST_ASSERT(impl.rd_block.is_locked());
0209 goto do_suspend;
0210 }
0211 // Nothing was waiting: take the read block back and keep parsing.
0212 impl.rd_block.lock(this);
0213 }
0214 // Unmask, in place, the part of this frame's payload that is
0215 // already sitting in the internal buffer.
0216 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0217 detail::mask_inplace(buffers_prefix(
0218 clamp(impl.rd_fh.len),
0219 impl.rd_buf.data()),
0220 impl.rd_key);
0221 if(detail::is_control(impl.rd_fh.op))
0222 {
0223 // Clear fin so the header-read condition at the top of the
0224 // loop holds again after this control frame is handled.
0225 impl.rd_fh.fin = false;
0226
0227 // Handle a ping frame.
0228 if(impl.rd_fh.op == detail::opcode::ping)
0229 {
0230 impl.update_timer(this->get_executor());
0231
0232 if(impl.ctrl_cb)
0233 {
0234 if(! cont)
0235 {
0236 BOOST_ASIO_CORO_YIELD
0237 {
0238 BOOST_ASIO_HANDLER_LOCATION((
0239 __FILE__, __LINE__,
0240 "websocket::async_read_some"));
0241
0242 const auto ex = this->get_immediate_executor();
0243 net::dispatch(ex, std::move(*this));
0244 }
0245 BOOST_ASSERT(cont);
0246 // Resumed via dispatch, so operator() ran with its default cont == true.
0247 }
0248 }
0249 {
0250 auto const b = buffers_prefix(
0251 clamp(impl.rd_fh.len),
0252 impl.rd_buf.data());
0253 auto const len = buffer_bytes(b);
0254 BOOST_ASSERT(len == impl.rd_fh.len);
0255 ping_data payload;
0256 detail::read_ping(payload, b);
0257 impl.rd_buf.consume(len);
0258 // While closing, the ping is consumed but neither reported nor answered.
0259 if(impl.status_ == status::closing)
0260 goto loop;
0261 if(impl.ctrl_cb)
0262 impl.ctrl_cb(
0263 frame_type::ping, to_string_view(payload));
0264 impl.rd_fb.clear();
0265 impl.template write_ping<
0266 flat_static_buffer_base>(impl.rd_fb,
0267 detail::opcode::pong, payload);
0268 }
0269
0270 // Give up the read block while the pong is sent, letting a
0271 // parked close operation (op_r_close) run if one is waiting.
0272 impl.rd_block.unlock(this);
0273 impl.op_r_close.maybe_invoke();
0274
0275 // The pong is written while holding the write block; park if it is busy.
0276 if(! impl.wr_block.try_lock(this))
0277 {
0278 BOOST_ASIO_CORO_YIELD
0279 {
0280 BOOST_ASIO_HANDLER_LOCATION((
0281 __FILE__, __LINE__,
0282 "websocket::async_read_some"));
0283
0284 impl.op_rd.emplace(std::move(*this));
0285 }
0286 if (ec)
0287 return this->complete(cont, ec, bytes_written_);
0288
0289 impl.wr_block.lock(this);
0290 BOOST_ASIO_CORO_YIELD
0291 {
0292 BOOST_ASIO_HANDLER_LOCATION((
0293 __FILE__, __LINE__,
0294 "websocket::async_read_some"));
0295
0296 const auto ex = this->get_immediate_executor();
0297 net::dispatch(ex, std::move(*this));
0298 }
0299 BOOST_ASSERT(impl.wr_block.is_locked(this));
0300 if(impl.check_stop_now(ec))
0301 goto upcall;
0302 }
0303
0304 // Send the pong frame serialized into rd_fb above.
0305 BOOST_ASSERT(impl.wr_block.is_locked(this));
0306 BOOST_ASIO_CORO_YIELD
0307 {
0308 BOOST_ASIO_HANDLER_LOCATION((
0309 __FILE__, __LINE__,
0310 "websocket::async_read_some"));
0311
0312 net::async_write(
0313 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0314 beast::detail::bind_continuation(std::move(*this)));
0315 }
0316 BOOST_ASSERT(impl.wr_block.is_locked(this));
0317 if(impl.check_stop_now(ec))
0318 goto upcall;
0319 impl.wr_block.unlock(this);
0320 impl.op_close.maybe_invoke()
0321 || impl.op_idle_ping.maybe_invoke()
0322 || impl.op_ping.maybe_invoke()
0323 || impl.op_wr.maybe_invoke();
0324 goto acquire_read_lock;
0325 }
0326
0327 // Handle a pong frame.
0328 if(impl.rd_fh.op == detail::opcode::pong)
0329 {
0330 // The pong is reported to the control callback only if no close frame was sent.
0331 if(! impl.wr_close && impl.ctrl_cb)
0332 {
0333 if(! cont)
0334 {
0335 BOOST_ASIO_CORO_YIELD
0336 {
0337 BOOST_ASIO_HANDLER_LOCATION((
0338 __FILE__, __LINE__,
0339 "websocket::async_read_some"));
0340
0341 const auto ex = this->get_immediate_executor();
0342 net::dispatch(ex, std::move(*this));
0343 }
0344 BOOST_ASSERT(cont);
0345 }
0346 }
0347 auto const cb = buffers_prefix(clamp(
0348 impl.rd_fh.len), impl.rd_buf.data());
0349 auto const len = buffer_bytes(cb);
0350 BOOST_ASSERT(len == impl.rd_fh.len);
0351 ping_data payload;
0352 detail::read_ping(payload, cb);
0353 impl.rd_buf.consume(len);
0354
0355 if(! impl.wr_close && impl.ctrl_cb)
0356 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0357 goto loop;
0358 }
0359
0360 // Handle a close frame (the only remaining control opcode).
0361 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0362 {
0363 if(impl.ctrl_cb)
0364 {
0365 if(! cont)
0366 {
0367 BOOST_ASIO_CORO_YIELD
0368 {
0369 BOOST_ASIO_HANDLER_LOCATION((
0370 __FILE__, __LINE__,
0371 "websocket::async_read_some"));
0372
0373 const auto ex = this->get_immediate_executor();
0374 net::dispatch(ex, std::move(*this));
0375 }
0376 BOOST_ASSERT(cont);
0377 }
0378 }
0379 auto const cb = buffers_prefix(clamp(
0380 impl.rd_fh.len), impl.rd_buf.data());
0381 auto const len = buffer_bytes(cb);
0382 BOOST_ASSERT(len == impl.rd_fh.len);
0383 BOOST_ASSERT(! impl.rd_close);
0384 impl.rd_close = true;
0385 close_reason cr;
0386 detail::read_close(cr, cb, result_);
0387 if(result_)
0388 {
0389 // The close payload could not be parsed: fail with a protocol error.
0390 code_ = close_code::protocol_error;
0391 goto close;
0392 }
0393 impl.cr = cr;
0394 impl.rd_buf.consume(len);
0395 if(impl.ctrl_cb)
0396 impl.ctrl_cb(frame_type::close,
0397 to_string_view(impl.cr.reason));
0398
0399 if(impl.status_ == status::closing)
0400 {
0401 // Already closing with our close frame sent, so the close path writes no second frame.
0402 BOOST_ASSERT(impl.wr_close);
0403 code_ = close_code::none;
0404 result_ = error::closed;
0405 goto close;
0406 }
0407 // Not closing yet: reply with the peer's code, or normal if it gave none.
0408 code_ = cr.code == close_code::none ?
0409 close_code::normal :
0410 static_cast<close_code>(cr.code);
0411 result_ = error::closed;
0412 goto close;
0413 }
0414 }
0415 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0416 {
0417 // Empty non-final data frame: nothing to deliver, read the next header.
0418 goto loop;
0419 }
0420 impl.rd_done = false;
0421 }
0422 if(! impl.rd_deflated())
0423 {
0424 if(impl.rd_remain > 0)
0425 {
0426 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0427 (std::min)(clamp(impl.rd_remain),
0428 buffer_bytes(cb_)))
0429 {
0430 // The internal buffer is empty and can hold more than this call
0431 // could deliver directly, so fill the internal buffer first.
0432 BOOST_ASIO_CORO_YIELD
0433 {
0434 BOOST_ASIO_HANDLER_LOCATION((
0435 __FILE__, __LINE__,
0436 "websocket::async_read_some"));
0437
0438 impl.stream().async_read_some(
0439 impl.rd_buf.prepare(read_size(
0440 impl.rd_buf, impl.rd_buf.max_size())),
0441 std::move(*this));
0442 }
0443 impl.rd_buf.commit(bytes_transferred);
0444 if(impl.check_stop_now(ec))
0445 goto upcall;
0446 impl.reset_idle();
0447 if(impl.rd_fh.mask)
0448 detail::mask_inplace(buffers_prefix(clamp(
0449 impl.rd_remain), impl.rd_buf.data()),
0450 impl.rd_key);
0451 }
0452 if(impl.rd_buf.size() > 0)
0453 {
0454 // Copy already-unmasked payload from the internal buffer
0455 // into the caller's buffers.
0456 bytes_transferred = net::buffer_copy(cb_,
0457 impl.rd_buf.data(), clamp(impl.rd_remain));
0458 auto const mb = buffers_prefix(
0459 bytes_transferred, cb_);
0460 impl.rd_remain -= bytes_transferred;
0461 if(impl.rd_op == detail::opcode::text)
0462 {
0463 if(! impl.rd_utf8.write(mb) ||
0464 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0465 ! impl.rd_utf8.finish()))
0466 {
0467 // Text payload failed UTF-8 validation: fail the connection.
0468 code_ = close_code::bad_payload;
0469 result_ = error::bad_frame_payload;
0470 goto close;
0471 }
0472 }
0473 bytes_written_ += bytes_transferred;
0474 impl.rd_size += bytes_transferred;
0475 impl.rd_buf.consume(bytes_transferred);
0476 }
0477 else
0478 {
0479 // Nothing buffered: read payload straight into the caller's buffers.
0480 BOOST_ASSERT(impl.rd_remain > 0);
0481 BOOST_ASSERT(buffer_bytes(cb_) > 0);
0482 BOOST_ASSERT(buffer_bytes(buffers_prefix(
0483 clamp(impl.rd_remain), cb_)) > 0);
0484 BOOST_ASIO_CORO_YIELD
0485 {
0486 BOOST_ASIO_HANDLER_LOCATION((
0487 __FILE__, __LINE__,
0488 "websocket::async_read_some"));
0489
0490 impl.stream().async_read_some(buffers_prefix(
0491 clamp(impl.rd_remain), cb_), std::move(*this));
0492 }
0493 if(impl.check_stop_now(ec))
0494 goto upcall;
0495 impl.reset_idle();
0496 BOOST_ASSERT(bytes_transferred > 0);
0497 auto const mb = buffers_prefix(
0498 bytes_transferred, cb_);
0499 impl.rd_remain -= bytes_transferred;
0500 if(impl.rd_fh.mask)
0501 detail::mask_inplace(mb, impl.rd_key);
0502 if(impl.rd_op == detail::opcode::text)
0503 {
0504 if(! impl.rd_utf8.write(mb) ||
0505 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0506 ! impl.rd_utf8.finish()))
0507 {
0508 // Text payload failed UTF-8 validation: fail the connection.
0509 code_ = close_code::bad_payload;
0510 result_ = error::bad_frame_payload;
0511 goto close;
0512 }
0513 }
0514 bytes_written_ += bytes_transferred;
0515 impl.rd_size += bytes_transferred;
0516 }
0517 }
0518 BOOST_ASSERT( ! impl.rd_done );
0519 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0520 impl.rd_done = true;
0521 }
0522 else
0523 {
0524 // Compressed message: inflate payload into the caller's buffers.
0525 // The loop also runs when rd_remain == 0 on a final frame so that
0526 // the trailing block below can be fed to the inflater.
0527 while(buffer_bytes(cb_) > 0)
0528 {
0529 if( impl.rd_remain > 0 &&
0530 impl.rd_buf.size() == 0 &&
0531 ! did_read_)
0532 {
0533 // Nothing buffered yet: read from the stream (at most once per operation).
0534 BOOST_ASIO_CORO_YIELD
0535 {
0536 BOOST_ASIO_HANDLER_LOCATION((
0537 __FILE__, __LINE__,
0538 "websocket::async_read_some"));
0539
0540 impl.stream().async_read_some(
0541 impl.rd_buf.prepare(read_size(
0542 impl.rd_buf, impl.rd_buf.max_size())),
0543 std::move(*this));
0544 }
0545 if(impl.check_stop_now(ec))
0546 goto upcall;
0547 impl.reset_idle();
0548 BOOST_ASSERT(bytes_transferred > 0);
0549 impl.rd_buf.commit(bytes_transferred);
0550 if(impl.rd_fh.mask)
0551 detail::mask_inplace(
0552 buffers_prefix(clamp(impl.rd_remain),
0553 impl.rd_buf.data()), impl.rd_key);
0554 did_read_ = true;
0555 }
0556 zlib::z_params zs;
0557 {
0558 auto const out = buffers_front(cb_);
0559 zs.next_out = out.data();
0560 zs.avail_out = out.size();
0561 BOOST_ASSERT(zs.avail_out > 0);
0562 }
0563
0564 bool fin = false;
0565 if(impl.rd_remain > 0)
0566 {
0567 if(impl.rd_buf.size() > 0)
0568 {
0569 // Use the compressed bytes that are already buffered.
0570 auto const in = buffers_prefix(
0571 clamp(impl.rd_remain), buffers_front(
0572 impl.rd_buf.data()));
0573 zs.avail_in = in.size();
0574 zs.next_in = in.data();
0575 }
0576 else
0577 {
0578 break;
0579 }
0580 }
0581 else if(impl.rd_fh.fin)
0582 {
0583 // Final frame fully consumed: feed the bytes 00 00 ff ff to the inflater.
0584 static std::uint8_t constexpr
0585 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0586 zs.next_in = empty_block;
0587 zs.avail_in = sizeof(empty_block);
0588 fin = true;
0589 }
0590 else
0591 {
0592 break;
0593 }
0594 impl.inflate(zs, zlib::Flush::sync, ec);
0595 if(impl.check_stop_now(ec))
0596 goto upcall;
0597 if(fin && zs.total_out == 0) {
0598 impl.do_context_takeover_read(impl.role);
0599 impl.rd_done = true;
0600 break;
0601 }
0602 if(impl.rd_msg_max && beast::detail::sum_exceeds(
0603 impl.rd_size, zs.total_out, impl.rd_msg_max))
0604 {
0605 // Inflated size would exceed rd_msg_max: fail the connection.
0606 code_ = close_code::too_big;
0607 result_ = error::message_too_big;
0608 goto close;
0609 }
0610 cb_.consume(zs.total_out);
0611 impl.rd_size += zs.total_out;
0612 if (! fin) {
0613 impl.rd_remain -= zs.total_in;
0614 impl.rd_buf.consume(zs.total_in);
0615 }
0616 bytes_written_ += zs.total_out;
0617 }
0618 if(impl.rd_op == detail::opcode::text)
0619 {
0620 // Validate, as UTF-8, everything this operation wrote into bs_.
0621 if(! impl.rd_utf8.write(
0622 buffers_prefix(bytes_written_, bs_)) || (
0623 impl.rd_done && ! impl.rd_utf8.finish()))
0624 {
0625 // Text payload failed UTF-8 validation: fail the connection.
0626 code_ = close_code::bad_payload;
0627 result_ = error::bad_frame_payload;
0628 goto close;
0629 }
0630 }
0631 }
0632 goto upcall;
0633
0634 close:
0635 // Close path: take the write block, parking first if it is busy.
0636 if(! impl.wr_block.try_lock(this))
0637 {
0638 BOOST_ASIO_CORO_YIELD
0639 {
0640 BOOST_ASIO_HANDLER_LOCATION((
0641 __FILE__, __LINE__,
0642 "websocket::async_read_some"));
0643
0644 impl.op_rd.emplace(std::move(*this));
0645 }
0646 if (ec)
0647 return this->complete(cont, ec, bytes_written_);
0648
0649 impl.wr_block.lock(this);
0650 BOOST_ASIO_CORO_YIELD
0651 {
0652 BOOST_ASIO_HANDLER_LOCATION((
0653 __FILE__, __LINE__,
0654 "websocket::async_read_some"));
0655
0656 const auto ex = this->get_immediate_executor();
0657 net::dispatch(ex, std::move(*this));
0658 }
0659 BOOST_ASSERT(impl.wr_block.is_locked(this));
0660 if(impl.check_stop_now(ec))
0661 goto upcall;
0662 }
0663
0664 impl.change_status(status::closing);
0665
0666 if(! impl.wr_close)
0667 {
0668 impl.wr_close = true;
0669
0670 // Serialize a close frame carrying code_ into rd_fb.
0671 impl.rd_fb.clear();
0672 impl.template write_close<
0673 flat_static_buffer_base>(
0674 impl.rd_fb, code_);
0675
0676 // Send the close frame.
0677 BOOST_ASSERT(impl.wr_block.is_locked(this));
0678 BOOST_ASIO_CORO_YIELD
0679 {
0680 BOOST_ASIO_HANDLER_LOCATION((
0681 __FILE__, __LINE__,
0682 "websocket::async_read_some"));
0683
0684 net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0685 beast::detail::bind_continuation(std::move(*this)));
0686 }
0687 BOOST_ASSERT(impl.wr_block.is_locked(this));
0688 if(impl.check_stop_now(ec))
0689 goto upcall;
0690 }
0691
0692 // Tear down the underlying stream.
0693 using beast::websocket::async_teardown;
0694 BOOST_ASSERT(impl.wr_block.is_locked(this));
0695 BOOST_ASIO_CORO_YIELD
0696 {
0697 BOOST_ASIO_HANDLER_LOCATION((
0698 __FILE__, __LINE__,
0699 "websocket::async_read_some"));
0700
0701 async_teardown(impl.role, impl.stream(),
0702 beast::detail::bind_continuation(std::move(*this)));
0703 }
0704 BOOST_ASSERT(impl.wr_block.is_locked(this));
0705 if(ec == net::error::eof)
0706 {
0707 // An eof result from teardown is treated as success.
0708
0709 ec = {};
0710 }
0711 if(! ec)
0712 {
0713 BOOST_BEAST_ASSIGN_EC(ec, result_);
0714 }
0715 if(ec && ec != error::closed)
0716 impl.change_status(status::failed);
0717 else
0718 impl.change_status(status::closed);
0719 impl.close();
0720 // Release whichever blocks this op holds, resume waiting ops, then complete.
0721 upcall:
0722 impl.rd_block.try_unlock(this);
0723 impl.op_r_close.maybe_invoke();
0724 if(impl.wr_block.try_unlock(this))
0725 impl.op_close.maybe_invoke()
0726 || impl.op_idle_ping.maybe_invoke()
0727 || impl.op_ping.maybe_invoke()
0728 || impl.op_wr.maybe_invoke();
0729 this->complete(cont, ec, bytes_written_);
0730 }
0731 }
0732 };
0733
0734 // Composed operation that fills a DynamicBuffer by running read_some_op
0735 // repeatedly; launched by run_read_op for async_read and async_read_some.
0736 template<class NextLayer, bool deflateSupported>
0737 template<class Handler, class DynamicBuffer>
0738 class stream<NextLayer, deflateSupported>::read_op
0739 : public beast::async_base<
0740 Handler, beast::executor_type<stream>>
0741 , public asio::coroutine
0742 {
0743 boost::weak_ptr<impl_type> wp_; // weak: the op aborts if the impl has expired
0744 DynamicBuffer& b_; // destination buffer, held by reference
0745 std::size_t limit_; // upper bound on bytes prepared per pass (0 from the caller becomes max)
0746 std::size_t bytes_written_ = 0; // total bytes committed to b_ by this op
0747 bool some_; // true: complete after a single read_some_op
0748
0749 public:
0750 template<class Handler_>
0751 read_op(
0752 Handler_&& h,
0753 boost::shared_ptr<impl_type> const& sp,
0754 DynamicBuffer& b,
0755 std::size_t limit,
0756 bool some)
0757 : async_base<Handler,
0758 beast::executor_type<stream>>(
0759 std::forward<Handler_>(h),
0760 sp->stream().get_executor())
0761 , wp_(sp)
0762 , b_(b)
0763 , limit_(limit ? limit : (
0764 std::numeric_limits<std::size_t>::max)())
0765 , some_(some)
0766 {
0767 (*this)({}, 0, false); // start the coroutine; cont == false marks the initiating call
0768 }
0769
0770 void operator()(
0771 error_code ec = {},
0772 std::size_t bytes_transferred = 0,
0773 bool cont = true)
0774 {
0775 using beast::detail::clamp;
0776 auto sp = wp_.lock();
0777 if(! sp)
0778 {
0779 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0780 bytes_written_ = 0;
0781 return this->complete(cont, ec, bytes_written_);
0782 }
0783 auto& impl = *sp;
0784 using mutable_buffers_type = typename
0785 DynamicBuffer::mutable_buffers_type;
0786 BOOST_ASIO_CORO_REENTER(*this)
0787 {
0788 do
0789 {
0790 // Prepare output space in b_, then launch a read_some_op whose handler is this op.
0791 BOOST_ASIO_CORO_YIELD
0792 {
0793 auto mb = beast::detail::dynamic_buffer_prepare(b_,
0794 clamp(impl.read_size_hint_db(b_), limit_),
0795 ec, error::buffer_overflow);
0796 if(impl.check_stop_now(ec))
0797 goto upcall;
0798
0799 BOOST_ASIO_HANDLER_LOCATION((
0800 __FILE__, __LINE__,
0801 "websocket::async_read"));
0802
0803 read_some_op<read_op, mutable_buffers_type>(
0804 std::move(*this), sp, *mb);
0805 }
0806 // Commit what the inner operation produced, even when it also reported an error.
0807 b_.commit(bytes_transferred);
0808 bytes_written_ += bytes_transferred;
0809 if(ec)
0810 goto upcall;
0811 }
0812 while(! some_ && ! impl.rd_done);
0813
0814 upcall:
0815 this->complete(cont, ec, bytes_written_);
0816 }
0817 }
0818 };
0819
0820 template<class NextLayer, bool deflateSupported>
0821 struct stream<NextLayer, deflateSupported>::
0822 run_read_some_op
0823 {
0824 boost::shared_ptr<impl_type> const& self;
0825
0826 using executor_type = typename stream::executor_type;
0827
0828 executor_type
0829 get_executor() const noexcept
0830 {
0831 return self->stream().get_executor();
0832 }
0833
0834 template<
0835 class ReadHandler,
0836 class MutableBufferSequence>
0837 void
0838 operator()(
0839 ReadHandler&& h,
0840 MutableBufferSequence const& b)
0841 {
0842
0843
0844
0845
0846 static_assert(
0847 beast::detail::is_invocable<ReadHandler,
0848 void(error_code, std::size_t)>::value,
0849 "ReadHandler type requirements not met");
0850
0851 read_some_op<
0852 typename std::decay<ReadHandler>::type,
0853 MutableBufferSequence>(
0854 std::forward<ReadHandler>(h),
0855 self,
0856 b);
0857 }
0858 };
0859
0860 template<class NextLayer, bool deflateSupported>
0861 struct stream<NextLayer, deflateSupported>::
0862 run_read_op
0863 {
0864 boost::shared_ptr<impl_type> const& self;
0865
0866 using executor_type = typename stream::executor_type;
0867
0868 executor_type
0869 get_executor() const noexcept
0870 {
0871 return self->stream().get_executor();
0872 }
0873
0874 template<
0875 class ReadHandler,
0876 class DynamicBuffer>
0877 void
0878 operator()(
0879 ReadHandler&& h,
0880 DynamicBuffer* b,
0881 std::size_t limit,
0882 bool some)
0883 {
0884
0885
0886
0887
0888 static_assert(
0889 beast::detail::is_invocable<ReadHandler,
0890 void(error_code, std::size_t)>::value,
0891 "ReadHandler type requirements not met");
0892
0893 read_op<
0894 typename std::decay<ReadHandler>::type,
0895 DynamicBuffer>(
0896 std::forward<ReadHandler>(h),
0897 self,
0898 *b,
0899 limit,
0900 some);
0901 }
0902 };
0903
0904
0905
0906 template<class NextLayer, bool deflateSupported>
0907 template<class DynamicBuffer>
0908 std::size_t
0909 stream<NextLayer, deflateSupported>::
0910 read(DynamicBuffer& buffer)
0911 {
0912 static_assert(is_sync_stream<next_layer_type>::value,
0913 "SyncStream type requirements not met");
0914 static_assert(
0915 net::is_dynamic_buffer<DynamicBuffer>::value,
0916 "DynamicBuffer type requirements not met");
0917 error_code ec;
0918 auto const bytes_written = read(buffer, ec);
0919 if(ec)
0920 BOOST_THROW_EXCEPTION(system_error{ec});
0921 return bytes_written;
0922 }
0923
0924 template<class NextLayer, bool deflateSupported>
0925 template<class DynamicBuffer>
0926 std::size_t
0927 stream<NextLayer, deflateSupported>::
0928 read(DynamicBuffer& buffer, error_code& ec)
0929 {
0930 static_assert(is_sync_stream<next_layer_type>::value,
0931 "SyncStream type requirements not met");
0932 static_assert(
0933 net::is_dynamic_buffer<DynamicBuffer>::value,
0934 "DynamicBuffer type requirements not met");
0935 std::size_t bytes_written = 0;
0936 do
0937 {
0938 bytes_written += read_some(buffer, 0, ec);
0939 if(ec)
0940 return bytes_written;
0941 }
0942 while(! is_message_done());
0943 return bytes_written;
0944 }
0945
0946 template<class NextLayer, bool deflateSupported>
0947 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0948 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0949 stream<NextLayer, deflateSupported>::
0950 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0951 {
0952 static_assert(is_async_stream<next_layer_type>::value,
0953 "AsyncStream type requirements not met");
0954 static_assert(
0955 net::is_dynamic_buffer<DynamicBuffer>::value,
0956 "DynamicBuffer type requirements not met");
0957 return net::async_initiate<
0958 ReadHandler,
0959 void(error_code, std::size_t)>(
0960 run_read_op{impl_},
0961 handler,
0962 &buffer,
0963 0,
0964 false);
0965 }
0966
0967
0968
0969 template<class NextLayer, bool deflateSupported>
0970 template<class DynamicBuffer>
0971 std::size_t
0972 stream<NextLayer, deflateSupported>::
0973 read_some(
0974 DynamicBuffer& buffer,
0975 std::size_t limit)
0976 {
0977 static_assert(is_sync_stream<next_layer_type>::value,
0978 "SyncStream type requirements not met");
0979 static_assert(
0980 net::is_dynamic_buffer<DynamicBuffer>::value,
0981 "DynamicBuffer type requirements not met");
0982 error_code ec;
0983 auto const bytes_written =
0984 read_some(buffer, limit, ec);
0985 if(ec)
0986 BOOST_THROW_EXCEPTION(system_error{ec});
0987 return bytes_written;
0988 }
0989
0990 template<class NextLayer, bool deflateSupported>
0991 template<class DynamicBuffer>
0992 std::size_t
0993 stream<NextLayer, deflateSupported>::
0994 read_some(
0995 DynamicBuffer& buffer,
0996 std::size_t limit,
0997 error_code& ec)
0998 {
0999 static_assert(is_sync_stream<next_layer_type>::value,
1000 "SyncStream type requirements not met");
1001 static_assert(
1002 net::is_dynamic_buffer<DynamicBuffer>::value,
1003 "DynamicBuffer type requirements not met");
1004 using beast::detail::clamp;
1005 if(! limit)
1006 limit = (std::numeric_limits<std::size_t>::max)();
1007 auto const size =
1008 clamp(impl_->read_size_hint_db(buffer), limit);
1009 BOOST_ASSERT(size > 0);
1010 auto mb = beast::detail::dynamic_buffer_prepare(
1011 buffer, size, ec, error::buffer_overflow);
1012 if(impl_->check_stop_now(ec))
1013 return 0;
1014 auto const bytes_written = read_some(*mb, ec);
1015 buffer.commit(bytes_written);
1016 return bytes_written;
1017 }
1018
1019 template<class NextLayer, bool deflateSupported>
1020 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1021 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1022 stream<NextLayer, deflateSupported>::
1023 async_read_some(
1024 DynamicBuffer& buffer,
1025 std::size_t limit,
1026 ReadHandler&& handler)
1027 {
1028 static_assert(is_async_stream<next_layer_type>::value,
1029 "AsyncStream type requirements not met");
1030 static_assert(
1031 net::is_dynamic_buffer<DynamicBuffer>::value,
1032 "DynamicBuffer type requirements not met");
1033 return net::async_initiate<
1034 ReadHandler,
1035 void(error_code, std::size_t)>(
1036 run_read_op{impl_},
1037 handler,
1038 &buffer,
1039 limit,
1040 true);
1041 }
1042
1043
1044
1045 template<class NextLayer, bool deflateSupported>
1046 template<class MutableBufferSequence>
1047 std::size_t
1048 stream<NextLayer, deflateSupported>::
1049 read_some(
1050 MutableBufferSequence const& buffers)
1051 {
1052 static_assert(is_sync_stream<next_layer_type>::value,
1053 "SyncStream type requirements not met");
1054 static_assert(net::is_mutable_buffer_sequence<
1055 MutableBufferSequence>::value,
1056 "MutableBufferSequence type requirements not met");
1057 error_code ec;
1058 auto const bytes_written = read_some(buffers, ec);
1059 if(ec)
1060 BOOST_THROW_EXCEPTION(system_error{ec});
1061 return bytes_written;
1062 }
1063 // Blocking read of some message data into `buffers`; control frames are handled inline and failures are reported via `ec`.
1064 template<class NextLayer, bool deflateSupported>
1065 template<class MutableBufferSequence>
1066 std::size_t
1067 stream<NextLayer, deflateSupported>::
1068 read_some(
1069 MutableBufferSequence const& buffers,
1070 error_code& ec)
1071 {
1072 static_assert(is_sync_stream<next_layer_type>::value,
1073 "SyncStream type requirements not met");
1074 static_assert(net::is_mutable_buffer_sequence<
1075 MutableBufferSequence>::value,
1076 "MutableBufferSequence type requirements not met");
1077 using beast::detail::clamp;
1078 auto& impl = *impl_;
1079 close_code code{};
1080 std::size_t bytes_written = 0;
1081 ec = {};
1082 // Refuse to read if the stream is already stopped.
1083 if(impl.check_stop_now(ec))
1084 return bytes_written;
1085 loop:
1086 // A new frame header is needed when the current frame has no payload
1087 // left and either it was not final or the message was fully delivered
1088 // (rd_done). A final frame with rd_done still false skips this so the
1089 // compressed path below can run once more with rd_remain == 0.
1090 if(impl.rd_remain == 0 && (
1091 ! impl.rd_fh.fin || impl.rd_done))
1092 {
1093 // Read from the stream until a complete frame header has been parsed.
1094 error_code result;
1095 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1096 {
1097 if(result)
1098 {
1099 // Header parse failed: choose the close code and fail the connection.
1100 if(result == error::message_too_big)
1101 code = close_code::too_big;
1102 else
1103 code = close_code::protocol_error;
1104 do_fail(code, result, ec);
1105 return bytes_written;
1106 }
1107 auto const bytes_transferred =
1108 impl.stream().read_some(
1109 impl.rd_buf.prepare(read_size(
1110 impl.rd_buf, impl.rd_buf.max_size())),
1111 ec);
1112 impl.rd_buf.commit(bytes_transferred);
1113 if(impl.check_stop_now(ec))
1114 return bytes_written;
1115 }
1116 // Unmask, in place, the part of this frame's payload that is
1117 // already sitting in the internal buffer.
1118 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1119 detail::mask_inplace(buffers_prefix(
1120 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1121 impl.rd_key);
1122 if(detail::is_control(impl.rd_fh.op))
1123 {
1124 // Control frame: its whole payload is expected to be buffered (asserted below).
1125 auto const b = buffers_prefix(
1126 clamp(impl.rd_fh.len), impl.rd_buf.data());
1127 auto const len = buffer_bytes(b);
1128 BOOST_ASSERT(len == impl.rd_fh.len);
1129
1130 // Clear fin so the header-read condition at the top of the
1131 // loop holds again after this control frame is handled.
1132 impl.rd_fh.fin = false;
1133
1134 // Handle a ping frame.
1135 if(impl.rd_fh.op == detail::opcode::ping)
1136 {
1137 ping_data payload;
1138 detail::read_ping(payload, b);
1139 impl.rd_buf.consume(len);
1140 if(impl.wr_close)
1141 {
1142 // A close frame was already sent: drop the ping without replying.
1143 goto loop;
1144 }
1145 if(impl.ctrl_cb)
1146 impl.ctrl_cb(frame_type::ping, to_string_view(payload));
1147 detail::frame_buffer fb;
1148 impl.template write_ping<flat_static_buffer_base>(fb,
1149 detail::opcode::pong, payload);
1150 net::write(impl.stream(), fb.data(), ec);
1151 if(impl.check_stop_now(ec))
1152 return bytes_written;
1153 goto loop;
1154 }
1155 // Handle a pong frame.
1156 if(impl.rd_fh.op == detail::opcode::pong)
1157 {
1158 ping_data payload;
1159 detail::read_ping(payload, b);
1160 impl.rd_buf.consume(len);
1161 if(impl.ctrl_cb)
1162 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1163 goto loop;
1164 }
1165 // Handle a close frame (the only remaining control opcode).
1166 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1167 {
1168 BOOST_ASSERT(! impl.rd_close);
1169 impl.rd_close = true;
1170 close_reason cr;
1171 detail::read_close(cr, b, result);
1172 if(result)
1173 {
1174 // The close payload could not be parsed: fail with a protocol error.
1175 do_fail(close_code::protocol_error,
1176 result, ec);
1177 return bytes_written;
1178 }
1179 impl.cr = cr;
1180 impl.rd_buf.consume(len);
1181 if(impl.ctrl_cb)
1182 impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1183 BOOST_ASSERT(! impl.wr_close);
1184 // Hand off to do_fail with the peer's code (or normal if none) and error::closed.
1185 do_fail(
1186 cr.code == close_code::none ?
1187 close_code::normal :
1188 static_cast<close_code>(cr.code),
1189 error::closed, ec);
1190 return bytes_written;
1191 }
1192 }
1193 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1194 {
1195 // Empty non-final data frame: nothing to deliver, read the next header.
1196 goto loop;
1197 }
1198 impl.rd_done = false;
1199 }
1200 else
1201 {
1202 ec = {};
1203 }
1204 if(! impl.rd_deflated())
1205 {
1206 if(impl.rd_remain > 0)
1207 {
1208 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1209 (std::min)(clamp(impl.rd_remain),
1210 buffer_bytes(buffers)))
1211 {
1212 // The internal buffer is empty and can hold more than this call
1213 // could deliver directly, so fill the internal buffer first.
1214 impl.rd_buf.commit(impl.stream().read_some(
1215 impl.rd_buf.prepare(read_size(impl.rd_buf,
1216 impl.rd_buf.max_size())), ec));
1217 if(impl.check_stop_now(ec))
1218 return bytes_written;
1219 if(impl.rd_fh.mask)
1220 detail::mask_inplace(
1221 buffers_prefix(clamp(impl.rd_remain),
1222 impl.rd_buf.data()), impl.rd_key);
1223 }
1224 if(impl.rd_buf.size() > 0)
1225 {
1226 // Copy already-unmasked payload from the internal buffer
1227 // into the caller's buffers.
1228 auto const bytes_transferred = net::buffer_copy(
1229 buffers, impl.rd_buf.data(),
1230 clamp(impl.rd_remain));
1231 auto const mb = buffers_prefix(
1232 bytes_transferred, buffers);
1233 impl.rd_remain -= bytes_transferred;
1234 if(impl.rd_op == detail::opcode::text)
1235 {
1236 if(! impl.rd_utf8.write(mb) ||
1237 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1238 ! impl.rd_utf8.finish()))
1239 {
1240 // Text payload failed UTF-8 validation: fail the connection.
1241 do_fail(close_code::bad_payload,
1242 error::bad_frame_payload, ec);
1243 return bytes_written;
1244 }
1245 }
1246 bytes_written += bytes_transferred;
1247 impl.rd_size += bytes_transferred;
1248 impl.rd_buf.consume(bytes_transferred);
1249 }
1250 else
1251 {
1252 // Nothing buffered: read payload straight into the caller's buffers.
1253 BOOST_ASSERT(impl.rd_remain > 0);
1254 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1255 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1256 clamp(impl.rd_remain), buffers)) > 0);
1257 auto const bytes_transferred =
1258 impl.stream().read_some(buffers_prefix(
1259 clamp(impl.rd_remain), buffers), ec);
1260
1261 if(impl.check_stop_now(ec))
1262 return bytes_written;
1263 BOOST_ASSERT(bytes_transferred > 0);
1264 auto const mb = buffers_prefix(
1265 bytes_transferred, buffers);
1266 impl.rd_remain -= bytes_transferred;
1267 if(impl.rd_fh.mask)
1268 detail::mask_inplace(mb, impl.rd_key);
1269 if(impl.rd_op == detail::opcode::text)
1270 {
1271 if(! impl.rd_utf8.write(mb) ||
1272 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1273 ! impl.rd_utf8.finish()))
1274 {
1275 // Text payload failed UTF-8 validation: fail the connection.
1276 do_fail(close_code::bad_payload,
1277 error::bad_frame_payload, ec);
1278 return bytes_written;
1279 }
1280 }
1281 bytes_written += bytes_transferred;
1282 impl.rd_size += bytes_transferred;
1283 }
1284 }
1285 BOOST_ASSERT( ! impl.rd_done );
1286 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1287 impl.rd_done = true;
1288 }
1289 else
1290 {
1291 // Compressed message: inflate payload into the caller's buffers.
1292 // The loop also runs when rd_remain == 0 on a final frame so that
1293 // the trailing block below can be fed to the inflater.
1294
1295 bool did_read = false;
1296 buffers_suffix<MutableBufferSequence> cb(buffers);
1297 while(buffer_bytes(cb) > 0)
1298 {
1299 zlib::z_params zs;
1300 {
1301 auto const out = beast::buffers_front(cb);
1302 zs.next_out = out.data();
1303 zs.avail_out = out.size();
1304 BOOST_ASSERT(zs.avail_out > 0);
1305 }
1306
1307 bool fin = false;
1308 if(impl.rd_remain > 0)
1309 {
1310 if(impl.rd_buf.size() > 0)
1311 {
1312 // Use the compressed bytes that are already buffered.
1313 auto const in = buffers_prefix(
1314 clamp(impl.rd_remain), beast::buffers_front(
1315 impl.rd_buf.data()));
1316 zs.avail_in = in.size();
1317 zs.next_in = in.data();
1318 }
1319 else if(! did_read)
1320 {
1321 // Nothing buffered: read from the stream (at most once per call).
1322 auto const bytes_transferred =
1323 impl.stream().read_some(
1324 impl.rd_buf.prepare(read_size(
1325 impl.rd_buf, impl.rd_buf.max_size())),
1326 ec);
1327 if(impl.check_stop_now(ec))
1328 return bytes_written;
1329 BOOST_ASSERT(bytes_transferred > 0);
1330 impl.rd_buf.commit(bytes_transferred);
1331 if(impl.rd_fh.mask)
1332 detail::mask_inplace(
1333 buffers_prefix(clamp(impl.rd_remain),
1334 impl.rd_buf.data()), impl.rd_key);
1335 auto const in = buffers_prefix(
1336 clamp(impl.rd_remain), buffers_front(
1337 impl.rd_buf.data()));
1338 zs.avail_in = in.size();
1339 zs.next_in = in.data();
1340 did_read = true;
1341 }
1342 else
1343 {
1344 break;
1345 }
1346 }
1347 else if(impl.rd_fh.fin)
1348 {
1349 // Final frame fully consumed: feed the bytes 00 00 ff ff to the inflater.
1350 static std::uint8_t constexpr
1351 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1352 zs.next_in = empty_block;
1353 zs.avail_in = sizeof(empty_block);
1354 fin = true;
1355 }
1356 else
1357 {
1358 break;
1359 }
1360 impl.inflate(zs, zlib::Flush::sync, ec);
1361 if(impl.check_stop_now(ec))
1362 return bytes_written;
1363 if (fin && zs.total_out == 0) {
1364 impl.do_context_takeover_read(impl.role);
1365 impl.rd_done = true;
1366 break;
1367 }
1368 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1369 impl.rd_size, zs.total_out, impl.rd_msg_max))
1370 {
1371 do_fail(close_code::too_big,
1372 error::message_too_big, ec);
1373 return bytes_written;
1374 }
1375 cb.consume(zs.total_out);
1376 impl.rd_size += zs.total_out;
1377 if (! fin) {
1378 impl.rd_remain -= zs.total_in;
1379 impl.rd_buf.consume(zs.total_in);
1380 }
1381 bytes_written += zs.total_out;
1382 }
1383 if(impl.rd_op == detail::opcode::text)
1384 {
1385 // Validate, as UTF-8, everything this call wrote into `buffers`.
1386 if(! impl.rd_utf8.write(beast::buffers_prefix(
1387 bytes_written, buffers)) || (
1388 impl.rd_done && ! impl.rd_utf8.finish()))
1389 {
1390 // Text payload failed UTF-8 validation: fail the connection.
1391 do_fail(close_code::bad_payload,
1392 error::bad_frame_payload, ec);
1393 return bytes_written;
1394 }
1395 }
1396 }
1397 return bytes_written;
1398 }
1399
1400 template<class NextLayer, bool deflateSupported>
1401 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1402 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1403 stream<NextLayer, deflateSupported>::
1404 async_read_some(
1405 MutableBufferSequence const& buffers,
1406 ReadHandler&& handler)
1407 {
1408 static_assert(is_async_stream<next_layer_type>::value,
1409 "AsyncStream type requirements not met");
1410 static_assert(net::is_mutable_buffer_sequence<
1411 MutableBufferSequence>::value,
1412 "MutableBufferSequence type requirements not met");
1413 return net::async_initiate<
1414 ReadHandler,
1415 void(error_code, std::size_t)>(
1416 run_read_some_op{impl_},
1417 handler,
1418 buffers);
1419 }
1420
1421 }
1422 }
1423 }
1424
1425 #endif