File indexing completed on 2025-01-18 09:29:34
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/bind_handler.hpp>
0019 #include <boost/beast/core/buffers_prefix.hpp>
0020 #include <boost/beast/core/buffers_suffix.hpp>
0021 #include <boost/beast/core/flat_static_buffer.hpp>
0022 #include <boost/beast/core/read_size.hpp>
0023 #include <boost/beast/core/stream_traits.hpp>
0024 #include <boost/beast/core/detail/bind_continuation.hpp>
0025 #include <boost/beast/core/detail/buffer.hpp>
0026 #include <boost/beast/core/detail/clamp.hpp>
0027 #include <boost/beast/core/detail/config.hpp>
0028 #include <boost/asio/coroutine.hpp>
0029 #include <boost/assert.hpp>
0030 #include <boost/config.hpp>
0031 #include <boost/optional.hpp>
0032 #include <boost/throw_exception.hpp>
0033 #include <algorithm>
0034 #include <limits>
0035 #include <memory>
0036
0037 namespace boost {
0038 namespace beast {
0039 namespace websocket {
0040
0041 /*  Composed operation that reads some message data into a
0042     caller-supplied mutable buffer sequence.
0043     Frame headers are parsed here, and ping, pong and close control
0044     frames met along the way are handled before payload is delivered. */
0045 template<class NextLayer, bool deflateSupported>
0046 template<class Handler, class MutableBufferSequence>
0047 class stream<NextLayer, deflateSupported>::read_some_op
0048 : public beast::async_base<
0049 Handler, beast::executor_type<stream>>
0050 , public asio::coroutine
0051 {
0052 boost::weak_ptr<impl_type> wp_; // weak reference to the stream impl; locked on every resumption
0053 MutableBufferSequence bs_; // the caller's buffers exactly as passed in
0054 buffers_suffix<MutableBufferSequence> cb_; // view of the caller's buffers; consumed as inflated output is produced
0055 std::size_t bytes_written_ = 0; // payload bytes delivered to the caller so far
0056 error_code result_; // error to report once the close sequence has run
0057 close_code code_; // close code placed in the close frame written at close:
0058 bool did_read_ = false; // deflate path: set after the op has read from the stream once
0059
0060 public:
0061 static constexpr int id = 1; // NOTE(review): presumably identifies this op type to the lock bookkeeping - confirm
0062
0063 template<class Handler_>
0064 read_some_op(
0065 Handler_&& h,
0066 boost::shared_ptr<impl_type> const& sp,
0067 MutableBufferSequence const& bs)
0068 : async_base<
0069 Handler, beast::executor_type<stream>>(
0070 std::forward<Handler_>(h),
0071 sp->stream().get_executor())
0072 , wp_(sp)
0073 , bs_(bs)
0074 , cb_(bs)
0075 , code_(close_code::none)
0076 {
0077 (*this)({}, 0, false); // start the coroutine; cont == false marks the initiating call
0078 }
0079
0080 void operator()(
0081 error_code ec = {},
0082 std::size_t bytes_transferred = 0,
0083 bool cont = true)
0084 {
0085 using beast::detail::clamp;
0086 auto sp = wp_.lock();
0087 if(! sp) // the stream impl no longer exists: complete with operation_aborted and zero bytes
0088 {
0089 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0090 bytes_written_ = 0;
0091 return this->complete(cont, ec, bytes_written_);
0092 }
0093 auto& impl = *sp;
0094 BOOST_ASIO_CORO_REENTER(*this)
0095 {
0096 impl.update_timer(this->get_executor());
0097
0098 acquire_read_lock:
0099 // Try to take the read lock without waiting
0100 if(! impl.rd_block.try_lock(this))
0101 {
0102 do_suspend:
0103 BOOST_ASIO_CORO_YIELD
0104 {
0105 BOOST_ASIO_HANDLER_LOCATION((
0106 __FILE__, __LINE__,
0107 "websocket::async_read_some"));
0108 // Store this op in impl.op_r_rd; while stored, all cancellation types are allowed
0109 this->set_allowed_cancellation(net::cancellation_type::all);
0110 impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0111 }
0112 if (ec)
0113 return this->complete(cont, ec, bytes_written_);
0114 // Resumed without error: restrict cancellation to terminal again
0115 this->set_allowed_cancellation(net::cancellation_type::terminal);
0116 // Take the read lock, then re-dispatch through the immediate executor
0117 impl.rd_block.lock(this);
0118 BOOST_ASIO_CORO_YIELD
0119 {
0120 BOOST_ASIO_HANDLER_LOCATION((
0121 __FILE__, __LINE__,
0122 "websocket::async_read_some"));
0123
0124 const auto ex = this->get_immediate_executor();
0125 net::dispatch(ex, std::move(*this));
0126 }
0127 BOOST_ASSERT(impl.rd_block.is_locked(this));
0128
0129 BOOST_ASSERT(!ec);
0130 if(impl.check_stop_now(ec))
0131 {
0132 // check_stop_now reported that the operation must stop;
0133 // whatever error it stored in ec is delivered unchanged
0134 // (it is not assumed to be operation_aborted).
0135
0136
0137 goto upcall;
0138 }
0139
0140 // Resumed with no stop condition. The asserts record the
0141 // expectation that a close frame was already written and the
0142 // stream is no longer open; the read completes as aborted.
0143 BOOST_ASSERT(impl.wr_close);
0144 BOOST_ASSERT(impl.status_ != status::open);
0145 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0146 goto upcall;
0147 }
0148 else
0149 {
0150 // Lock acquired immediately: refuse to read from a closed or failed stream
0151 if( impl.status_ == status::closed ||
0152 impl.status_ == status::failed)
0153 {
0154 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0155 goto upcall;
0156 }
0157 }
0158
0159
0160
0161
0162
0163 loop:
0164 BOOST_ASSERT(impl.rd_block.is_locked(this));
0165
0166 // A frame header must be read when no payload remains in the
0167 // current frame and either that frame was not final or the
0168 // message has already been delivered in full (rd_done).
0169 if(impl.rd_remain == 0 &&
0170 (! impl.rd_fh.fin || impl.rd_done))
0171 {
0172 // Read from the stream until parse_fh accepts a complete frame header
0173 while(! impl.parse_fh(
0174 impl.rd_fh, impl.rd_buf, result_))
0175 {
0176 if(result_)
0177 {
0178 // Header rejected: choose the close code and run the close sequence
0179 if(result_ == error::message_too_big)
0180 code_ = close_code::too_big;
0181 else
0182 code_ = close_code::protocol_error;
0183 goto close;
0184 }
0185 BOOST_ASSERT(impl.rd_block.is_locked(this));
0186 BOOST_ASIO_CORO_YIELD
0187 {
0188 BOOST_ASIO_HANDLER_LOCATION((
0189 __FILE__, __LINE__,
0190 "websocket::async_read_some"));
0191
0192 impl.stream().async_read_some(
0193 impl.rd_buf.prepare(read_size(
0194 impl.rd_buf, impl.rd_buf.max_size())),
0195 std::move(*this));
0196 }
0197 BOOST_ASSERT(impl.rd_block.is_locked(this));
0198 impl.rd_buf.commit(bytes_transferred);
0199 if(impl.check_stop_now(ec))
0200 goto upcall;
0201 impl.reset_idle();
0202
0203 // Release the read lock so an operation stored in op_r_close
0204 // can run; if one was invoked, go back and suspend this read.
0205 impl.rd_block.unlock(this);
0206 if( impl.op_r_close.maybe_invoke())
0207 {
0208 // The invoked operation is expected to hold the read lock now
0209 BOOST_ASSERT(impl.rd_block.is_locked());
0210 goto do_suspend;
0211 }
0212 // Nothing was waiting: take the read lock back
0213 impl.rd_block.lock(this);
0214 }
0215 // Apply the masking key in place to the buffered bytes of this
0216 // frame's payload (at most rd_fh.len bytes) when the frame is masked.
0217 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0218 detail::mask_inplace(buffers_prefix(
0219 clamp(impl.rd_fh.len),
0220 impl.rd_buf.data()),
0221 impl.rd_key);
0222 if(detail::is_control(impl.rd_fh.op))
0223 {
0224 // Clear fin so that the header check at loop: does not take
0225 // this control frame for the final frame of the message.
0226 impl.rd_fh.fin = false;
0227
0228 // Ping frame
0229 if(impl.rd_fh.op == detail::opcode::ping)
0230 {
0231 if(impl.ctrl_cb)
0232 {
0233 if(! cont) // still in the initiating call: re-dispatch before the callback may run
0234 {
0235 BOOST_ASIO_CORO_YIELD
0236 {
0237 BOOST_ASIO_HANDLER_LOCATION((
0238 __FILE__, __LINE__,
0239 "websocket::async_read_some"));
0240
0241 const auto ex = this->get_immediate_executor();
0242 net::dispatch(ex, std::move(*this));
0243 }
0244 BOOST_ASSERT(cont);
0245
0246 }
0247 }
0248 {
0249 auto const b = buffers_prefix(
0250 clamp(impl.rd_fh.len),
0251 impl.rd_buf.data());
0252 auto const len = buffer_bytes(b);
0253 BOOST_ASSERT(len == impl.rd_fh.len);
0254 ping_data payload;
0255 detail::read_ping(payload, b);
0256 impl.rd_buf.consume(len);
0257 // While the stream is closing the ping is dropped: no callback, no pong
0258 if(impl.status_ == status::closing)
0259 goto loop;
0260 if(impl.ctrl_cb)
0261 impl.ctrl_cb(
0262 frame_type::ping, to_string_view(payload));
0263 impl.rd_fb.clear();
0264 impl.template write_ping<
0265 flat_static_buffer_base>(impl.rd_fb,
0266 detail::opcode::pong, payload);
0267 }
0268
0269 // Give up the read lock while the pong is sent and let an
0270 // operation stored in op_r_close run, if there is one.
0271 impl.rd_block.unlock(this);
0272 impl.op_r_close.maybe_invoke();
0273
0274 // Acquire the write lock, suspending in op_rd if it is held
0275 if(! impl.wr_block.try_lock(this))
0276 {
0277 BOOST_ASIO_CORO_YIELD
0278 {
0279 BOOST_ASIO_HANDLER_LOCATION((
0280 __FILE__, __LINE__,
0281 "websocket::async_read_some"));
0282
0283 impl.op_rd.emplace(std::move(*this));
0284 }
0285 if (ec)
0286 return this->complete(cont, ec, bytes_written_);
0287
0288 impl.wr_block.lock(this);
0289 BOOST_ASIO_CORO_YIELD
0290 {
0291 BOOST_ASIO_HANDLER_LOCATION((
0292 __FILE__, __LINE__,
0293 "websocket::async_read_some"));
0294
0295 const auto ex = this->get_immediate_executor();
0296 net::dispatch(ex, std::move(*this));
0297 }
0298 BOOST_ASSERT(impl.wr_block.is_locked(this));
0299 if(impl.check_stop_now(ec))
0300 goto upcall;
0301 }
0302
0303 // Write the pong frame serialized into rd_fb above
0304 BOOST_ASSERT(impl.wr_block.is_locked(this));
0305 BOOST_ASIO_CORO_YIELD
0306 {
0307 BOOST_ASIO_HANDLER_LOCATION((
0308 __FILE__, __LINE__,
0309 "websocket::async_read_some"));
0310
0311 net::async_write(
0312 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0313 beast::detail::bind_continuation(std::move(*this)));
0314 }
0315 BOOST_ASSERT(impl.wr_block.is_locked(this));
0316 if(impl.check_stop_now(ec))
0317 goto upcall;
0318 impl.wr_block.unlock(this);
0319 impl.op_close.maybe_invoke()
0320 || impl.op_idle_ping.maybe_invoke()
0321 || impl.op_ping.maybe_invoke()
0322 || impl.op_wr.maybe_invoke();
0323 goto acquire_read_lock;
0324 }
0325
0326 // Pong frame
0327 if(impl.rd_fh.op == detail::opcode::pong)
0328 {
0329 // The callback is only considered while no close frame has been written
0330 if(! impl.wr_close && impl.ctrl_cb)
0331 {
0332 if(! cont)
0333 {
0334 BOOST_ASIO_CORO_YIELD
0335 {
0336 BOOST_ASIO_HANDLER_LOCATION((
0337 __FILE__, __LINE__,
0338 "websocket::async_read_some"));
0339
0340 const auto ex = this->get_immediate_executor();
0341 net::dispatch(ex, std::move(*this));
0342 }
0343 BOOST_ASSERT(cont);
0344 }
0345 }
0346 auto const cb = buffers_prefix(clamp(
0347 impl.rd_fh.len), impl.rd_buf.data());
0348 auto const len = buffer_bytes(cb);
0349 BOOST_ASSERT(len == impl.rd_fh.len);
0350 ping_data payload;
0351 detail::read_ping(payload, cb);
0352 impl.rd_buf.consume(len);
0353 // The payload is always consumed; the callback is skipped once wr_close is set
0354 if(! impl.wr_close && impl.ctrl_cb)
0355 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0356 goto loop;
0357 }
0358
0359 // Close frame: the only control opcode left at this point
0360 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0361 {
0362 if(impl.ctrl_cb)
0363 {
0364 if(! cont)
0365 {
0366 BOOST_ASIO_CORO_YIELD
0367 {
0368 BOOST_ASIO_HANDLER_LOCATION((
0369 __FILE__, __LINE__,
0370 "websocket::async_read_some"));
0371
0372 const auto ex = this->get_immediate_executor();
0373 net::dispatch(ex, std::move(*this));
0374 }
0375 BOOST_ASSERT(cont);
0376 }
0377 }
0378 auto const cb = buffers_prefix(clamp(
0379 impl.rd_fh.len), impl.rd_buf.data());
0380 auto const len = buffer_bytes(cb);
0381 BOOST_ASSERT(len == impl.rd_fh.len);
0382 BOOST_ASSERT(! impl.rd_close);
0383 impl.rd_close = true;
0384 close_reason cr;
0385 detail::read_close(cr, cb, result_);
0386 if(result_)
0387 {
0388 // The close payload could not be read: close with protocol_error
0389 code_ = close_code::protocol_error;
0390 goto close;
0391 }
0392 impl.cr = cr;
0393 impl.rd_buf.consume(len);
0394 if(impl.ctrl_cb)
0395 impl.ctrl_cb(frame_type::close,
0396 to_string_view(impl.cr.reason));
0397
0398 if(impl.status_ == status::closing)
0399 {
0400 // Our close frame was already written: finish with error::closed, no code to send
0401 BOOST_ASSERT(impl.wr_close);
0402 code_ = close_code::none;
0403 result_ = error::closed;
0404 goto close;
0405 }
0406 // Otherwise answer with the peer's close code (normal if none was given)
0407 code_ = cr.code == close_code::none ?
0408 close_code::normal :
0409 static_cast<close_code>(cr.code);
0410 result_ = error::closed;
0411 goto close;
0412 }
0413 }
0414 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0415 {
0416 // Empty non-final frame: nothing to deliver, read the next header
0417 goto loop;
0418 }
0419 impl.rd_done = false;
0420 }
0421 if(! impl.rd_deflated())
0422 {
0423 if(impl.rd_remain > 0)
0424 {
0425 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0426 (std::min)(clamp(impl.rd_remain),
0427 buffer_bytes(cb_)))
0428 {
0429 // rd_buf is empty and can hold more than this call can
0430 // deliver: read from the stream into rd_buf first.
0431 BOOST_ASIO_CORO_YIELD
0432 {
0433 BOOST_ASIO_HANDLER_LOCATION((
0434 __FILE__, __LINE__,
0435 "websocket::async_read_some"));
0436
0437 impl.stream().async_read_some(
0438 impl.rd_buf.prepare(read_size(
0439 impl.rd_buf, impl.rd_buf.max_size())),
0440 std::move(*this));
0441 }
0442 impl.rd_buf.commit(bytes_transferred);
0443 if(impl.check_stop_now(ec))
0444 goto upcall;
0445 impl.reset_idle();
0446 if(impl.rd_fh.mask)
0447 detail::mask_inplace(buffers_prefix(clamp(
0448 impl.rd_remain), impl.rd_buf.data()),
0449 impl.rd_key);
0450 }
0451 if(impl.rd_buf.size() > 0)
0452 {
0453 // Copy from rd_buf into the caller's buffers; no masking is
0454 // done here because it was applied when the data was buffered.
0455 bytes_transferred = net::buffer_copy(cb_,
0456 impl.rd_buf.data(), clamp(impl.rd_remain));
0457 auto const mb = buffers_prefix(
0458 bytes_transferred, cb_);
0459 impl.rd_remain -= bytes_transferred;
0460 if(impl.rd_op == detail::opcode::text)
0461 {
0462 if(! impl.rd_utf8.write(mb) ||
0463 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0464 ! impl.rd_utf8.finish()))
0465 {
0466 // UTF-8 check failed on a text message: close with bad_payload
0467 code_ = close_code::bad_payload;
0468 result_ = error::bad_frame_payload;
0469 goto close;
0470 }
0471 }
0472 bytes_written_ += bytes_transferred;
0473 impl.rd_size += bytes_transferred;
0474 impl.rd_buf.consume(bytes_transferred);
0475 }
0476 else
0477 {
0478 // rd_buf is empty: read from the stream directly into the caller's buffers
0479 BOOST_ASSERT(impl.rd_remain > 0);
0480 BOOST_ASSERT(buffer_bytes(cb_) > 0);
0481 BOOST_ASSERT(buffer_bytes(buffers_prefix(
0482 clamp(impl.rd_remain), cb_)) > 0);
0483 BOOST_ASIO_CORO_YIELD
0484 {
0485 BOOST_ASIO_HANDLER_LOCATION((
0486 __FILE__, __LINE__,
0487 "websocket::async_read_some"));
0488
0489 impl.stream().async_read_some(buffers_prefix(
0490 clamp(impl.rd_remain), cb_), std::move(*this));
0491 }
0492 if(impl.check_stop_now(ec))
0493 goto upcall;
0494 impl.reset_idle();
0495 BOOST_ASSERT(bytes_transferred > 0);
0496 auto const mb = buffers_prefix(
0497 bytes_transferred, cb_);
0498 impl.rd_remain -= bytes_transferred;
0499 if(impl.rd_fh.mask)
0500 detail::mask_inplace(mb, impl.rd_key);
0501 if(impl.rd_op == detail::opcode::text)
0502 {
0503 if(! impl.rd_utf8.write(mb) ||
0504 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0505 ! impl.rd_utf8.finish()))
0506 {
0507 // UTF-8 check failed on a text message: close with bad_payload
0508 code_ = close_code::bad_payload;
0509 result_ = error::bad_frame_payload;
0510 goto close;
0511 }
0512 }
0513 bytes_written_ += bytes_transferred;
0514 impl.rd_size += bytes_transferred;
0515 }
0516 }
0517 BOOST_ASSERT( ! impl.rd_done );
0518 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0519 impl.rd_done = true;
0520 }
0521 else
0522 {
0523 // Compressed message: inflate buffered payload into the caller's
0524 // buffers. The loop also runs with rd_remain == 0 so that the
0525 // empty block can be fed to the inflater after a final frame.
0526 while(buffer_bytes(cb_) > 0)
0527 {
0528 if( impl.rd_remain > 0 &&
0529 impl.rd_buf.size() == 0 &&
0530 ! did_read_)
0531 {
0532 // Payload is outstanding and nothing is buffered: read once from the stream
0533 BOOST_ASIO_CORO_YIELD
0534 {
0535 BOOST_ASIO_HANDLER_LOCATION((
0536 __FILE__, __LINE__,
0537 "websocket::async_read_some"));
0538
0539 impl.stream().async_read_some(
0540 impl.rd_buf.prepare(read_size(
0541 impl.rd_buf, impl.rd_buf.max_size())),
0542 std::move(*this));
0543 }
0544 if(impl.check_stop_now(ec))
0545 goto upcall;
0546 impl.reset_idle();
0547 BOOST_ASSERT(bytes_transferred > 0);
0548 impl.rd_buf.commit(bytes_transferred);
0549 if(impl.rd_fh.mask)
0550 detail::mask_inplace(
0551 buffers_prefix(clamp(impl.rd_remain),
0552 impl.rd_buf.data()), impl.rd_key);
0553 did_read_ = true;
0554 }
0555 zlib::z_params zs;
0556 {
0557 auto const out = buffers_front(cb_);
0558 zs.next_out = out.data();
0559 zs.avail_out = out.size();
0560 BOOST_ASSERT(zs.avail_out > 0);
0561 }
0562
0563 bool fin = false;
0564 if(impl.rd_remain > 0)
0565 {
0566 if(impl.rd_buf.size() > 0)
0567 {
0568 // Feed the first buffered segment, limited to this frame's remaining payload
0569 auto const in = buffers_prefix(
0570 clamp(impl.rd_remain), buffers_front(
0571 impl.rd_buf.data()));
0572 zs.avail_in = in.size();
0573 zs.next_in = in.data();
0574 }
0575 else
0576 {
0577 break;
0578 }
0579 }
0580 else if(impl.rd_fh.fin)
0581 {
0582 // Final frame fully consumed: feed the 4-byte empty block as input
0583 static std::uint8_t constexpr
0584 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0585 zs.next_in = empty_block;
0586 zs.avail_in = sizeof(empty_block);
0587 fin = true;
0588 }
0589 else
0590 {
0591 break;
0592 }
0593 impl.inflate(zs, zlib::Flush::sync, ec);
0594 if(impl.check_stop_now(ec))
0595 goto upcall;
0596 if(fin && zs.total_out == 0) { // the empty block produced no output: the message is complete
0597 impl.do_context_takeover_read(impl.role);
0598 impl.rd_done = true;
0599 break;
0600 }
0601 if(impl.rd_msg_max && beast::detail::sum_exceeds(
0602 impl.rd_size, zs.total_out, impl.rd_msg_max))
0603 {
0604 // Inflated size would exceed rd_msg_max: close with too_big
0605 code_ = close_code::too_big;
0606 result_ = error::message_too_big;
0607 goto close;
0608 }
0609 cb_.consume(zs.total_out);
0610 impl.rd_size += zs.total_out;
0611 if (! fin) {
0612 impl.rd_remain -= zs.total_in;
0613 impl.rd_buf.consume(zs.total_in);
0614 }
0615 bytes_written_ += zs.total_out;
0616 }
0617 if(impl.rd_op == detail::opcode::text)
0618 {
0619 // Validate the inflated output written to the caller's buffers as UTF-8
0620 if(! impl.rd_utf8.write(
0621 buffers_prefix(bytes_written_, bs_)) || (
0622 impl.rd_done && ! impl.rd_utf8.finish()))
0623 {
0624 // UTF-8 check failed on a text message: close with bad_payload
0625 code_ = close_code::bad_payload;
0626 result_ = error::bad_frame_payload;
0627 goto close;
0628 }
0629 }
0630 }
0631 goto upcall;
0632
0633 close:
0634 // Acquire the write lock, suspending in op_rd if it is held
0635 if(! impl.wr_block.try_lock(this))
0636 {
0637 BOOST_ASIO_CORO_YIELD
0638 {
0639 BOOST_ASIO_HANDLER_LOCATION((
0640 __FILE__, __LINE__,
0641 "websocket::async_read_some"));
0642
0643 impl.op_rd.emplace(std::move(*this));
0644 }
0645 if (ec)
0646 return this->complete(cont, ec, bytes_written_);
0647
0648 impl.wr_block.lock(this);
0649 BOOST_ASIO_CORO_YIELD
0650 {
0651 BOOST_ASIO_HANDLER_LOCATION((
0652 __FILE__, __LINE__,
0653 "websocket::async_read_some"));
0654
0655 const auto ex = this->get_immediate_executor();
0656 net::dispatch(ex, std::move(*this));
0657 }
0658 BOOST_ASSERT(impl.wr_block.is_locked(this));
0659 if(impl.check_stop_now(ec))
0660 goto upcall;
0661 }
0662
0663 impl.change_status(status::closing);
0664
0665 if(! impl.wr_close)
0666 {
0667 impl.wr_close = true;
0668
0669 // Serialize a close frame carrying code_ into rd_fb
0670 impl.rd_fb.clear();
0671 impl.template write_close<
0672 flat_static_buffer_base>(
0673 impl.rd_fb, code_);
0674
0675 // Write the close frame to the stream
0676 BOOST_ASSERT(impl.wr_block.is_locked(this));
0677 BOOST_ASIO_CORO_YIELD
0678 {
0679 BOOST_ASIO_HANDLER_LOCATION((
0680 __FILE__, __LINE__,
0681 "websocket::async_read_some"));
0682
0683 net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0684 beast::detail::bind_continuation(std::move(*this)));
0685 }
0686 BOOST_ASSERT(impl.wr_block.is_locked(this));
0687 if(impl.check_stop_now(ec))
0688 goto upcall;
0689 }
0690
0691 // Tear down the underlying stream
0692 using beast::websocket::async_teardown;
0693 BOOST_ASSERT(impl.wr_block.is_locked(this));
0694 BOOST_ASIO_CORO_YIELD
0695 {
0696 BOOST_ASIO_HANDLER_LOCATION((
0697 __FILE__, __LINE__,
0698 "websocket::async_read_some"));
0699
0700 async_teardown(impl.role, impl.stream(),
0701 beast::detail::bind_continuation(std::move(*this)));
0702 }
0703 BOOST_ASSERT(impl.wr_block.is_locked(this));
0704 if(ec == net::error::eof)
0705 {
0706 // eof from the teardown is treated as success
0707
0708 ec = {};
0709 }
0710 if(! ec)
0711 {
0712 BOOST_BEAST_ASSIGN_EC(ec, result_);
0713 }
0714 if(ec && ec != error::closed)
0715 impl.change_status(status::failed);
0716 else
0717 impl.change_status(status::closed);
0718 impl.close();
0719 // Release whichever locks this op holds, give stored ops a chance to run, then complete
0720 upcall:
0721 impl.rd_block.try_unlock(this);
0722 impl.op_r_close.maybe_invoke();
0723 if(impl.wr_block.try_unlock(this))
0724 impl.op_close.maybe_invoke()
0725 || impl.op_idle_ping.maybe_invoke()
0726 || impl.op_ping.maybe_invoke()
0727 || impl.op_wr.maybe_invoke();
0728 this->complete(cont, ec, bytes_written_);
0729 }
0730 }
0731 };
0732 // Composed operation that reads message data into a DynamicBuffer by
0733 // running read_some_op repeatedly, committing what each pass produced,
0734 // until one pass is enough (some_), the message is done, or an error occurs.
0735 template<class NextLayer, bool deflateSupported>
0736 template<class Handler, class DynamicBuffer>
0737 class stream<NextLayer, deflateSupported>::read_op
0738 : public beast::async_base<
0739 Handler, beast::executor_type<stream>>
0740 , public asio::coroutine
0741 {
0742 boost::weak_ptr<impl_type> wp_; // weak reference to the stream impl; locked on every resumption
0743 DynamicBuffer& b_; // the caller's dynamic buffer, held by reference
0744 std::size_t limit_; // upper bound passed to clamp() for each prepare; a caller value of 0 becomes size_t max
0745 std::size_t bytes_written_ = 0; // total bytes committed to b_ by this operation
0746 bool some_; // true: complete after one read_some_op; false: loop until impl.rd_done
0747
0748 public:
0749 template<class Handler_>
0750 read_op(
0751 Handler_&& h,
0752 boost::shared_ptr<impl_type> const& sp,
0753 DynamicBuffer& b,
0754 std::size_t limit,
0755 bool some)
0756 : async_base<Handler,
0757 beast::executor_type<stream>>(
0758 std::forward<Handler_>(h),
0759 sp->stream().get_executor())
0760 , wp_(sp)
0761 , b_(b)
0762 , limit_(limit ? limit : (
0763 std::numeric_limits<std::size_t>::max)())
0764 , some_(some)
0765 {
0766 (*this)({}, 0, false); // start the coroutine; cont == false marks the initiating call
0767 }
0768
0769 void operator()(
0770 error_code ec = {},
0771 std::size_t bytes_transferred = 0,
0772 bool cont = true)
0773 {
0774 using beast::detail::clamp;
0775 auto sp = wp_.lock();
0776 if(! sp) // the stream impl no longer exists: complete with operation_aborted and zero bytes
0777 {
0778 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0779 bytes_written_ = 0;
0780 return this->complete(cont, ec, bytes_written_);
0781 }
0782 auto& impl = *sp;
0783 using mutable_buffers_type = typename
0784 DynamicBuffer::mutable_buffers_type;
0785 BOOST_ASIO_CORO_REENTER(*this)
0786 {
0787 do
0788 {
0789 // Prepare writable space in b_, then run one read_some_op with this op as its handler
0790 BOOST_ASIO_CORO_YIELD
0791 {
0792 auto mb = beast::detail::dynamic_buffer_prepare(b_,
0793 clamp(impl.read_size_hint_db(b_), limit_),
0794 ec, error::buffer_overflow);
0795 if(impl.check_stop_now(ec))
0796 goto upcall;
0797
0798 BOOST_ASIO_HANDLER_LOCATION((
0799 __FILE__, __LINE__,
0800 "websocket::async_read"));
0801
0802 read_some_op<read_op, mutable_buffers_type>(
0803 std::move(*this), sp, *mb);
0804 }
0805 // Resumed by read_some_op: commit what it produced before looking at the error
0806 b_.commit(bytes_transferred);
0807 bytes_written_ += bytes_transferred;
0808 if(ec)
0809 goto upcall;
0810 }
0811 while(! some_ && ! impl.rd_done);
0812
0813 upcall:
0814 this->complete(cont, ec, bytes_written_);
0815 }
0816 }
0817 };
0818
0819 template<class NextLayer, bool deflateSupported>
0820 struct stream<NextLayer, deflateSupported>::
0821 run_read_some_op
0822 {
0823 template<
0824 class ReadHandler,
0825 class MutableBufferSequence>
0826 void
0827 operator()(
0828 ReadHandler&& h,
0829 boost::shared_ptr<impl_type> const& sp,
0830 MutableBufferSequence const& b)
0831 {
0832
0833
0834
0835
0836 static_assert(
0837 beast::detail::is_invocable<ReadHandler,
0838 void(error_code, std::size_t)>::value,
0839 "ReadHandler type requirements not met");
0840
0841 read_some_op<
0842 typename std::decay<ReadHandler>::type,
0843 MutableBufferSequence>(
0844 std::forward<ReadHandler>(h),
0845 sp,
0846 b);
0847 }
0848 };
0849
0850 template<class NextLayer, bool deflateSupported>
0851 struct stream<NextLayer, deflateSupported>::
0852 run_read_op
0853 {
0854 template<
0855 class ReadHandler,
0856 class DynamicBuffer>
0857 void
0858 operator()(
0859 ReadHandler&& h,
0860 boost::shared_ptr<impl_type> const& sp,
0861 DynamicBuffer* b,
0862 std::size_t limit,
0863 bool some)
0864 {
0865
0866
0867
0868
0869 static_assert(
0870 beast::detail::is_invocable<ReadHandler,
0871 void(error_code, std::size_t)>::value,
0872 "ReadHandler type requirements not met");
0873
0874 read_op<
0875 typename std::decay<ReadHandler>::type,
0876 DynamicBuffer>(
0877 std::forward<ReadHandler>(h),
0878 sp,
0879 *b,
0880 limit,
0881 some);
0882 }
0883 };
0884
0885
0886
0887 template<class NextLayer, bool deflateSupported>
0888 template<class DynamicBuffer>
0889 std::size_t
0890 stream<NextLayer, deflateSupported>::
0891 read(DynamicBuffer& buffer)
0892 {
0893 static_assert(is_sync_stream<next_layer_type>::value,
0894 "SyncStream type requirements not met");
0895 static_assert(
0896 net::is_dynamic_buffer<DynamicBuffer>::value,
0897 "DynamicBuffer type requirements not met");
0898 error_code ec;
0899 auto const bytes_written = read(buffer, ec);
0900 if(ec)
0901 BOOST_THROW_EXCEPTION(system_error{ec});
0902 return bytes_written;
0903 }
0904
0905 template<class NextLayer, bool deflateSupported>
0906 template<class DynamicBuffer>
0907 std::size_t
0908 stream<NextLayer, deflateSupported>::
0909 read(DynamicBuffer& buffer, error_code& ec)
0910 {
0911 static_assert(is_sync_stream<next_layer_type>::value,
0912 "SyncStream type requirements not met");
0913 static_assert(
0914 net::is_dynamic_buffer<DynamicBuffer>::value,
0915 "DynamicBuffer type requirements not met");
0916 std::size_t bytes_written = 0;
0917 do
0918 {
0919 bytes_written += read_some(buffer, 0, ec);
0920 if(ec)
0921 return bytes_written;
0922 }
0923 while(! is_message_done());
0924 return bytes_written;
0925 }
0926
0927 template<class NextLayer, bool deflateSupported>
0928 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0929 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0930 stream<NextLayer, deflateSupported>::
0931 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0932 {
0933 static_assert(is_async_stream<next_layer_type>::value,
0934 "AsyncStream type requirements not met");
0935 static_assert(
0936 net::is_dynamic_buffer<DynamicBuffer>::value,
0937 "DynamicBuffer type requirements not met");
0938 return net::async_initiate<
0939 ReadHandler,
0940 void(error_code, std::size_t)>(
0941 run_read_op{},
0942 handler,
0943 impl_,
0944 &buffer,
0945 0,
0946 false);
0947 }
0948
0949
0950
0951 template<class NextLayer, bool deflateSupported>
0952 template<class DynamicBuffer>
0953 std::size_t
0954 stream<NextLayer, deflateSupported>::
0955 read_some(
0956 DynamicBuffer& buffer,
0957 std::size_t limit)
0958 {
0959 static_assert(is_sync_stream<next_layer_type>::value,
0960 "SyncStream type requirements not met");
0961 static_assert(
0962 net::is_dynamic_buffer<DynamicBuffer>::value,
0963 "DynamicBuffer type requirements not met");
0964 error_code ec;
0965 auto const bytes_written =
0966 read_some(buffer, limit, ec);
0967 if(ec)
0968 BOOST_THROW_EXCEPTION(system_error{ec});
0969 return bytes_written;
0970 }
0971
0972 template<class NextLayer, bool deflateSupported>
0973 template<class DynamicBuffer>
0974 std::size_t
0975 stream<NextLayer, deflateSupported>::
0976 read_some(
0977 DynamicBuffer& buffer,
0978 std::size_t limit,
0979 error_code& ec)
0980 {
0981 static_assert(is_sync_stream<next_layer_type>::value,
0982 "SyncStream type requirements not met");
0983 static_assert(
0984 net::is_dynamic_buffer<DynamicBuffer>::value,
0985 "DynamicBuffer type requirements not met");
0986 using beast::detail::clamp;
0987 if(! limit)
0988 limit = (std::numeric_limits<std::size_t>::max)();
0989 auto const size =
0990 clamp(read_size_hint(buffer), limit);
0991 BOOST_ASSERT(size > 0);
0992 auto mb = beast::detail::dynamic_buffer_prepare(
0993 buffer, size, ec, error::buffer_overflow);
0994 if(impl_->check_stop_now(ec))
0995 return 0;
0996 auto const bytes_written = read_some(*mb, ec);
0997 buffer.commit(bytes_written);
0998 return bytes_written;
0999 }
1000
1001 template<class NextLayer, bool deflateSupported>
1002 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1003 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1004 stream<NextLayer, deflateSupported>::
1005 async_read_some(
1006 DynamicBuffer& buffer,
1007 std::size_t limit,
1008 ReadHandler&& handler)
1009 {
1010 static_assert(is_async_stream<next_layer_type>::value,
1011 "AsyncStream type requirements not met");
1012 static_assert(
1013 net::is_dynamic_buffer<DynamicBuffer>::value,
1014 "DynamicBuffer type requirements not met");
1015 return net::async_initiate<
1016 ReadHandler,
1017 void(error_code, std::size_t)>(
1018 run_read_op{},
1019 handler,
1020 impl_,
1021 &buffer,
1022 limit,
1023 true);
1024 }
1025
1026
1027
1028 template<class NextLayer, bool deflateSupported>
1029 template<class MutableBufferSequence>
1030 std::size_t
1031 stream<NextLayer, deflateSupported>::
1032 read_some(
1033 MutableBufferSequence const& buffers)
1034 {
1035 static_assert(is_sync_stream<next_layer_type>::value,
1036 "SyncStream type requirements not met");
1037 static_assert(net::is_mutable_buffer_sequence<
1038 MutableBufferSequence>::value,
1039 "MutableBufferSequence type requirements not met");
1040 error_code ec;
1041 auto const bytes_written = read_some(buffers, ec);
1042 if(ec)
1043 BOOST_THROW_EXCEPTION(system_error{ec});
1044 return bytes_written;
1045 }
1046 // Blocking read of some message data into `buffers`; frame headers and control frames are handled inline.
1047 template<class NextLayer, bool deflateSupported>
1048 template<class MutableBufferSequence>
1049 std::size_t
1050 stream<NextLayer, deflateSupported>::
1051 read_some(
1052 MutableBufferSequence const& buffers, // destination for message payload
1053 error_code& ec) // cleared on entry; receives the error, if any
1054 {
1055 static_assert(is_sync_stream<next_layer_type>::value,
1056 "SyncStream type requirements not met");
1057 static_assert(net::is_mutable_buffer_sequence<
1058 MutableBufferSequence>::value,
1059 "MutableBufferSequence type requirements not met");
1060 using beast::detail::clamp;
1061 auto& impl = *impl_;
1062 close_code code{};
1063 std::size_t bytes_written = 0; // returned on every path: payload bytes placed in `buffers`
1064 ec = {};
1065 // Return at once if check_stop_now reports that the stream must not be used
1066 if(impl.check_stop_now(ec))
1067 return bytes_written;
1068 loop:
1069
1070 // A frame header must be read when no payload remains in the
1071 // current frame and either that frame was not final or the
1072 // message has already been delivered in full (rd_done).
1073 if(impl.rd_remain == 0 && (
1074 ! impl.rd_fh.fin || impl.rd_done))
1075 {
1076 // Read from the stream until parse_fh accepts a complete frame header
1077 error_code result;
1078 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1079 {
1080 if(result)
1081 {
1082 // Header rejected: choose the close code and hand off to do_fail
1083 if(result == error::message_too_big)
1084 code = close_code::too_big;
1085 else
1086 code = close_code::protocol_error;
1087 do_fail(code, result, ec);
1088 return bytes_written;
1089 }
1090 auto const bytes_transferred =
1091 impl.stream().read_some(
1092 impl.rd_buf.prepare(read_size(
1093 impl.rd_buf, impl.rd_buf.max_size())),
1094 ec);
1095 impl.rd_buf.commit(bytes_transferred);
1096 if(impl.check_stop_now(ec))
1097 return bytes_written;
1098 }
1099 // Apply the masking key in place to the buffered bytes of this
1100 // frame's payload (at most rd_fh.len bytes) when the frame is masked.
1101 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1102 detail::mask_inplace(buffers_prefix(
1103 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1104 impl.rd_key);
1105 if(detail::is_control(impl.rd_fh.op))
1106 {
1107 // View of the control frame's payload inside rd_buf
1108 auto const b = buffers_prefix(
1109 clamp(impl.rd_fh.len), impl.rd_buf.data());
1110 auto const len = buffer_bytes(b);
1111 BOOST_ASSERT(len == impl.rd_fh.len);
1112
1113 // Clear fin so that the header check at loop: does not take
1114 // this control frame for the final frame of the message.
1115 impl.rd_fh.fin = false;
1116
1117 // Ping frame
1118 if(impl.rd_fh.op == detail::opcode::ping)
1119 {
1120 ping_data payload;
1121 detail::read_ping(payload, b);
1122 impl.rd_buf.consume(len);
1123 if(impl.wr_close)
1124 {
1125 // A close frame was already written: drop the ping without replying
1126 goto loop;
1127 }
1128 if(impl.ctrl_cb)
1129 impl.ctrl_cb(frame_type::ping, to_string_view(payload));
1130 detail::frame_buffer fb;
1131 impl.template write_ping<flat_static_buffer_base>(fb,
1132 detail::opcode::pong, payload);
1133 net::write(impl.stream(), fb.data(), ec);
1134 if(impl.check_stop_now(ec))
1135 return bytes_written;
1136 goto loop;
1137 }
1138 // Pong frame
1139 if(impl.rd_fh.op == detail::opcode::pong)
1140 {
1141 ping_data payload;
1142 detail::read_ping(payload, b);
1143 impl.rd_buf.consume(len);
1144 if(impl.ctrl_cb)
1145 impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1146 goto loop;
1147 }
1148 // Close frame: the only control opcode left at this point
1149 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1150 {
1151 BOOST_ASSERT(! impl.rd_close);
1152 impl.rd_close = true;
1153 close_reason cr;
1154 detail::read_close(cr, b, result);
1155 if(result)
1156 {
1157 // The close payload could not be read: fail with protocol_error
1158 do_fail(close_code::protocol_error,
1159 result, ec);
1160 return bytes_written;
1161 }
1162 impl.cr = cr;
1163 impl.rd_buf.consume(len);
1164 if(impl.ctrl_cb)
1165 impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1166 BOOST_ASSERT(! impl.wr_close);
1167 // Hand off to do_fail with the peer's close code (normal if none was given) and error::closed
1168 do_fail(
1169 cr.code == close_code::none ?
1170 close_code::normal :
1171 static_cast<close_code>(cr.code),
1172 error::closed, ec);
1173 return bytes_written;
1174 }
1175 }
1176 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1177 {
1178 // Empty non-final frame: nothing to deliver, read the next header
1179 goto loop;
1180 }
1181 impl.rd_done = false;
1182 }
1183 else
1184 {
1185 ec = {}; // no header needed on this pass: start from a cleared error code
1186 }
1187 if(! impl.rd_deflated())
1188 {
1189 if(impl.rd_remain > 0)
1190 {
1191 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1192 (std::min)(clamp(impl.rd_remain),
1193 buffer_bytes(buffers)))
1194 {
1195 // rd_buf is empty and can hold more than this call can
1196 // deliver: read from the stream into rd_buf first.
1197 impl.rd_buf.commit(impl.stream().read_some(
1198 impl.rd_buf.prepare(read_size(impl.rd_buf,
1199 impl.rd_buf.max_size())), ec));
1200 if(impl.check_stop_now(ec))
1201 return bytes_written;
1202 if(impl.rd_fh.mask)
1203 detail::mask_inplace(
1204 buffers_prefix(clamp(impl.rd_remain),
1205 impl.rd_buf.data()), impl.rd_key);
1206 }
1207 if(impl.rd_buf.size() > 0)
1208 {
1209 // Copy from rd_buf into the caller's buffers; no masking is
1210 // done here because it was applied when the data was buffered.
1211 auto const bytes_transferred = net::buffer_copy(
1212 buffers, impl.rd_buf.data(),
1213 clamp(impl.rd_remain));
1214 auto const mb = buffers_prefix(
1215 bytes_transferred, buffers);
1216 impl.rd_remain -= bytes_transferred;
1217 if(impl.rd_op == detail::opcode::text)
1218 {
1219 if(! impl.rd_utf8.write(mb) ||
1220 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1221 ! impl.rd_utf8.finish()))
1222 {
1223 // UTF-8 check failed on a text message: fail with bad_payload
1224 do_fail(close_code::bad_payload,
1225 error::bad_frame_payload, ec);
1226 return bytes_written;
1227 }
1228 }
1229 bytes_written += bytes_transferred;
1230 impl.rd_size += bytes_transferred;
1231 impl.rd_buf.consume(bytes_transferred);
1232 }
1233 else
1234 {
1235 // rd_buf is empty: read from the stream directly into the caller's buffers
1236 BOOST_ASSERT(impl.rd_remain > 0);
1237 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1238 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1239 clamp(impl.rd_remain), buffers)) > 0);
1240 auto const bytes_transferred =
1241 impl.stream().read_some(buffers_prefix(
1242 clamp(impl.rd_remain), buffers), ec);
1243
1244 if(impl.check_stop_now(ec))
1245 return bytes_written;
1246 BOOST_ASSERT(bytes_transferred > 0);
1247 auto const mb = buffers_prefix(
1248 bytes_transferred, buffers);
1249 impl.rd_remain -= bytes_transferred;
1250 if(impl.rd_fh.mask)
1251 detail::mask_inplace(mb, impl.rd_key);
1252 if(impl.rd_op == detail::opcode::text)
1253 {
1254 if(! impl.rd_utf8.write(mb) ||
1255 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1256 ! impl.rd_utf8.finish()))
1257 {
1258 // UTF-8 check failed on a text message: fail with bad_payload
1259 do_fail(close_code::bad_payload,
1260 error::bad_frame_payload, ec);
1261 return bytes_written;
1262 }
1263 }
1264 bytes_written += bytes_transferred;
1265 impl.rd_size += bytes_transferred;
1266 }
1267 }
1268 BOOST_ASSERT( ! impl.rd_done );
1269 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1270 impl.rd_done = true;
1271 }
1272 else
1273 {
1274 // Compressed message: inflate buffered payload into the caller's
1275 // buffers. The loop also runs with rd_remain == 0 so that the
1276 // empty block can be fed to the inflater after a final frame.
1277 // did_read limits this call to a single read from the stream.
1278 bool did_read = false;
1279 buffers_suffix<MutableBufferSequence> cb(buffers);
1280 while(buffer_bytes(cb) > 0)
1281 {
1282 zlib::z_params zs;
1283 {
1284 auto const out = beast::buffers_front(cb);
1285 zs.next_out = out.data();
1286 zs.avail_out = out.size();
1287 BOOST_ASSERT(zs.avail_out > 0);
1288 }
1289
1290 bool fin = false;
1291 if(impl.rd_remain > 0)
1292 {
1293 if(impl.rd_buf.size() > 0)
1294 {
1295 // Feed the first buffered segment, limited to this frame's remaining payload
1296 auto const in = buffers_prefix(
1297 clamp(impl.rd_remain), beast::buffers_front(
1298 impl.rd_buf.data()));
1299 zs.avail_in = in.size();
1300 zs.next_in = in.data();
1301 }
1302 else if(! did_read)
1303 {
1304 // Nothing buffered and no read done yet in this call: read once from the stream
1305 auto const bytes_transferred =
1306 impl.stream().read_some(
1307 impl.rd_buf.prepare(read_size(
1308 impl.rd_buf, impl.rd_buf.max_size())),
1309 ec);
1310 if(impl.check_stop_now(ec))
1311 return bytes_written;
1312 BOOST_ASSERT(bytes_transferred > 0);
1313 impl.rd_buf.commit(bytes_transferred);
1314 if(impl.rd_fh.mask)
1315 detail::mask_inplace(
1316 buffers_prefix(clamp(impl.rd_remain),
1317 impl.rd_buf.data()), impl.rd_key);
1318 auto const in = buffers_prefix(
1319 clamp(impl.rd_remain), buffers_front(
1320 impl.rd_buf.data()));
1321 zs.avail_in = in.size();
1322 zs.next_in = in.data();
1323 did_read = true;
1324 }
1325 else
1326 {
1327 break;
1328 }
1329 }
1330 else if(impl.rd_fh.fin)
1331 {
1332 // Final frame fully consumed: feed the 4-byte empty block as input
1333 static std::uint8_t constexpr
1334 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1335 zs.next_in = empty_block;
1336 zs.avail_in = sizeof(empty_block);
1337 fin = true;
1338 }
1339 else
1340 {
1341 break;
1342 }
1343 impl.inflate(zs, zlib::Flush::sync, ec);
1344 if(impl.check_stop_now(ec))
1345 return bytes_written;
1346 if (fin && zs.total_out == 0) { // the empty block produced no output: the message is complete
1347 impl.do_context_takeover_read(impl.role);
1348 impl.rd_done = true;
1349 break;
1350 }
1351 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1352 impl.rd_size, zs.total_out, impl.rd_msg_max))
1353 {
1354 do_fail(close_code::too_big,
1355 error::message_too_big, ec);
1356 return bytes_written;
1357 }
1358 cb.consume(zs.total_out);
1359 impl.rd_size += zs.total_out;
1360 if (! fin) {
1361 impl.rd_remain -= zs.total_in;
1362 impl.rd_buf.consume(zs.total_in);
1363 }
1364 bytes_written += zs.total_out;
1365 }
1366 if(impl.rd_op == detail::opcode::text)
1367 {
1368 // Validate the inflated output written to the caller's buffers as UTF-8
1369 if(! impl.rd_utf8.write(beast::buffers_prefix(
1370 bytes_written, buffers)) || (
1371 impl.rd_done && ! impl.rd_utf8.finish()))
1372 {
1373 // UTF-8 check failed on a text message: fail with bad_payload
1374 do_fail(close_code::bad_payload,
1375 error::bad_frame_payload, ec);
1376 return bytes_written;
1377 }
1378 }
1379 }
1380 return bytes_written;
1381 }
1382
1383 template<class NextLayer, bool deflateSupported>
1384 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1385 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1386 stream<NextLayer, deflateSupported>::
1387 async_read_some(
1388 MutableBufferSequence const& buffers,
1389 ReadHandler&& handler)
1390 {
1391 static_assert(is_async_stream<next_layer_type>::value,
1392 "AsyncStream type requirements not met");
1393 static_assert(net::is_mutable_buffer_sequence<
1394 MutableBufferSequence>::value,
1395 "MutableBufferSequence type requirements not met");
1396 return net::async_initiate<
1397 ReadHandler,
1398 void(error_code, std::size_t)>(
1399 run_read_some_op{},
1400 handler,
1401 impl_,
1402 buffers);
1403 }
1404
1405 }
1406 }
1407 }
1408
1409 #endif