Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-05 08:28:04

0001 //
0002 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 // Official repository: https://github.com/boostorg/beast
0008 //
0009 
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012 
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/buffers_prefix.hpp>
0019 #include <boost/beast/core/buffers_suffix.hpp>
0020 #include <boost/beast/core/flat_static_buffer.hpp>
0021 #include <boost/beast/core/read_size.hpp>
0022 #include <boost/beast/core/stream_traits.hpp>
0023 #include <boost/beast/core/detail/bind_continuation.hpp>
0024 #include <boost/beast/core/detail/buffer.hpp>
0025 #include <boost/beast/core/detail/clamp.hpp>
0026 #include <boost/beast/core/detail/config.hpp>
0027 #include <boost/asio/coroutine.hpp>
0028 #include <boost/assert.hpp>
0029 #include <boost/config.hpp>
0030 #include <boost/optional.hpp>
0031 #include <boost/throw_exception.hpp>
0032 #include <algorithm>
0033 #include <limits>
0034 #include <memory>
0035 
0036 namespace boost {
0037 namespace beast {
0038 namespace websocket {
0039 
0040 /*  Read some message data into a buffer sequence.
0041 
0042     Also reads and handles control frames.
0043 */
0044 template<class NextLayer, bool deflateSupported>
0045 template<class Handler, class MutableBufferSequence>
0046 class stream<NextLayer, deflateSupported>::read_some_op
0047     : public beast::async_base<
0048         Handler, beast::executor_type<stream>>
0049     , public asio::coroutine
0050 {
0051     boost::weak_ptr<impl_type> wp_;
0052     MutableBufferSequence bs_;
0053     buffers_suffix<MutableBufferSequence> cb_;
0054     std::size_t bytes_written_ = 0;
0055     error_code result_;
0056     close_code code_;
0057     bool did_read_ = false;
0058 
0059 public:
0060     static constexpr int id = 1; // for soft_mutex
0061 
0062     template<class Handler_> // Constructs the read operation and starts it right away (see the call in the body).
0063     read_some_op(
0064         Handler_&& h, // completion handler; perfectly forwarded into the async_base base class
0065         boost::shared_ptr<impl_type> const& sp, // owner of the stream implementation; only a weak reference is kept in wp_
0066         MutableBufferSequence const& bs) // caller's destination buffers; copied into both bs_ and cb_
0067         : async_base<
0068             Handler, beast::executor_type<stream>>(
0069                 std::forward<Handler_>(h),
0070                     sp->stream().get_executor()) // base is bound to the executor reported by the implementation's stream()
0071         , wp_(sp) // weak_ptr: operator() locks it on every entry and completes with operation_aborted if the lock fails
0072         , bs_(bs) // untouched copy of the sequence; operator() takes buffers_prefix(bytes_written_, bs_) from it for the utf8 check
0073         , cb_(bs) // buffers_suffix view of the same sequence; operator() copies/reads into it and consume()s from it
0074         , code_(close_code::none) // no close code chosen yet; operator() assigns one before jumping to its close path
0075     {
0076         (*this)({}, 0, false); // first coroutine entry: empty error_code, zero bytes, cont = false (operator() defaults cont to true)
0077     }
0078 
0079     void operator()(
0080         error_code ec = {},
0081         std::size_t bytes_transferred = 0,
0082         bool cont = true)
0083     {
0084         using beast::detail::clamp;
0085         auto sp = wp_.lock();
0086         if(! sp)
0087         {
0088             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0089             bytes_written_ = 0;
0090             return this->complete(cont, ec, bytes_written_);
0091         }
0092         auto& impl = *sp;
0093         BOOST_ASIO_CORO_REENTER(*this)
0094         {
0095             impl.update_timer(this->get_executor());
0096 
0097         acquire_read_lock:
0098             // Acquire the read lock
0099             if(! impl.rd_block.try_lock(this))
0100             {
0101             do_suspend:
0102                 BOOST_ASIO_CORO_YIELD
0103                 {
0104                     BOOST_ASIO_HANDLER_LOCATION((
0105                         __FILE__, __LINE__,
0106                         "websocket::async_read_some"));
0107 
0108                     this->set_allowed_cancellation(net::cancellation_type::all);
0109                     impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0110                 }
0111                 if (ec)
0112                     return this->complete(cont, ec, bytes_written_);
0113 
0114                 this->set_allowed_cancellation(net::cancellation_type::terminal);
0115 
0116                 impl.rd_block.lock(this);
0117                 BOOST_ASIO_CORO_YIELD
0118                 {
0119                     BOOST_ASIO_HANDLER_LOCATION((
0120                         __FILE__, __LINE__,
0121                         "websocket::async_read_some"));
0122 
0123                     const auto ex = this->get_immediate_executor();
0124                     net::dispatch(ex, std::move(*this));
0125                 }
0126                 BOOST_ASSERT(impl.rd_block.is_locked(this));
0127 
0128                 BOOST_ASSERT(!ec);
0129                 if(impl.check_stop_now(ec))
0130                 {
0131                     // Issue 2264 - There is no guarantee that the next
0132                     // error will be operation_aborted.
0133                     // The error could be a result of the peer resetting the 
0134                     // connection
0135                     // BOOST_ASSERT(ec == net::error::operation_aborted);
0136                     goto upcall;
0137                 }
0138                 // VFALCO Should never get here
0139 
0140                 // The only way to get read blocked is if
0141                 // a `close_op` wrote a close frame
0142                 BOOST_ASSERT(impl.wr_close);
0143                 BOOST_ASSERT(impl.status_ != status::open);
0144                 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0145                 goto upcall;
0146             }
0147             else
0148             {
0149                 // Make sure the stream is not closed
0150                 if( impl.status_ == status::closed ||
0151                     impl.status_ == status::failed)
0152                 {
0153                     BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0154                     goto upcall;
0155                 }
0156             }
0157 
0158             // if status_ == status::closing, we want to suspend
0159             // the read operation until the close completes,
0160             // then finish the read with operation_aborted.
0161 
0162         loop:
0163             BOOST_ASSERT(impl.rd_block.is_locked(this));
0164             // See if we need to read a frame header. This
0165             // condition is structured to give the decompressor
0166             // a chance to emit the final empty deflate block
0167             //
0168             if(impl.rd_remain == 0 &&
0169                 (! impl.rd_fh.fin || impl.rd_done))
0170             {
0171                 // Read frame header
0172                 while(! impl.parse_fh(
0173                     impl.rd_fh, impl.rd_buf, result_))
0174                 {
0175                     if(result_)
0176                     {
0177                         // _Fail the WebSocket Connection_
0178                         if(result_ == error::message_too_big)
0179                             code_ = close_code::too_big;
0180                         else
0181                             code_ = close_code::protocol_error;
0182                         goto close;
0183                     }
0184                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0185                     BOOST_ASIO_CORO_YIELD
0186                     {
0187                         BOOST_ASIO_HANDLER_LOCATION((
0188                             __FILE__, __LINE__,
0189                             "websocket::async_read_some"));
0190 
0191                         impl.stream().async_read_some(
0192                             impl.rd_buf.prepare(read_size(
0193                                 impl.rd_buf, impl.rd_buf.max_size())),
0194                                     std::move(*this));
0195                     }
0196                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0197                     impl.rd_buf.commit(bytes_transferred);
0198                     if(impl.check_stop_now(ec))
0199                         goto upcall;
0200                     impl.reset_idle();
0201 
0202                     // Allow a close operation
0203                     // to acquire the read block
0204                     impl.rd_block.unlock(this);
0205                     if( impl.op_r_close.maybe_invoke())
0206                     {
0207                         // Suspend
0208                         BOOST_ASSERT(impl.rd_block.is_locked());
0209                         goto do_suspend;
0210                     }
0211                     // Acquire read block
0212                     impl.rd_block.lock(this);
0213                 }
0214                 // Immediately apply the mask to the portion
0215                 // of the buffer holding payload data.
0216                 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0217                     detail::mask_inplace(buffers_prefix(
0218                         clamp(impl.rd_fh.len),
0219                             impl.rd_buf.data()),
0220                                 impl.rd_key);
0221                 if(detail::is_control(impl.rd_fh.op))
0222                 {
0223                     // Clear this otherwise the next
0224                     // frame will be considered final.
0225                     impl.rd_fh.fin = false;
0226 
0227                     // Handle ping frame
0228                     if(impl.rd_fh.op == detail::opcode::ping)
0229                     {
0230                         if(impl.ctrl_cb)
0231                         {
0232                             if(! cont)
0233                             {
0234                                 BOOST_ASIO_CORO_YIELD
0235                                 {
0236                                     BOOST_ASIO_HANDLER_LOCATION((
0237                                         __FILE__, __LINE__,
0238                                         "websocket::async_read_some"));
0239 
0240                                     const auto ex = this->get_immediate_executor();
0241                                     net::dispatch(ex, std::move(*this));
0242                                 }
0243                                 BOOST_ASSERT(cont);
0244                                 // VFALCO call check_stop_now() here?
0245                             }
0246                         }
0247                         {
0248                             auto const b = buffers_prefix(
0249                                 clamp(impl.rd_fh.len),
0250                                     impl.rd_buf.data());
0251                             auto const len = buffer_bytes(b);
0252                             BOOST_ASSERT(len == impl.rd_fh.len);
0253                             ping_data payload;
0254                             detail::read_ping(payload, b);
0255                             impl.rd_buf.consume(len);
0256                             // Ignore ping when closing
0257                             if(impl.status_ == status::closing)
0258                                 goto loop;
0259                             if(impl.ctrl_cb)
0260                                 impl.ctrl_cb(
0261                                     frame_type::ping, to_string_view(payload));
0262                             impl.rd_fb.clear();
0263                             impl.template write_ping<
0264                                 flat_static_buffer_base>(impl.rd_fb,
0265                                     detail::opcode::pong, payload);
0266                         }
0267 
0268                         // Allow a close operation
0269                         // to acquire the read block
0270                         impl.rd_block.unlock(this);
0271                         impl.op_r_close.maybe_invoke();
0272 
0273                         // Acquire the write lock
0274                         if(! impl.wr_block.try_lock(this))
0275                         {
0276                             BOOST_ASIO_CORO_YIELD
0277                             {
0278                                 BOOST_ASIO_HANDLER_LOCATION((
0279                                     __FILE__, __LINE__,
0280                                     "websocket::async_read_some"));
0281 
0282                                 impl.op_rd.emplace(std::move(*this));
0283                             }
0284                             if (ec)
0285                                 return this->complete(cont, ec, bytes_written_);
0286 
0287                             impl.wr_block.lock(this);
0288                             BOOST_ASIO_CORO_YIELD
0289                             {
0290                                 BOOST_ASIO_HANDLER_LOCATION((
0291                                     __FILE__, __LINE__,
0292                                     "websocket::async_read_some"));
0293 
0294                                 const auto ex = this->get_immediate_executor();
0295                                 net::dispatch(ex, std::move(*this));
0296                             }
0297                             BOOST_ASSERT(impl.wr_block.is_locked(this));
0298                             if(impl.check_stop_now(ec))
0299                                 goto upcall;
0300                         }
0301 
0302                         // Send pong
0303                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0304                         BOOST_ASIO_CORO_YIELD
0305                         {
0306                             BOOST_ASIO_HANDLER_LOCATION((
0307                                 __FILE__, __LINE__,
0308                                 "websocket::async_read_some"));
0309 
0310                             net::async_write(
0311                                 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0312                                 beast::detail::bind_continuation(std::move(*this)));
0313                         }
0314                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0315                         if(impl.check_stop_now(ec))
0316                             goto upcall;
0317                         impl.wr_block.unlock(this);
0318                         impl.op_close.maybe_invoke()
0319                             || impl.op_idle_ping.maybe_invoke()
0320                             || impl.op_ping.maybe_invoke()
0321                             || impl.op_wr.maybe_invoke();
0322                         goto acquire_read_lock;
0323                     }
0324 
0325                     // Handle pong frame
0326                     if(impl.rd_fh.op == detail::opcode::pong)
0327                     {
0328                         // Ignore pong when closing
0329                         if(! impl.wr_close && impl.ctrl_cb)
0330                         {
0331                             if(! cont)
0332                             {
0333                                 BOOST_ASIO_CORO_YIELD
0334                                 {
0335                                     BOOST_ASIO_HANDLER_LOCATION((
0336                                         __FILE__, __LINE__,
0337                                         "websocket::async_read_some"));
0338 
0339                                     const auto ex = this->get_immediate_executor();
0340                                     net::dispatch(ex, std::move(*this));
0341                                 }
0342                                 BOOST_ASSERT(cont);
0343                             }
0344                         }
0345                         auto const cb = buffers_prefix(clamp(
0346                             impl.rd_fh.len), impl.rd_buf.data());
0347                         auto const len = buffer_bytes(cb);
0348                         BOOST_ASSERT(len == impl.rd_fh.len);
0349                         ping_data payload;
0350                         detail::read_ping(payload, cb);
0351                         impl.rd_buf.consume(len);
0352                         // Ignore pong when closing
0353                         if(! impl.wr_close && impl.ctrl_cb)
0354                             impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0355                         goto loop;
0356                     }
0357 
0358                     // Handle close frame
0359                     BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0360                     {
0361                         if(impl.ctrl_cb)
0362                         {
0363                             if(! cont)
0364                             {
0365                                 BOOST_ASIO_CORO_YIELD
0366                                 {
0367                                     BOOST_ASIO_HANDLER_LOCATION((
0368                                         __FILE__, __LINE__,
0369                                         "websocket::async_read_some"));
0370 
0371                                     const auto ex = this->get_immediate_executor();
0372                                     net::dispatch(ex, std::move(*this));
0373                                 }
0374                                 BOOST_ASSERT(cont);
0375                             }
0376                         }
0377                         auto const cb = buffers_prefix(clamp(
0378                             impl.rd_fh.len), impl.rd_buf.data());
0379                         auto const len = buffer_bytes(cb);
0380                         BOOST_ASSERT(len == impl.rd_fh.len);
0381                         BOOST_ASSERT(! impl.rd_close);
0382                         impl.rd_close = true;
0383                         close_reason cr;
0384                         detail::read_close(cr, cb, result_);
0385                         if(result_)
0386                         {
0387                             // _Fail the WebSocket Connection_
0388                             code_ = close_code::protocol_error;
0389                             goto close;
0390                         }
0391                         impl.cr = cr;
0392                         impl.rd_buf.consume(len);
0393                         if(impl.ctrl_cb)
0394                             impl.ctrl_cb(frame_type::close,
0395                                 to_string_view(impl.cr.reason));
0396                         // See if we are already closing
0397                         if(impl.status_ == status::closing)
0398                         {
0399                             // _Close the WebSocket Connection_
0400                             BOOST_ASSERT(impl.wr_close);
0401                             code_ = close_code::none;
0402                             result_ = error::closed;
0403                             goto close;
0404                         }
0405                         // _Start the WebSocket Closing Handshake_
0406                         code_ = cr.code == close_code::none ?
0407                             close_code::normal :
0408                             static_cast<close_code>(cr.code);
0409                         result_ = error::closed;
0410                         goto close;
0411                     }
0412                 }
0413                 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0414                 {
0415                     // Empty non-final frame
0416                     goto loop;
0417                 }
0418                 impl.rd_done = false;
0419             }
0420             if(! impl.rd_deflated())
0421             {
0422                 if(impl.rd_remain > 0)
0423                 {
0424                     if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0425                         (std::min)(clamp(impl.rd_remain),
0426                             buffer_bytes(cb_)))
0427                     {
0428                         // Fill the read buffer first, otherwise we
0429                         // get fewer bytes at the cost of one I/O.
0430                         BOOST_ASIO_CORO_YIELD
0431                         {
0432                             BOOST_ASIO_HANDLER_LOCATION((
0433                                 __FILE__, __LINE__,
0434                                 "websocket::async_read_some"));
0435 
0436                             impl.stream().async_read_some(
0437                                 impl.rd_buf.prepare(read_size(
0438                                     impl.rd_buf, impl.rd_buf.max_size())),
0439                                         std::move(*this));
0440                         }
0441                         impl.rd_buf.commit(bytes_transferred);
0442                         if(impl.check_stop_now(ec))
0443                             goto upcall;
0444                         impl.reset_idle();
0445                         if(impl.rd_fh.mask)
0446                             detail::mask_inplace(buffers_prefix(clamp(
0447                                 impl.rd_remain), impl.rd_buf.data()),
0448                                     impl.rd_key);
0449                     }
0450                     if(impl.rd_buf.size() > 0)
0451                     {
0452                         // Copy from the read buffer.
0453                         // The mask was already applied.
0454                         bytes_transferred = net::buffer_copy(cb_,
0455                             impl.rd_buf.data(), clamp(impl.rd_remain));
0456                         auto const mb = buffers_prefix(
0457                             bytes_transferred, cb_);
0458                         impl.rd_remain -= bytes_transferred;
0459                         if(impl.rd_op == detail::opcode::text)
0460                         {
0461                             if(! impl.rd_utf8.write(mb) ||
0462                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0463                                     ! impl.rd_utf8.finish()))
0464                             {
0465                                 // _Fail the WebSocket Connection_
0466                                 code_ = close_code::bad_payload;
0467                                 result_ = error::bad_frame_payload;
0468                                 goto close;
0469                             }
0470                         }
0471                         bytes_written_ += bytes_transferred;
0472                         impl.rd_size += bytes_transferred;
0473                         impl.rd_buf.consume(bytes_transferred);
0474                     }
0475                     else
0476                     {
0477                         // Read into caller's buffer
0478                         BOOST_ASSERT(impl.rd_remain > 0);
0479                         BOOST_ASSERT(buffer_bytes(cb_) > 0);
0480                         BOOST_ASSERT(buffer_bytes(buffers_prefix(
0481                             clamp(impl.rd_remain), cb_)) > 0);
0482                         BOOST_ASIO_CORO_YIELD
0483                         {
0484                             BOOST_ASIO_HANDLER_LOCATION((
0485                                 __FILE__, __LINE__,
0486                                 "websocket::async_read_some"));
0487 
0488                             impl.stream().async_read_some(buffers_prefix(
0489                                 clamp(impl.rd_remain), cb_), std::move(*this));
0490                         }
0491                         if(impl.check_stop_now(ec))
0492                             goto upcall;
0493                         impl.reset_idle();
0494                         BOOST_ASSERT(bytes_transferred > 0);
0495                         auto const mb = buffers_prefix(
0496                             bytes_transferred, cb_);
0497                         impl.rd_remain -= bytes_transferred;
0498                         if(impl.rd_fh.mask)
0499                             detail::mask_inplace(mb, impl.rd_key);
0500                         if(impl.rd_op == detail::opcode::text)
0501                         {
0502                             if(! impl.rd_utf8.write(mb) ||
0503                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0504                                     ! impl.rd_utf8.finish()))
0505                             {
0506                                 // _Fail the WebSocket Connection_
0507                                 code_ = close_code::bad_payload;
0508                                 result_ = error::bad_frame_payload;
0509                                 goto close;
0510                             }
0511                         }
0512                         bytes_written_ += bytes_transferred;
0513                         impl.rd_size += bytes_transferred;
0514                     }
0515                 }
0516                 BOOST_ASSERT( ! impl.rd_done );
0517                 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0518                     impl.rd_done = true;
0519             }
0520             else
0521             {
0522                 // Read compressed message frame payload:
0523                 // inflate even if rd_fh.len == 0, otherwise we
0524                 // never emit the end-of-stream deflate block.
0525                 while(buffer_bytes(cb_) > 0)
0526                 {
0527                     if( impl.rd_remain > 0 &&
0528                         impl.rd_buf.size() == 0 &&
0529                         ! did_read_)
0530                     {
0531                         // read new
0532                         BOOST_ASIO_CORO_YIELD
0533                         {
0534                             BOOST_ASIO_HANDLER_LOCATION((
0535                                 __FILE__, __LINE__,
0536                                 "websocket::async_read_some"));
0537 
0538                             impl.stream().async_read_some(
0539                                 impl.rd_buf.prepare(read_size(
0540                                     impl.rd_buf, impl.rd_buf.max_size())),
0541                                         std::move(*this));
0542                         }
0543                         if(impl.check_stop_now(ec))
0544                             goto upcall;
0545                         impl.reset_idle();
0546                         BOOST_ASSERT(bytes_transferred > 0);
0547                         impl.rd_buf.commit(bytes_transferred);
0548                         if(impl.rd_fh.mask)
0549                             detail::mask_inplace(
0550                                 buffers_prefix(clamp(impl.rd_remain),
0551                                     impl.rd_buf.data()), impl.rd_key);
0552                         did_read_ = true;
0553                     }
0554                     zlib::z_params zs;
0555                     {
0556                         auto const out = buffers_front(cb_);
0557                         zs.next_out = out.data();
0558                         zs.avail_out = out.size();
0559                         BOOST_ASSERT(zs.avail_out > 0);
0560                     }
0561                     // boolean to track the end of the message.
0562                     bool fin = false;
0563                     if(impl.rd_remain > 0)
0564                     {
0565                         if(impl.rd_buf.size() > 0)
0566                         {
0567                             // use what's there
0568                             auto const in = buffers_prefix(
0569                                 clamp(impl.rd_remain), buffers_front(
0570                                     impl.rd_buf.data()));
0571                             zs.avail_in = in.size();
0572                             zs.next_in = in.data();
0573                         }
0574                         else
0575                         {
0576                             break;
0577                         }
0578                     }
0579                     else if(impl.rd_fh.fin)
0580                     {
0581                         // append the empty block codes
0582                         static std::uint8_t constexpr
0583                             empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0584                         zs.next_in = empty_block;
0585                         zs.avail_in = sizeof(empty_block);
0586                         fin = true;
0587                     }
0588                     else
0589                     {
0590                         break;
0591                     }
0592                     impl.inflate(zs, zlib::Flush::sync, ec);
0593                     if(impl.check_stop_now(ec))
0594                         goto upcall;
0595                     if(fin && zs.total_out == 0) {
0596                         impl.do_context_takeover_read(impl.role);
0597                         impl.rd_done = true;
0598                         break;
0599                     }
0600                     if(impl.rd_msg_max && beast::detail::sum_exceeds(
0601                         impl.rd_size, zs.total_out, impl.rd_msg_max))
0602                     {
0603                         // _Fail the WebSocket Connection_
0604                         code_ = close_code::too_big;
0605                         result_ = error::message_too_big;
0606                         goto close;
0607                     }
0608                     cb_.consume(zs.total_out);
0609                     impl.rd_size += zs.total_out;
0610                     if (! fin) {
0611                         impl.rd_remain -= zs.total_in;
0612                         impl.rd_buf.consume(zs.total_in);
0613                     }
0614                     bytes_written_ += zs.total_out;
0615                 }
0616                 if(impl.rd_op == detail::opcode::text)
0617                 {
0618                     // check utf8
0619                     if(! impl.rd_utf8.write(
0620                         buffers_prefix(bytes_written_, bs_)) || (
0621                             impl.rd_done && ! impl.rd_utf8.finish()))
0622                     {
0623                         // _Fail the WebSocket Connection_
0624                         code_ = close_code::bad_payload;
0625                         result_ = error::bad_frame_payload;
0626                         goto close;
0627                     }
0628                 }
0629             }
0630             goto upcall;
0631 
0632         close:
0633             // Acquire the write lock
0634             if(! impl.wr_block.try_lock(this))
0635             {
0636                 BOOST_ASIO_CORO_YIELD
0637                 {
0638                     BOOST_ASIO_HANDLER_LOCATION((
0639                         __FILE__, __LINE__,
0640                         "websocket::async_read_some"));
0641 
0642                     impl.op_rd.emplace(std::move(*this));
0643                 }
0644                 if (ec)
0645                     return this->complete(cont, ec, bytes_written_);
0646 
0647                 impl.wr_block.lock(this);
0648                 BOOST_ASIO_CORO_YIELD
0649                 {
0650                     BOOST_ASIO_HANDLER_LOCATION((
0651                         __FILE__, __LINE__,
0652                         "websocket::async_read_some"));
0653 
0654                     const auto ex = this->get_immediate_executor();
0655                     net::dispatch(ex, std::move(*this));
0656                 }
0657                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0658                 if(impl.check_stop_now(ec))
0659                     goto upcall;
0660             }
0661 
0662             impl.change_status(status::closing);
0663 
0664             if(! impl.wr_close)
0665             {
0666                 impl.wr_close = true;
0667 
0668                 // Serialize close frame
0669                 impl.rd_fb.clear();
0670                 impl.template write_close<
0671                     flat_static_buffer_base>(
0672                         impl.rd_fb, code_);
0673 
0674                 // Send close frame
0675                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0676                 BOOST_ASIO_CORO_YIELD
0677                 {
0678                     BOOST_ASIO_HANDLER_LOCATION((
0679                         __FILE__, __LINE__,
0680                         "websocket::async_read_some"));
0681 
0682                     net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0683                         beast::detail::bind_continuation(std::move(*this)));
0684                 }
0685                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0686                 if(impl.check_stop_now(ec))
0687                     goto upcall;
0688             }
0689 
0690             // Teardown
0691             using beast::websocket::async_teardown;
0692             BOOST_ASSERT(impl.wr_block.is_locked(this));
0693             BOOST_ASIO_CORO_YIELD
0694             {
0695                 BOOST_ASIO_HANDLER_LOCATION((
0696                     __FILE__, __LINE__,
0697                     "websocket::async_read_some"));
0698 
0699                 async_teardown(impl.role, impl.stream(),
0700                     beast::detail::bind_continuation(std::move(*this)));
0701             }
0702             BOOST_ASSERT(impl.wr_block.is_locked(this));
0703             if(ec == net::error::eof)
0704             {
0705                 // Rationale:
0706                 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
0707                 ec = {};
0708             }
0709             if(! ec)
0710             {
0711                 BOOST_BEAST_ASSIGN_EC(ec, result_);
0712             }
0713             if(ec && ec != error::closed)
0714                 impl.change_status(status::failed);
0715             else
0716                 impl.change_status(status::closed);
0717             impl.close();
0718 
0719         upcall:
0720             impl.rd_block.try_unlock(this);
0721             impl.op_r_close.maybe_invoke();
0722             if(impl.wr_block.try_unlock(this))
0723                 impl.op_close.maybe_invoke()
0724                     || impl.op_idle_ping.maybe_invoke()
0725                     || impl.op_ping.maybe_invoke()
0726                     || impl.op_wr.maybe_invoke();
0727             this->complete(cont, ec, bytes_written_);
0728         }
0729     }
0730 };
0731 
0732 //------------------------------------------------------------------------------
0733 
/*  Read message data into a dynamic buffer.

    Each pass prepares writable storage in the dynamic buffer,
    launches a nested read_some_op on it, then commits the bytes
    that op reported. The loop stops on the first error, after a
    single pass when `some_` is true, or once `impl.rd_done` is set.
    The handler receives the total number of bytes committed.
*/
0734 template<class NextLayer, bool deflateSupported>
0735 template<class Handler,  class DynamicBuffer>
0736 class stream<NextLayer, deflateSupported>::read_op
0737     : public beast::async_base<
0738         Handler, beast::executor_type<stream>>
0739     , public asio::coroutine
0740 {
         // Weak reference to the stream implementation,
         // locked again on every invocation.
0741     boost::weak_ptr<impl_type> wp_;
         // The caller's dynamic buffer, held by reference.
0742     DynamicBuffer& b_;
         // Upper bound on the size prepared per pass.
0743     std::size_t limit_;
         // Running total of bytes committed to b_.
0744     std::size_t bytes_written_ = 0;
         // When true, complete after a single pass.
0745     bool some_;
0746 
0747 public:
         // Stores the arguments and immediately starts the operation
         // by invoking the call operator with cont = false.
         // A `limit` of zero selects the largest std::size_t value.
0748     template<class Handler_>
0749     read_op(
0750         Handler_&& h,
0751         boost::shared_ptr<impl_type> const& sp,
0752         DynamicBuffer& b,
0753         std::size_t limit,
0754         bool some)
0755         : async_base<Handler,
0756             beast::executor_type<stream>>(
0757                 std::forward<Handler_>(h),
0758                     sp->stream().get_executor())
0759         , wp_(sp)
0760         , b_(b)
0761         , limit_(limit ? limit : (
0762             std::numeric_limits<std::size_t>::max)())
0763         , some_(some)
0764     {
0765         (*this)({}, 0, false);
0766     }
0767 
         // Coroutine body. Re-entered as the completion handler of the
         // nested read_some_op, with that op's error code and byte count.
0768     void operator()(
0769         error_code ec = {},
0770         std::size_t bytes_transferred = 0,
0771         bool cont = true)
0772     {
0773         using beast::detail::clamp;
0774         auto sp = wp_.lock();
0775         if(! sp)
0776         {
                 // The implementation no longer exists: complete with
                 // operation_aborted and report zero bytes.
0777             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0778             bytes_written_ = 0;
0779             return this->complete(cont, ec, bytes_written_);
0780         }
0781         auto& impl = *sp;
0782         using mutable_buffers_type = typename
0783             DynamicBuffer::mutable_buffers_type;
0784         BOOST_ASIO_CORO_REENTER(*this)
0785         {
0786             do
0787             {
0788                 // VFALCO TODO use boost::beast::bind_continuation
0789                 BOOST_ASIO_CORO_YIELD
0790                 {
                         // Prepare storage sized by the implementation's
                         // hint, clamped to limit_. error::buffer_overflow
                         // is handed to the helper (presumably the error it
                         // reports when preparation fails; confirm against
                         // dynamic_buffer_prepare).
0791                     auto mb = beast::detail::dynamic_buffer_prepare(b_,
0792                         clamp(impl.read_size_hint_db(b_), limit_),
0793                             ec, error::buffer_overflow);
                         // If check_stop_now reports true, skip the nested
                         // read and complete right away.
0794                     if(impl.check_stop_now(ec))
0795                         goto upcall;
0796 
0797                     BOOST_ASIO_HANDLER_LOCATION((
0798                         __FILE__, __LINE__,
0799                         "websocket::async_read"));
0800 
                         // This operation is moved into the nested
                         // read_some_op as its completion handler.
0801                     read_some_op<read_op, mutable_buffers_type>(
0802                         std::move(*this), sp, *mb);
0803                 }
0804 
                     // Publish the bytes the nested op reported and
                     // add them to the running total, even on error.
0805                 b_.commit(bytes_transferred);
0806                 bytes_written_ += bytes_transferred;
0807                 if(ec)
0808                     goto upcall;
0809             }
                 // Keep going only for a whole-message read
                 // whose message is not yet complete.
0810             while(! some_ && ! impl.rd_done);
0811 
0812         upcall:
0813             this->complete(cont, ec, bytes_written_);
0814         }
0815     }
0816 };
0817 
0818 template<class NextLayer, bool deflateSupported>
0819 struct stream<NextLayer, deflateSupported>::
0820     run_read_some_op
0821 {
0822     boost::shared_ptr<impl_type> const& self;
0823 
0824     using executor_type = typename stream::executor_type;
0825 
0826     executor_type
0827     get_executor() const noexcept
0828     {
0829         return self->stream().get_executor();
0830     }
0831 
0832     template<
0833         class ReadHandler,
0834         class MutableBufferSequence>
0835     void
0836     operator()(
0837         ReadHandler&& h,
0838         MutableBufferSequence const& b)
0839     {
0840         // If you get an error on the following line it means
0841         // that your handler does not meet the documented type
0842         // requirements for the handler.
0843 
0844         static_assert(
0845             beast::detail::is_invocable<ReadHandler,
0846                 void(error_code, std::size_t)>::value,
0847             "ReadHandler type requirements not met");
0848 
0849         read_some_op<
0850             typename std::decay<ReadHandler>::type,
0851             MutableBufferSequence>(
0852                 std::forward<ReadHandler>(h),
0853                 self,
0854                 b);
0855     }
0856 };
0857 
0858 template<class NextLayer, bool deflateSupported>
0859 struct stream<NextLayer, deflateSupported>::
0860     run_read_op
0861 {
0862     boost::shared_ptr<impl_type> const& self;
0863 
0864     using executor_type = typename stream::executor_type;
0865 
0866     executor_type
0867     get_executor() const noexcept
0868     {
0869         return self->stream().get_executor();
0870     }
0871 
0872     template<
0873         class ReadHandler,
0874         class DynamicBuffer>
0875     void
0876     operator()(
0877         ReadHandler&& h,
0878         DynamicBuffer* b,
0879         std::size_t limit,
0880         bool some)
0881     {
0882         // If you get an error on the following line it means
0883         // that your handler does not meet the documented type
0884         // requirements for the handler.
0885 
0886         static_assert(
0887             beast::detail::is_invocable<ReadHandler,
0888                 void(error_code, std::size_t)>::value,
0889             "ReadHandler type requirements not met");
0890 
0891         read_op<
0892             typename std::decay<ReadHandler>::type,
0893             DynamicBuffer>(
0894                 std::forward<ReadHandler>(h),
0895                 self,
0896                 *b,
0897                 limit,
0898                 some);
0899     }
0900 };
0901 
0902 //------------------------------------------------------------------------------
0903 
0904 template<class NextLayer, bool deflateSupported>
0905 template<class DynamicBuffer>
0906 std::size_t
0907 stream<NextLayer, deflateSupported>::
0908 read(DynamicBuffer& buffer)
0909 {
0910     static_assert(is_sync_stream<next_layer_type>::value,
0911         "SyncStream type requirements not met");
0912     static_assert(
0913         net::is_dynamic_buffer<DynamicBuffer>::value,
0914         "DynamicBuffer type requirements not met");
0915     error_code ec;
0916     auto const bytes_written = read(buffer, ec);
0917     if(ec)
0918         BOOST_THROW_EXCEPTION(system_error{ec});
0919     return bytes_written;
0920 }
0921 
0922 template<class NextLayer, bool deflateSupported>
0923 template<class DynamicBuffer>
0924 std::size_t
0925 stream<NextLayer, deflateSupported>::
0926 read(DynamicBuffer& buffer, error_code& ec)
0927 {
0928     static_assert(is_sync_stream<next_layer_type>::value,
0929         "SyncStream type requirements not met");
0930     static_assert(
0931         net::is_dynamic_buffer<DynamicBuffer>::value,
0932         "DynamicBuffer type requirements not met");
0933     std::size_t bytes_written = 0;
0934     do
0935     {
0936         bytes_written += read_some(buffer, 0, ec);
0937         if(ec)
0938             return bytes_written;
0939     }
0940     while(! is_message_done());
0941     return bytes_written;
0942 }
0943 
0944 template<class NextLayer, bool deflateSupported>
0945 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0946 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0947 stream<NextLayer, deflateSupported>::
0948 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0949 {
0950     static_assert(is_async_stream<next_layer_type>::value,
0951         "AsyncStream type requirements not met");
0952     static_assert(
0953         net::is_dynamic_buffer<DynamicBuffer>::value,
0954         "DynamicBuffer type requirements not met");
0955     return net::async_initiate<
0956         ReadHandler,
0957         void(error_code, std::size_t)>(
0958             run_read_op{impl_},
0959             handler,
0960             &buffer,
0961             0,
0962             false);
0963 }
0964 
0965 //------------------------------------------------------------------------------
0966 
0967 template<class NextLayer, bool deflateSupported>
0968 template<class DynamicBuffer>
0969 std::size_t
0970 stream<NextLayer, deflateSupported>::
0971 read_some(
0972     DynamicBuffer& buffer,
0973     std::size_t limit)
0974 {
0975     static_assert(is_sync_stream<next_layer_type>::value,
0976         "SyncStream type requirements not met");
0977     static_assert(
0978         net::is_dynamic_buffer<DynamicBuffer>::value,
0979         "DynamicBuffer type requirements not met");
0980     error_code ec;
0981     auto const bytes_written =
0982         read_some(buffer, limit, ec);
0983     if(ec)
0984         BOOST_THROW_EXCEPTION(system_error{ec});
0985     return bytes_written;
0986 }
0987 
0988 template<class NextLayer, bool deflateSupported>
0989 template<class DynamicBuffer>
0990 std::size_t
0991 stream<NextLayer, deflateSupported>::
0992 read_some(
0993     DynamicBuffer& buffer,
0994     std::size_t limit,
0995     error_code& ec)
0996 {
0997     static_assert(is_sync_stream<next_layer_type>::value,
0998         "SyncStream type requirements not met");
0999     static_assert(
1000         net::is_dynamic_buffer<DynamicBuffer>::value,
1001         "DynamicBuffer type requirements not met");
1002     using beast::detail::clamp;
1003     if(! limit)
1004         limit = (std::numeric_limits<std::size_t>::max)();
1005     auto const size =
1006         clamp(impl_->read_size_hint_db(buffer), limit);
1007     BOOST_ASSERT(size > 0);
1008     auto mb = beast::detail::dynamic_buffer_prepare(
1009         buffer, size, ec, error::buffer_overflow);
1010     if(impl_->check_stop_now(ec))
1011         return 0;
1012     auto const bytes_written = read_some(*mb, ec);
1013     buffer.commit(bytes_written);
1014     return bytes_written;
1015 }
1016 
1017 template<class NextLayer, bool deflateSupported>
1018 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1019 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1020 stream<NextLayer, deflateSupported>::
1021 async_read_some(
1022     DynamicBuffer& buffer,
1023     std::size_t limit,
1024     ReadHandler&& handler)
1025 {
1026     static_assert(is_async_stream<next_layer_type>::value,
1027         "AsyncStream type requirements not met");
1028     static_assert(
1029         net::is_dynamic_buffer<DynamicBuffer>::value,
1030         "DynamicBuffer type requirements not met");
1031     return net::async_initiate<
1032         ReadHandler,
1033         void(error_code, std::size_t)>(
1034             run_read_op{impl_},
1035             handler,
1036             &buffer,
1037             limit,
1038             true);
1039 }
1040 
1041 //------------------------------------------------------------------------------
1042 
1043 template<class NextLayer, bool deflateSupported>
1044 template<class MutableBufferSequence>
1045 std::size_t
1046 stream<NextLayer, deflateSupported>::
1047 read_some(
1048     MutableBufferSequence const& buffers)
1049 {
1050     static_assert(is_sync_stream<next_layer_type>::value,
1051         "SyncStream type requirements not met");
1052     static_assert(net::is_mutable_buffer_sequence<
1053             MutableBufferSequence>::value,
1054         "MutableBufferSequence type requirements not met");
1055     error_code ec;
1056     auto const bytes_written = read_some(buffers, ec);
1057     if(ec)
1058         BOOST_THROW_EXCEPTION(system_error{ec});
1059     return bytes_written;
1060 }
1061 
/*  Synchronously read some message data into a mutable buffer sequence.

    Parses a frame header when one is due, handles any control
    frames it encounters (ping, pong, close), and then transfers
    message payload into `buffers`, either directly or through the
    inflater when the message is compressed.

    buffers   Destination for the message payload.
    ec        Cleared on entry; set when the stream is stopped, a
              read or write on the next layer fails, or the
              connection is failed through do_fail.

    Returns the number of payload bytes placed into `buffers`.
*/
1062 template<class NextLayer, bool deflateSupported>
1063 template<class MutableBufferSequence>
1064 std::size_t
1065 stream<NextLayer, deflateSupported>::
1066 read_some(
1067     MutableBufferSequence const& buffers,
1068     error_code& ec)
1069 {
1070     static_assert(is_sync_stream<next_layer_type>::value,
1071         "SyncStream type requirements not met");
1072     static_assert(net::is_mutable_buffer_sequence<
1073             MutableBufferSequence>::value,
1074         "MutableBufferSequence type requirements not met");
1075     using beast::detail::clamp;
1076     auto& impl = *impl_;
1077     close_code code{};
1078     std::size_t bytes_written = 0;
1079     ec = {};
1080     // Make sure the stream is open
1081     if(impl.check_stop_now(ec))
1082         return bytes_written;
     // Jumped back to after a ping, a pong, or an
     // empty non-final frame has been processed.
1083 loop:
1084     // See if we need to read a frame header. This
1085     // condition is structured to give the decompressor
1086     // a chance to emit the final empty deflate block
1087     //
1088     if(impl.rd_remain == 0 && (
1089         ! impl.rd_fh.fin || impl.rd_done))
1090     {
1091         // Read frame header
1092         error_code result;
             // Keep reading from the next layer into rd_buf until
             // parse_fh succeeds or reports an error in `result`.
1093         while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1094         {
1095             if(result)
1096             {
1097                 // _Fail the WebSocket Connection_
1098                 if(result == error::message_too_big)
1099                     code = close_code::too_big;
1100                 else
1101                     code = close_code::protocol_error;
1102                 do_fail(code, result, ec);
1103                 return bytes_written;
1104             }
1105             auto const bytes_transferred =
1106                 impl.stream().read_some(
1107                     impl.rd_buf.prepare(read_size(
1108                         impl.rd_buf, impl.rd_buf.max_size())),
1109                     ec);
1110             impl.rd_buf.commit(bytes_transferred);
1111             if(impl.check_stop_now(ec))
1112                 return bytes_written;
1113         }
1114         // Immediately apply the mask to the portion
1115         // of the buffer holding payload data.
1116         if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1117             detail::mask_inplace(buffers_prefix(
1118                 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1119                     impl.rd_key);
1120         if(detail::is_control(impl.rd_fh.op))
1121         {
1122             // Get control frame payload
1123             auto const b = buffers_prefix(
1124                 clamp(impl.rd_fh.len), impl.rd_buf.data());
1125             auto const len = buffer_bytes(b);
1126             BOOST_ASSERT(len == impl.rd_fh.len);
1127 
1128             // Clear this otherwise the next
1129             // frame will be considered final.
1130             impl.rd_fh.fin = false;
1131 
1132             // Handle ping frame
1133             if(impl.rd_fh.op == detail::opcode::ping)
1134             {
1135                 ping_data payload;
1136                 detail::read_ping(payload, b);
1137                 impl.rd_buf.consume(len);
1138                 if(impl.wr_close)
1139                 {
1140                     // Ignore ping when closing
1141                     goto loop;
1142                 }
1143                 if(impl.ctrl_cb)
1144                     impl.ctrl_cb(frame_type::ping, to_string_view(payload));
                     // Reply with a pong carrying the same payload,
                     // written synchronously to the next layer.
1145                 detail::frame_buffer fb;
1146                 impl.template write_ping<flat_static_buffer_base>(fb,
1147                     detail::opcode::pong, payload);
1148                 net::write(impl.stream(), fb.data(), ec);
1149                 if(impl.check_stop_now(ec))
1150                     return bytes_written;
1151                 goto loop;
1152             }
1153             // Handle pong frame
1154             if(impl.rd_fh.op == detail::opcode::pong)
1155             {
1156                 ping_data payload;
1157                 detail::read_ping(payload, b);
1158                 impl.rd_buf.consume(len);
1159                 if(impl.ctrl_cb)
1160                     impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1161                 goto loop;
1162             }
1163             // Handle close frame
1164             BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1165             {
1166                 BOOST_ASSERT(! impl.rd_close);
1167                 impl.rd_close = true;
1168                 close_reason cr;
1169                 detail::read_close(cr, b, result);
1170                 if(result)
1171                 {
1172                     // _Fail the WebSocket Connection_
1173                     do_fail(close_code::protocol_error,
1174                         result, ec);
1175                     return bytes_written;
1176                 }
                     // Remember the peer's close reason before
                     // notifying the control callback.
1177                 impl.cr = cr;
1178                 impl.rd_buf.consume(len);
1179                 if(impl.ctrl_cb)
1180                     impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1181                 BOOST_ASSERT(! impl.wr_close);
1182                 // _Start the WebSocket Closing Handshake_
                     // A close frame without a code is answered with
                     // close_code::normal; the call passes error::closed.
1183                 do_fail(
1184                     cr.code == close_code::none ?
1185                         close_code::normal :
1186                         static_cast<close_code>(cr.code),
1187                     error::closed, ec);
1188                 return bytes_written;
1189             }
1190         }
1191         if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1192         {
1193             // Empty non-final frame
1194             goto loop;
1195         }
1196         impl.rd_done = false;
1197     }
1198     else
1199     {
1200         ec = {};
1201     }
1202     if(! impl.rd_deflated())
1203     {
1204         if(impl.rd_remain > 0)
1205         {
                 // Only pre-fill rd_buf when it is empty and its capacity
                 // exceeds what this call could deliver to the caller.
1206             if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1207                 (std::min)(clamp(impl.rd_remain),
1208                     buffer_bytes(buffers)))
1209             {
1210                 // Fill the read buffer first, otherwise we
1211                 // get fewer bytes at the cost of one I/O.
1212                 impl.rd_buf.commit(impl.stream().read_some(
1213                     impl.rd_buf.prepare(read_size(impl.rd_buf,
1214                         impl.rd_buf.max_size())), ec));
1215                 if(impl.check_stop_now(ec))
1216                     return bytes_written;
1217                 if(impl.rd_fh.mask)
1218                     detail::mask_inplace(
1219                         buffers_prefix(clamp(impl.rd_remain),
1220                             impl.rd_buf.data()), impl.rd_key);
1221             }
1222             if(impl.rd_buf.size() > 0)
1223             {
1224                 // Copy from the read buffer.
1225                 // The mask was already applied.
1226                 auto const bytes_transferred = net::buffer_copy(
1227                     buffers, impl.rd_buf.data(),
1228                         clamp(impl.rd_remain));
1229                 auto const mb = buffers_prefix(
1230                     bytes_transferred, buffers);
1231                 impl.rd_remain -= bytes_transferred;
1232                 if(impl.rd_op == detail::opcode::text)
1233                 {
                         // Validate the new bytes as UTF-8, and require a
                         // complete sequence at the end of a final frame.
1234                     if(! impl.rd_utf8.write(mb) ||
1235                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1236                             ! impl.rd_utf8.finish()))
1237                     {
1238                         // _Fail the WebSocket Connection_
1239                         do_fail(close_code::bad_payload,
1240                             error::bad_frame_payload, ec);
1241                         return bytes_written;
1242                     }
1243                 }
1244                 bytes_written += bytes_transferred;
1245                 impl.rd_size += bytes_transferred;
1246                 impl.rd_buf.consume(bytes_transferred);
1247             }
1248             else
1249             {
1250                 // Read into caller's buffer
1251                 BOOST_ASSERT(impl.rd_remain > 0);
1252                 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1253                 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1254                     clamp(impl.rd_remain), buffers)) > 0);
1255                 auto const bytes_transferred =
1256                     impl.stream().read_some(buffers_prefix(
1257                         clamp(impl.rd_remain), buffers), ec);
1258                 // VFALCO What if some bytes were written?
1259                 if(impl.check_stop_now(ec))
1260                     return bytes_written;
1261                 BOOST_ASSERT(bytes_transferred > 0);
1262                 auto const mb = buffers_prefix(
1263                     bytes_transferred, buffers);
1264                 impl.rd_remain -= bytes_transferred;
                     // These bytes bypassed rd_buf, so the mask
                     // is applied here in the caller's buffer.
1265                 if(impl.rd_fh.mask)
1266                     detail::mask_inplace(mb, impl.rd_key);
1267                 if(impl.rd_op == detail::opcode::text)
1268                 {
1269                     if(! impl.rd_utf8.write(mb) ||
1270                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1271                             ! impl.rd_utf8.finish()))
1272                     {
1273                         // _Fail the WebSocket Connection_
1274                         do_fail(close_code::bad_payload,
1275                             error::bad_frame_payload, ec);
1276                         return bytes_written;
1277                     }
1278                 }
1279                 bytes_written += bytes_transferred;
1280                 impl.rd_size += bytes_transferred;
1281             }
1282         }
1283         BOOST_ASSERT( ! impl.rd_done );
             // The message is complete once the final
             // frame's payload has been fully delivered.
1284         if( impl.rd_remain == 0 && impl.rd_fh.fin )
1285             impl.rd_done = true;
1286     }
1287     else
1288     {
1289         // Read compressed message frame payload:
1290         // inflate even if rd_fh.len == 0, otherwise we
1291         // never emit the end-of-stream deflate block.
1292         //
             // Set after the one read from the next layer this call
             // performs; a second need for input breaks the loop.
1293         bool did_read = false;
1294         buffers_suffix<MutableBufferSequence> cb(buffers);
             // Each pass inflates into the first remaining
             // buffer of the caller's sequence.
1295         while(buffer_bytes(cb) > 0)
1296         {
1297             zlib::z_params zs;
1298             {
1299                 auto const out = beast::buffers_front(cb);
1300                 zs.next_out = out.data();
1301                 zs.avail_out = out.size();
1302                 BOOST_ASSERT(zs.avail_out > 0);
1303             }
1304             // boolean to track the end of the message.
1305             bool fin = false;
1306             if(impl.rd_remain > 0)
1307             {
1308                 if(impl.rd_buf.size() > 0)
1309                 {
1310                     // use what's there
1311                     auto const in = buffers_prefix(
1312                         clamp(impl.rd_remain), beast::buffers_front(
1313                             impl.rd_buf.data()));
1314                     zs.avail_in = in.size();
1315                     zs.next_in = in.data();
1316                 }
1317                 else if(! did_read)
1318                 {
1319                     // read new
1320                     auto const bytes_transferred =
1321                         impl.stream().read_some(
1322                             impl.rd_buf.prepare(read_size(
1323                                 impl.rd_buf, impl.rd_buf.max_size())),
1324                             ec);
1325                     if(impl.check_stop_now(ec))
1326                         return bytes_written;
1327                     BOOST_ASSERT(bytes_transferred > 0);
1328                     impl.rd_buf.commit(bytes_transferred);
1329                     if(impl.rd_fh.mask)
1330                         detail::mask_inplace(
1331                             buffers_prefix(clamp(impl.rd_remain),
1332                                 impl.rd_buf.data()), impl.rd_key);
1333                     auto const in = buffers_prefix(
1334                         clamp(impl.rd_remain), buffers_front(
1335                             impl.rd_buf.data()));
1336                     zs.avail_in = in.size();
1337                     zs.next_in = in.data();
1338                     did_read = true;
1339                 }
1340                 else
1341                 {
1342                     break;
1343                 }
1344             }
1345             else if(impl.rd_fh.fin)
1346             {
1347                 // append the empty block codes
1348                 static std::uint8_t constexpr
1349                     empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1350                 zs.next_in = empty_block;
1351                 zs.avail_in = sizeof(empty_block);
1352                 fin = true;
1353             }
1354             else
1355             {
1356                 break;
1357             }
1358             impl.inflate(zs, zlib::Flush::sync, ec);
1359             if(impl.check_stop_now(ec))
1360                 return bytes_written;
                 // Feeding the trailing empty block produced no
                 // output: mark the message as done and stop.
1361             if (fin && zs.total_out == 0) {
1362                 impl.do_context_takeover_read(impl.role);
1363                 impl.rd_done = true;
1364                 break;
1365             }
                 // A zero rd_msg_max skips the message size check.
1366             if(impl.rd_msg_max && beast::detail::sum_exceeds(
1367                 impl.rd_size, zs.total_out, impl.rd_msg_max))
1368             {
1369                 do_fail(close_code::too_big,
1370                     error::message_too_big, ec);
1371                 return bytes_written;
1372             }
1373             cb.consume(zs.total_out);
1374             impl.rd_size += zs.total_out;
                 // With fin set the input was the static empty_block
                 // rather than payload, so rd_remain and rd_buf
                 // are left untouched.
1375             if (! fin) {
1376                 impl.rd_remain -= zs.total_in;
1377                 impl.rd_buf.consume(zs.total_in);
1378             }
1379             bytes_written += zs.total_out;
1380         }
1381         if(impl.rd_op == detail::opcode::text)
1382         {
1383             // check utf8
                 // Validation covers everything inflated
                 // into the caller's buffers by this call.
1384             if(! impl.rd_utf8.write(beast::buffers_prefix(
1385                 bytes_written, buffers)) || (
1386                     impl.rd_done && ! impl.rd_utf8.finish()))
1387             {
1388                 // _Fail the WebSocket Connection_
1389                 do_fail(close_code::bad_payload,
1390                     error::bad_frame_payload, ec);
1391                 return bytes_written;
1392             }
1393         }
1394     }
1395     return bytes_written;
1396 }
1397 
1398 template<class NextLayer, bool deflateSupported>
1399 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1400 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1401 stream<NextLayer, deflateSupported>::
1402 async_read_some(
1403     MutableBufferSequence const& buffers,
1404     ReadHandler&& handler)
1405 {
1406     static_assert(is_async_stream<next_layer_type>::value,
1407         "AsyncStream type requirements not met");
1408     static_assert(net::is_mutable_buffer_sequence<
1409             MutableBufferSequence>::value,
1410         "MutableBufferSequence type requirements not met");
1411     return net::async_initiate<
1412         ReadHandler,
1413         void(error_code, std::size_t)>(
1414             run_read_some_op{impl_},
1415             handler,
1416             buffers);
1417 }
1418 
1419 } // websocket
1420 } // beast
1421 } // boost
1422 
1423 #endif