Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-18 08:36:09

0001 //
0002 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 // Official repository: https://github.com/boostorg/beast
0008 //
0009 
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012 
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/buffers_prefix.hpp>
0019 #include <boost/beast/core/buffers_suffix.hpp>
0020 #include <boost/beast/core/flat_static_buffer.hpp>
0021 #include <boost/beast/core/read_size.hpp>
0022 #include <boost/beast/core/stream_traits.hpp>
0023 #include <boost/beast/core/detail/bind_continuation.hpp>
0024 #include <boost/beast/core/detail/buffer.hpp>
0025 #include <boost/beast/core/detail/clamp.hpp>
0026 #include <boost/beast/core/detail/config.hpp>
0027 #include <boost/asio/coroutine.hpp>
0028 #include <boost/assert.hpp>
0029 #include <boost/config.hpp>
0030 #include <boost/optional.hpp>
0031 #include <boost/throw_exception.hpp>
0032 #include <algorithm>
0033 #include <limits>
0034 #include <memory>
0035 
0036 namespace boost {
0037 namespace beast {
0038 namespace websocket {
0039 
0040 /*  Read some message data into a buffer sequence.
0041 
0042     Also reads and handles control frames.
0043 */
0044 template<class NextLayer, bool deflateSupported>
0045 template<class Handler, class MutableBufferSequence>
0046 class stream<NextLayer, deflateSupported>::read_some_op
0047     : public beast::async_base<
0048         Handler, beast::executor_type<stream>>
0049     , public asio::coroutine
0050 {
0051     boost::weak_ptr<impl_type> wp_; // stream impl; locked on every operator() entry, the op completes with operation_aborted if it has expired
0052     MutableBufferSequence bs_; // caller's buffers exactly as passed in; the deflate path UTF-8 checks the first bytes_written_ bytes of it
0053     buffers_suffix<MutableBufferSequence> cb_; // view over the same buffers that payload is copied/read/inflated into; consumed by the inflated byte count
0054     std::size_t bytes_written_ = 0; // running count of payload bytes placed in the caller's buffers; passed to complete()
0055     error_code result_; // error recorded on the fail/close paths; assigned to ec after teardown when teardown reported no error (eof counts as none)
0056     close_code code_; // close code handed to write_close() when this op serializes the close frame
0057     bool did_read_ = false; // set once the deflate loop has read from the stream, so the loop does not issue a second read
0058 
0059 public:
0060     static constexpr int id = 1; // for soft_mutex
0061 
0062     template<class Handler_>
         // Construct the operation and start it immediately.
         //
         //   h   completion handler; forwarded to async_base together with
         //       the executor obtained from sp->stream().get_executor()
         //   sp  owning pointer to the stream impl; the operation keeps only
         //       a weak reference to it (wp_)
         //   bs  caller's buffers; stored both unmodified (bs_) and wrapped
         //       in a buffers_suffix (cb_)
0063     read_some_op(
0064         Handler_&& h,
0065         boost::shared_ptr<impl_type> const& sp,
0066         MutableBufferSequence const& bs)
0067         : async_base<
0068             Handler, beast::executor_type<stream>>(
0069                 std::forward<Handler_>(h),
0070                     sp->stream().get_executor())
0071         , wp_(sp)
0072         , bs_(bs)
0073         , cb_(bs)
0074         , code_(close_code::none)
0075     {
             // Initial invocation: no error, zero bytes transferred, and
             // cont == false (operator() later hands cont to complete()).
0076         (*this)({}, 0, false);
0077     }
0078 
0079     void operator()(
0080         error_code ec = {},
0081         std::size_t bytes_transferred = 0,
0082         bool cont = true)
0083     {
0084         using beast::detail::clamp;
0085         auto sp = wp_.lock();
0086         if(! sp)
0087         {
0088             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0089             bytes_written_ = 0;
0090             return this->complete(cont, ec, bytes_written_);
0091         }
0092         auto& impl = *sp;
0093         BOOST_ASIO_CORO_REENTER(*this)
0094         {
0095             impl.update_timer(this->get_executor());
0096 
0097         acquire_read_lock:
0098             // Acquire the read lock
0099             if(! impl.rd_block.try_lock(this))
0100             {
0101             do_suspend:
0102                 BOOST_ASIO_CORO_YIELD
0103                 {
0104                     BOOST_ASIO_HANDLER_LOCATION((
0105                         __FILE__, __LINE__,
0106                         "websocket::async_read_some"));
0107 
0108                     this->set_allowed_cancellation(net::cancellation_type::all);
0109                     impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0110                 }
0111                 if (ec)
0112                     return this->complete(cont, ec, bytes_written_);
0113 
0114                 this->set_allowed_cancellation(net::cancellation_type::terminal);
0115 
0116                 impl.rd_block.lock(this);
0117                 BOOST_ASIO_CORO_YIELD
0118                 {
0119                     BOOST_ASIO_HANDLER_LOCATION((
0120                         __FILE__, __LINE__,
0121                         "websocket::async_read_some"));
0122 
0123                     const auto ex = this->get_immediate_executor();
0124                     net::dispatch(ex, std::move(*this));
0125                 }
0126                 BOOST_ASSERT(impl.rd_block.is_locked(this));
0127 
0128                 BOOST_ASSERT(!ec);
0129                 if(impl.check_stop_now(ec))
0130                 {
0131                     // Issue 2264 - There is no guarantee that the next
0132                     // error will be operation_aborted.
0133                     // The error could be a result of the peer resetting the 
0134                     // connection
0135                     // BOOST_ASSERT(ec == net::error::operation_aborted);
0136                     goto upcall;
0137                 }
0138                 // VFALCO Should never get here
0139 
0140                 // The only way to get read blocked is if
0141                 // a `close_op` wrote a close frame
0142                 BOOST_ASSERT(impl.wr_close);
0143                 BOOST_ASSERT(impl.status_ != status::open);
0144                 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0145                 goto upcall;
0146             }
0147             else
0148             {
0149                 // Make sure the stream is not closed
0150                 if( impl.status_ == status::closed ||
0151                     impl.status_ == status::failed)
0152                 {
0153                     BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0154                     goto upcall;
0155                 }
0156             }
0157 
0158             // if status_ == status::closing, we want to suspend
0159             // the read operation until the close completes,
0160             // then finish the read with operation_aborted.
0161 
0162         loop:
0163             BOOST_ASSERT(impl.rd_block.is_locked(this));
0164             // See if we need to read a frame header. This
0165             // condition is structured to give the decompressor
0166             // a chance to emit the final empty deflate block
0167             //
0168             if(impl.rd_remain == 0 &&
0169                 (! impl.rd_fh.fin || impl.rd_done))
0170             {
0171                 // Read frame header
0172                 while(! impl.parse_fh(
0173                     impl.rd_fh, impl.rd_buf, result_))
0174                 {
0175                     if(result_)
0176                     {
0177                         // _Fail the WebSocket Connection_
0178                         if(result_ == error::message_too_big)
0179                             code_ = close_code::too_big;
0180                         else
0181                             code_ = close_code::protocol_error;
0182                         goto close;
0183                     }
0184                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0185                     BOOST_ASIO_CORO_YIELD
0186                     {
0187                         BOOST_ASIO_HANDLER_LOCATION((
0188                             __FILE__, __LINE__,
0189                             "websocket::async_read_some"));
0190 
0191                         impl.stream().async_read_some(
0192                             impl.rd_buf.prepare(read_size(
0193                                 impl.rd_buf, impl.rd_buf.max_size())),
0194                                     std::move(*this));
0195                     }
0196                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0197                     impl.rd_buf.commit(bytes_transferred);
0198                     if(impl.check_stop_now(ec))
0199                         goto upcall;
0200                     impl.reset_idle();
0201 
0202                     // Allow a close operation
0203                     // to acquire the read block
0204                     impl.rd_block.unlock(this);
0205                     if( impl.op_r_close.maybe_invoke())
0206                     {
0207                         // Suspend
0208                         BOOST_ASSERT(impl.rd_block.is_locked());
0209                         goto do_suspend;
0210                     }
0211                     // Acquire read block
0212                     impl.rd_block.lock(this);
0213                 }
0214                 // Immediately apply the mask to the portion
0215                 // of the buffer holding payload data.
0216                 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0217                     detail::mask_inplace(buffers_prefix(
0218                         clamp(impl.rd_fh.len),
0219                             impl.rd_buf.data()),
0220                                 impl.rd_key);
0221                 if(detail::is_control(impl.rd_fh.op))
0222                 {
0223                     // Clear this otherwise the next
0224                     // frame will be considered final.
0225                     impl.rd_fh.fin = false;
0226 
0227                     // Handle ping frame
0228                     if(impl.rd_fh.op == detail::opcode::ping)
0229                     {
0230                         impl.update_timer(this->get_executor());
0231 
0232                         if(impl.ctrl_cb)
0233                         {
0234                             if(! cont)
0235                             {
0236                                 BOOST_ASIO_CORO_YIELD
0237                                 {
0238                                     BOOST_ASIO_HANDLER_LOCATION((
0239                                         __FILE__, __LINE__,
0240                                         "websocket::async_read_some"));
0241 
0242                                     const auto ex = this->get_immediate_executor();
0243                                     net::dispatch(ex, std::move(*this));
0244                                 }
0245                                 BOOST_ASSERT(cont);
0246                                 // VFALCO call check_stop_now() here?
0247                             }
0248                         }
0249                         {
0250                             auto const b = buffers_prefix(
0251                                 clamp(impl.rd_fh.len),
0252                                     impl.rd_buf.data());
0253                             auto const len = buffer_bytes(b);
0254                             BOOST_ASSERT(len == impl.rd_fh.len);
0255                             ping_data payload;
0256                             detail::read_ping(payload, b);
0257                             impl.rd_buf.consume(len);
0258                             // Ignore ping when closing
0259                             if(impl.status_ == status::closing)
0260                                 goto loop;
0261                             if(impl.ctrl_cb)
0262                                 impl.ctrl_cb(
0263                                     frame_type::ping, to_string_view(payload));
0264                             impl.rd_fb.clear();
0265                             impl.template write_ping<
0266                                 flat_static_buffer_base>(impl.rd_fb,
0267                                     detail::opcode::pong, payload);
0268                         }
0269 
0270                         // Allow a close operation
0271                         // to acquire the read block
0272                         impl.rd_block.unlock(this);
0273                         impl.op_r_close.maybe_invoke();
0274 
0275                         // Acquire the write lock
0276                         if(! impl.wr_block.try_lock(this))
0277                         {
0278                             BOOST_ASIO_CORO_YIELD
0279                             {
0280                                 BOOST_ASIO_HANDLER_LOCATION((
0281                                     __FILE__, __LINE__,
0282                                     "websocket::async_read_some"));
0283 
0284                                 impl.op_rd.emplace(std::move(*this));
0285                             }
0286                             if (ec)
0287                                 return this->complete(cont, ec, bytes_written_);
0288 
0289                             impl.wr_block.lock(this);
0290                             BOOST_ASIO_CORO_YIELD
0291                             {
0292                                 BOOST_ASIO_HANDLER_LOCATION((
0293                                     __FILE__, __LINE__,
0294                                     "websocket::async_read_some"));
0295 
0296                                 const auto ex = this->get_immediate_executor();
0297                                 net::dispatch(ex, std::move(*this));
0298                             }
0299                             BOOST_ASSERT(impl.wr_block.is_locked(this));
0300                             if(impl.check_stop_now(ec))
0301                                 goto upcall;
0302                         }
0303 
0304                         // Send pong
0305                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0306                         BOOST_ASIO_CORO_YIELD
0307                         {
0308                             BOOST_ASIO_HANDLER_LOCATION((
0309                                 __FILE__, __LINE__,
0310                                 "websocket::async_read_some"));
0311 
0312                             net::async_write(
0313                                 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0314                                 beast::detail::bind_continuation(std::move(*this)));
0315                         }
0316                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0317                         if(impl.check_stop_now(ec))
0318                             goto upcall;
0319                         impl.wr_block.unlock(this);
0320                         impl.op_close.maybe_invoke()
0321                             || impl.op_idle_ping.maybe_invoke()
0322                             || impl.op_ping.maybe_invoke()
0323                             || impl.op_wr.maybe_invoke();
0324                         goto acquire_read_lock;
0325                     }
0326 
0327                     // Handle pong frame
0328                     if(impl.rd_fh.op == detail::opcode::pong)
0329                     {
0330                         // Ignore pong when closing
0331                         if(! impl.wr_close && impl.ctrl_cb)
0332                         {
0333                             if(! cont)
0334                             {
0335                                 BOOST_ASIO_CORO_YIELD
0336                                 {
0337                                     BOOST_ASIO_HANDLER_LOCATION((
0338                                         __FILE__, __LINE__,
0339                                         "websocket::async_read_some"));
0340 
0341                                     const auto ex = this->get_immediate_executor();
0342                                     net::dispatch(ex, std::move(*this));
0343                                 }
0344                                 BOOST_ASSERT(cont);
0345                             }
0346                         }
0347                         auto const cb = buffers_prefix(clamp(
0348                             impl.rd_fh.len), impl.rd_buf.data());
0349                         auto const len = buffer_bytes(cb);
0350                         BOOST_ASSERT(len == impl.rd_fh.len);
0351                         ping_data payload;
0352                         detail::read_ping(payload, cb);
0353                         impl.rd_buf.consume(len);
0354                         // Ignore pong when closing
0355                         if(! impl.wr_close && impl.ctrl_cb)
0356                             impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0357                         goto loop;
0358                     }
0359 
0360                     // Handle close frame
0361                     BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0362                     {
0363                         if(impl.ctrl_cb)
0364                         {
0365                             if(! cont)
0366                             {
0367                                 BOOST_ASIO_CORO_YIELD
0368                                 {
0369                                     BOOST_ASIO_HANDLER_LOCATION((
0370                                         __FILE__, __LINE__,
0371                                         "websocket::async_read_some"));
0372 
0373                                     const auto ex = this->get_immediate_executor();
0374                                     net::dispatch(ex, std::move(*this));
0375                                 }
0376                                 BOOST_ASSERT(cont);
0377                             }
0378                         }
0379                         auto const cb = buffers_prefix(clamp(
0380                             impl.rd_fh.len), impl.rd_buf.data());
0381                         auto const len = buffer_bytes(cb);
0382                         BOOST_ASSERT(len == impl.rd_fh.len);
0383                         BOOST_ASSERT(! impl.rd_close);
0384                         impl.rd_close = true;
0385                         close_reason cr;
0386                         detail::read_close(cr, cb, result_);
0387                         if(result_)
0388                         {
0389                             // _Fail the WebSocket Connection_
0390                             code_ = close_code::protocol_error;
0391                             goto close;
0392                         }
0393                         impl.cr = cr;
0394                         impl.rd_buf.consume(len);
0395                         if(impl.ctrl_cb)
0396                             impl.ctrl_cb(frame_type::close,
0397                                 to_string_view(impl.cr.reason));
0398                         // See if we are already closing
0399                         if(impl.status_ == status::closing)
0400                         {
0401                             // _Close the WebSocket Connection_
0402                             BOOST_ASSERT(impl.wr_close);
0403                             code_ = close_code::none;
0404                             result_ = error::closed;
0405                             goto close;
0406                         }
0407                         // _Start the WebSocket Closing Handshake_
0408                         code_ = cr.code == close_code::none ?
0409                             close_code::normal :
0410                             static_cast<close_code>(cr.code);
0411                         result_ = error::closed;
0412                         goto close;
0413                     }
0414                 }
0415                 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0416                 {
0417                     // Empty non-final frame
0418                     goto loop;
0419                 }
0420                 impl.rd_done = false;
0421             }
0422             if(! impl.rd_deflated())
0423             {
0424                 if(impl.rd_remain > 0)
0425                 {
0426                     if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0427                         (std::min)(clamp(impl.rd_remain),
0428                             buffer_bytes(cb_)))
0429                     {
0430                         // Fill the read buffer first, otherwise we
0431                         // get fewer bytes at the cost of one I/O.
0432                         BOOST_ASIO_CORO_YIELD
0433                         {
0434                             BOOST_ASIO_HANDLER_LOCATION((
0435                                 __FILE__, __LINE__,
0436                                 "websocket::async_read_some"));
0437 
0438                             impl.stream().async_read_some(
0439                                 impl.rd_buf.prepare(read_size(
0440                                     impl.rd_buf, impl.rd_buf.max_size())),
0441                                         std::move(*this));
0442                         }
0443                         impl.rd_buf.commit(bytes_transferred);
0444                         if(impl.check_stop_now(ec))
0445                             goto upcall;
0446                         impl.reset_idle();
0447                         if(impl.rd_fh.mask)
0448                             detail::mask_inplace(buffers_prefix(clamp(
0449                                 impl.rd_remain), impl.rd_buf.data()),
0450                                     impl.rd_key);
0451                     }
0452                     if(impl.rd_buf.size() > 0)
0453                     {
0454                         // Copy from the read buffer.
0455                         // The mask was already applied.
0456                         bytes_transferred = net::buffer_copy(cb_,
0457                             impl.rd_buf.data(), clamp(impl.rd_remain));
0458                         auto const mb = buffers_prefix(
0459                             bytes_transferred, cb_);
0460                         impl.rd_remain -= bytes_transferred;
0461                         if(impl.rd_op == detail::opcode::text)
0462                         {
0463                             if(! impl.rd_utf8.write(mb) ||
0464                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0465                                     ! impl.rd_utf8.finish()))
0466                             {
0467                                 // _Fail the WebSocket Connection_
0468                                 code_ = close_code::bad_payload;
0469                                 result_ = error::bad_frame_payload;
0470                                 goto close;
0471                             }
0472                         }
0473                         bytes_written_ += bytes_transferred;
0474                         impl.rd_size += bytes_transferred;
0475                         impl.rd_buf.consume(bytes_transferred);
0476                     }
0477                     else
0478                     {
0479                         // Read into caller's buffer
0480                         BOOST_ASSERT(impl.rd_remain > 0);
0481                         BOOST_ASSERT(buffer_bytes(cb_) > 0);
0482                         BOOST_ASSERT(buffer_bytes(buffers_prefix(
0483                             clamp(impl.rd_remain), cb_)) > 0);
0484                         BOOST_ASIO_CORO_YIELD
0485                         {
0486                             BOOST_ASIO_HANDLER_LOCATION((
0487                                 __FILE__, __LINE__,
0488                                 "websocket::async_read_some"));
0489 
0490                             impl.stream().async_read_some(buffers_prefix(
0491                                 clamp(impl.rd_remain), cb_), std::move(*this));
0492                         }
0493                         if(impl.check_stop_now(ec))
0494                             goto upcall;
0495                         impl.reset_idle();
0496                         BOOST_ASSERT(bytes_transferred > 0);
0497                         auto const mb = buffers_prefix(
0498                             bytes_transferred, cb_);
0499                         impl.rd_remain -= bytes_transferred;
0500                         if(impl.rd_fh.mask)
0501                             detail::mask_inplace(mb, impl.rd_key);
0502                         if(impl.rd_op == detail::opcode::text)
0503                         {
0504                             if(! impl.rd_utf8.write(mb) ||
0505                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0506                                     ! impl.rd_utf8.finish()))
0507                             {
0508                                 // _Fail the WebSocket Connection_
0509                                 code_ = close_code::bad_payload;
0510                                 result_ = error::bad_frame_payload;
0511                                 goto close;
0512                             }
0513                         }
0514                         bytes_written_ += bytes_transferred;
0515                         impl.rd_size += bytes_transferred;
0516                     }
0517                 }
0518                 BOOST_ASSERT( ! impl.rd_done );
0519                 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0520                     impl.rd_done = true;
0521             }
0522             else
0523             {
0524                 // Read compressed message frame payload:
0525                 // inflate even if rd_fh_.len == 0, otherwise we
0526                 // never emit the end-of-stream deflate block.
0527                 while(buffer_bytes(cb_) > 0)
0528                 {
0529                     if( impl.rd_remain > 0 &&
0530                         impl.rd_buf.size() == 0 &&
0531                         ! did_read_)
0532                     {
0533                         // read new
0534                         BOOST_ASIO_CORO_YIELD
0535                         {
0536                             BOOST_ASIO_HANDLER_LOCATION((
0537                                 __FILE__, __LINE__,
0538                                 "websocket::async_read_some"));
0539 
0540                             impl.stream().async_read_some(
0541                                 impl.rd_buf.prepare(read_size(
0542                                     impl.rd_buf, impl.rd_buf.max_size())),
0543                                         std::move(*this));
0544                         }
0545                         if(impl.check_stop_now(ec))
0546                             goto upcall;
0547                         impl.reset_idle();
0548                         BOOST_ASSERT(bytes_transferred > 0);
0549                         impl.rd_buf.commit(bytes_transferred);
0550                         if(impl.rd_fh.mask)
0551                             detail::mask_inplace(
0552                                 buffers_prefix(clamp(impl.rd_remain),
0553                                     impl.rd_buf.data()), impl.rd_key);
0554                         did_read_ = true;
0555                     }
0556                     zlib::z_params zs;
0557                     {
0558                         auto const out = buffers_front(cb_);
0559                         zs.next_out = out.data();
0560                         zs.avail_out = out.size();
0561                         BOOST_ASSERT(zs.avail_out > 0);
0562                     }
0563                     // boolean to track the end of the message.
0564                     bool fin = false;
0565                     if(impl.rd_remain > 0)
0566                     {
0567                         if(impl.rd_buf.size() > 0)
0568                         {
0569                             // use what's there
0570                             auto const in = buffers_prefix(
0571                                 clamp(impl.rd_remain), buffers_front(
0572                                     impl.rd_buf.data()));
0573                             zs.avail_in = in.size();
0574                             zs.next_in = in.data();
0575                         }
0576                         else
0577                         {
0578                             break;
0579                         }
0580                     }
0581                     else if(impl.rd_fh.fin)
0582                     {
0583                         // append the empty block codes
0584                         static std::uint8_t constexpr
0585                             empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0586                         zs.next_in = empty_block;
0587                         zs.avail_in = sizeof(empty_block);
0588                         fin = true;
0589                     }
0590                     else
0591                     {
0592                         break;
0593                     }
0594                     impl.inflate(zs, zlib::Flush::sync, ec);
0595                     if(impl.check_stop_now(ec))
0596                         goto upcall;
0597                     if(fin && zs.total_out == 0) {
0598                         impl.do_context_takeover_read(impl.role);
0599                         impl.rd_done = true;
0600                         break;
0601                     }
0602                     if(impl.rd_msg_max && beast::detail::sum_exceeds(
0603                         impl.rd_size, zs.total_out, impl.rd_msg_max))
0604                     {
0605                         // _Fail the WebSocket Connection_
0606                         code_ = close_code::too_big;
0607                         result_ = error::message_too_big;
0608                         goto close;
0609                     }
0610                     cb_.consume(zs.total_out);
0611                     impl.rd_size += zs.total_out;
0612                     if (! fin) {
0613                         impl.rd_remain -= zs.total_in;
0614                         impl.rd_buf.consume(zs.total_in);
0615                     }
0616                     bytes_written_ += zs.total_out;
0617                 }
0618                 if(impl.rd_op == detail::opcode::text)
0619                 {
0620                     // check utf8
0621                     if(! impl.rd_utf8.write(
0622                         buffers_prefix(bytes_written_, bs_)) || (
0623                             impl.rd_done && ! impl.rd_utf8.finish()))
0624                     {
0625                         // _Fail the WebSocket Connection_
0626                         code_ = close_code::bad_payload;
0627                         result_ = error::bad_frame_payload;
0628                         goto close;
0629                     }
0630                 }
0631             }
0632             goto upcall;
0633 
0634         close:
0635             // Acquire the write lock
0636             if(! impl.wr_block.try_lock(this))
0637             {
0638                 BOOST_ASIO_CORO_YIELD
0639                 {
0640                     BOOST_ASIO_HANDLER_LOCATION((
0641                         __FILE__, __LINE__,
0642                         "websocket::async_read_some"));
0643 
0644                     impl.op_rd.emplace(std::move(*this));
0645                 }
0646                 if (ec)
0647                     return this->complete(cont, ec, bytes_written_);
0648 
0649                 impl.wr_block.lock(this);
0650                 BOOST_ASIO_CORO_YIELD
0651                 {
0652                     BOOST_ASIO_HANDLER_LOCATION((
0653                         __FILE__, __LINE__,
0654                         "websocket::async_read_some"));
0655 
0656                     const auto ex = this->get_immediate_executor();
0657                     net::dispatch(ex, std::move(*this));
0658                 }
0659                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0660                 if(impl.check_stop_now(ec))
0661                     goto upcall;
0662             }
0663 
0664             impl.change_status(status::closing);
0665 
0666             if(! impl.wr_close)
0667             {
0668                 impl.wr_close = true;
0669 
0670                 // Serialize close frame
0671                 impl.rd_fb.clear();
0672                 impl.template write_close<
0673                     flat_static_buffer_base>(
0674                         impl.rd_fb, code_);
0675 
0676                 // Send close frame
0677                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0678                 BOOST_ASIO_CORO_YIELD
0679                 {
0680                     BOOST_ASIO_HANDLER_LOCATION((
0681                         __FILE__, __LINE__,
0682                         "websocket::async_read_some"));
0683 
0684                     net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0685                         beast::detail::bind_continuation(std::move(*this)));
0686                 }
0687                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0688                 if(impl.check_stop_now(ec))
0689                     goto upcall;
0690             }
0691 
0692             // Teardown
0693             using beast::websocket::async_teardown;
0694             BOOST_ASSERT(impl.wr_block.is_locked(this));
0695             BOOST_ASIO_CORO_YIELD
0696             {
0697                 BOOST_ASIO_HANDLER_LOCATION((
0698                     __FILE__, __LINE__,
0699                     "websocket::async_read_some"));
0700 
0701                 async_teardown(impl.role, impl.stream(),
0702                     beast::detail::bind_continuation(std::move(*this)));
0703             }
0704             BOOST_ASSERT(impl.wr_block.is_locked(this));
0705             if(ec == net::error::eof)
0706             {
0707                 // Rationale:
0708                 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
0709                 ec = {};
0710             }
0711             if(! ec)
0712             {
0713                 BOOST_BEAST_ASSIGN_EC(ec, result_);
0714             }
0715             if(ec && ec != error::closed)
0716                 impl.change_status(status::failed);
0717             else
0718                 impl.change_status(status::closed);
0719             impl.close();
0720 
0721         upcall:
0722             impl.rd_block.try_unlock(this);
0723             impl.op_r_close.maybe_invoke();
0724             if(impl.wr_block.try_unlock(this))
0725                 impl.op_close.maybe_invoke()
0726                     || impl.op_idle_ping.maybe_invoke()
0727                     || impl.op_ping.maybe_invoke()
0728                     || impl.op_wr.maybe_invoke();
0729             this->complete(cont, ec, bytes_written_);
0730         }
0731     }
0732 };
0733 
0734 //------------------------------------------------------------------------------
0735 
/*  Read message data into a DynamicBuffer.

    Each pass of the loop prepares space in the dynamic buffer, runs
    one read_some_op into that space and commits what was produced.
    The loop ends after the first pass when `some_` is true, when the
    implementation reports the message as done (`rd_done`), or on the
    first error. The handler receives the total number of bytes
    committed.
*/
0736 template<class NextLayer, bool deflateSupported>
0737 template<class Handler,  class DynamicBuffer>
0738 class stream<NextLayer, deflateSupported>::read_op
0739     : public beast::async_base<
0740         Handler, beast::executor_type<stream>>
0741     , public asio::coroutine
0742 {
         // Weak reference to the stream implementation; it is
         // re-locked on every entry to operator().
0743     boost::weak_ptr<impl_type> wp_;
         // The caller's dynamic buffer, held by reference.
0744     DynamicBuffer& b_;
         // Upper bound passed to clamp() for each prepare. A
         // constructor argument of 0 is stored as the maximum size_t.
0745     std::size_t limit_;
         // Running total of bytes committed to b_.
0746     std::size_t bytes_written_ = 0;
         // When true the read loop executes a single pass.
0747     bool some_;
0748 
0749 public:
0750     template<class Handler_>
0751     read_op(
0752         Handler_&& h,
0753         boost::shared_ptr<impl_type> const& sp,
0754         DynamicBuffer& b,
0755         std::size_t limit,
0756         bool some)
0757         : async_base<Handler,
0758             beast::executor_type<stream>>(
0759                 std::forward<Handler_>(h),
0760                     sp->stream().get_executor())
0761         , wp_(sp)
0762         , b_(b)
0763         , limit_(limit ? limit : (
0764             std::numeric_limits<std::size_t>::max)())
0765         , some_(some)
0766     {
             // Start the operation from the constructor. This first
             // call passes cont == false; later resumptions use the
             // default of true.
0767         (*this)({}, 0, false);
0768     }
0769 
0770     void operator()(
0771         error_code ec = {},
0772         std::size_t bytes_transferred = 0,
0773         bool cont = true)
0774     {
0775         using beast::detail::clamp;
0776         auto sp = wp_.lock();
0777         if(! sp)
0778         {
                 // The implementation no longer exists: complete with
                 // operation_aborted and report zero bytes.
0779             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0780             bytes_written_ = 0;
0781             return this->complete(cont, ec, bytes_written_);
0782         }
0783         auto& impl = *sp;
0784         using mutable_buffers_type = typename
0785             DynamicBuffer::mutable_buffers_type;
0786         BOOST_ASIO_CORO_REENTER(*this)
0787         {
0788             do
0789             {
0790                 // VFALCO TODO use boost::beast::bind_continuation
0791                 BOOST_ASIO_CORO_YIELD
0792                 {
                         // Reserve writable space in b_. The requested
                         // size is the implementation's hint clamped to
                         // limit_.
                         // NOTE(review): presumably ec is set to
                         // error::buffer_overflow when the space cannot
                         // be provided - confirm in dynamic_buffer_prepare.
0793                     auto mb = beast::detail::dynamic_buffer_prepare(b_,
0794                         clamp(impl.read_size_hint_db(b_), limit_),
0795                             ec, error::buffer_overflow);
0796                     if(impl.check_stop_now(ec))
0797                         goto upcall;
0798 
0799                     BOOST_ASIO_HANDLER_LOCATION((
0800                         __FILE__, __LINE__,
0801                         "websocket::async_read"));
0802 
                         // This operation is moved in as the handler of
                         // the inner read_some_op, which reads into the
                         // prepared space.
0803                     read_some_op<read_op, mutable_buffers_type>(
0804                         std::move(*this), sp, *mb);
0805                 }
0806 
                     // Resumed with the inner operation's result: commit
                     // the bytes and add them to the running total. This
                     // happens before the error check, so bytes delivered
                     // together with an error are still counted.
0807                 b_.commit(bytes_transferred);
0808                 bytes_written_ += bytes_transferred;
0809                 if(ec)
0810                     goto upcall;
0811             }
                 // Keep going until the message is done, unless only a
                 // single pass was requested.
0812             while(! some_ && ! impl.rd_done);
0813 
0814         upcall:
0815             this->complete(cont, ec, bytes_written_);
0816         }
0817     }
0818 };
0819 
0820 template<class NextLayer, bool deflateSupported>
0821 struct stream<NextLayer, deflateSupported>::
0822     run_read_some_op
0823 {
0824     boost::shared_ptr<impl_type> const& self;
0825 
0826     using executor_type = typename stream::executor_type;
0827 
0828     executor_type
0829     get_executor() const noexcept
0830     {
0831         return self->stream().get_executor();
0832     }
0833 
0834     template<
0835         class ReadHandler,
0836         class MutableBufferSequence>
0837     void
0838     operator()(
0839         ReadHandler&& h,
0840         MutableBufferSequence const& b)
0841     {
0842         // If you get an error on the following line it means
0843         // that your handler does not meet the documented type
0844         // requirements for the handler.
0845 
0846         static_assert(
0847             beast::detail::is_invocable<ReadHandler,
0848                 void(error_code, std::size_t)>::value,
0849             "ReadHandler type requirements not met");
0850 
0851         read_some_op<
0852             typename std::decay<ReadHandler>::type,
0853             MutableBufferSequence>(
0854                 std::forward<ReadHandler>(h),
0855                 self,
0856                 b);
0857     }
0858 };
0859 
0860 template<class NextLayer, bool deflateSupported>
0861 struct stream<NextLayer, deflateSupported>::
0862     run_read_op
0863 {
0864     boost::shared_ptr<impl_type> const& self;
0865 
0866     using executor_type = typename stream::executor_type;
0867 
0868     executor_type
0869     get_executor() const noexcept
0870     {
0871         return self->stream().get_executor();
0872     }
0873 
0874     template<
0875         class ReadHandler,
0876         class DynamicBuffer>
0877     void
0878     operator()(
0879         ReadHandler&& h,
0880         DynamicBuffer* b,
0881         std::size_t limit,
0882         bool some)
0883     {
0884         // If you get an error on the following line it means
0885         // that your handler does not meet the documented type
0886         // requirements for the handler.
0887 
0888         static_assert(
0889             beast::detail::is_invocable<ReadHandler,
0890                 void(error_code, std::size_t)>::value,
0891             "ReadHandler type requirements not met");
0892 
0893         read_op<
0894             typename std::decay<ReadHandler>::type,
0895             DynamicBuffer>(
0896                 std::forward<ReadHandler>(h),
0897                 self,
0898                 *b,
0899                 limit,
0900                 some);
0901     }
0902 };
0903 
0904 //------------------------------------------------------------------------------
0905 
0906 template<class NextLayer, bool deflateSupported>
0907 template<class DynamicBuffer>
0908 std::size_t
0909 stream<NextLayer, deflateSupported>::
0910 read(DynamicBuffer& buffer)
0911 {
0912     static_assert(is_sync_stream<next_layer_type>::value,
0913         "SyncStream type requirements not met");
0914     static_assert(
0915         net::is_dynamic_buffer<DynamicBuffer>::value,
0916         "DynamicBuffer type requirements not met");
0917     error_code ec;
0918     auto const bytes_written = read(buffer, ec);
0919     if(ec)
0920         BOOST_THROW_EXCEPTION(system_error{ec});
0921     return bytes_written;
0922 }
0923 
0924 template<class NextLayer, bool deflateSupported>
0925 template<class DynamicBuffer>
0926 std::size_t
0927 stream<NextLayer, deflateSupported>::
0928 read(DynamicBuffer& buffer, error_code& ec)
0929 {
0930     static_assert(is_sync_stream<next_layer_type>::value,
0931         "SyncStream type requirements not met");
0932     static_assert(
0933         net::is_dynamic_buffer<DynamicBuffer>::value,
0934         "DynamicBuffer type requirements not met");
0935     std::size_t bytes_written = 0;
0936     do
0937     {
0938         bytes_written += read_some(buffer, 0, ec);
0939         if(ec)
0940             return bytes_written;
0941     }
0942     while(! is_message_done());
0943     return bytes_written;
0944 }
0945 
0946 template<class NextLayer, bool deflateSupported>
0947 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0948 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0949 stream<NextLayer, deflateSupported>::
0950 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0951 {
0952     static_assert(is_async_stream<next_layer_type>::value,
0953         "AsyncStream type requirements not met");
0954     static_assert(
0955         net::is_dynamic_buffer<DynamicBuffer>::value,
0956         "DynamicBuffer type requirements not met");
0957     return net::async_initiate<
0958         ReadHandler,
0959         void(error_code, std::size_t)>(
0960             run_read_op{impl_},
0961             handler,
0962             &buffer,
0963             0,
0964             false);
0965 }
0966 
0967 //------------------------------------------------------------------------------
0968 
0969 template<class NextLayer, bool deflateSupported>
0970 template<class DynamicBuffer>
0971 std::size_t
0972 stream<NextLayer, deflateSupported>::
0973 read_some(
0974     DynamicBuffer& buffer,
0975     std::size_t limit)
0976 {
0977     static_assert(is_sync_stream<next_layer_type>::value,
0978         "SyncStream type requirements not met");
0979     static_assert(
0980         net::is_dynamic_buffer<DynamicBuffer>::value,
0981         "DynamicBuffer type requirements not met");
0982     error_code ec;
0983     auto const bytes_written =
0984         read_some(buffer, limit, ec);
0985     if(ec)
0986         BOOST_THROW_EXCEPTION(system_error{ec});
0987     return bytes_written;
0988 }
0989 
0990 template<class NextLayer, bool deflateSupported>
0991 template<class DynamicBuffer>
0992 std::size_t
0993 stream<NextLayer, deflateSupported>::
0994 read_some(
0995     DynamicBuffer& buffer,
0996     std::size_t limit,
0997     error_code& ec)
0998 {
0999     static_assert(is_sync_stream<next_layer_type>::value,
1000         "SyncStream type requirements not met");
1001     static_assert(
1002         net::is_dynamic_buffer<DynamicBuffer>::value,
1003         "DynamicBuffer type requirements not met");
1004     using beast::detail::clamp;
1005     if(! limit)
1006         limit = (std::numeric_limits<std::size_t>::max)();
1007     auto const size =
1008         clamp(impl_->read_size_hint_db(buffer), limit);
1009     BOOST_ASSERT(size > 0);
1010     auto mb = beast::detail::dynamic_buffer_prepare(
1011         buffer, size, ec, error::buffer_overflow);
1012     if(impl_->check_stop_now(ec))
1013         return 0;
1014     auto const bytes_written = read_some(*mb, ec);
1015     buffer.commit(bytes_written);
1016     return bytes_written;
1017 }
1018 
1019 template<class NextLayer, bool deflateSupported>
1020 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1021 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1022 stream<NextLayer, deflateSupported>::
1023 async_read_some(
1024     DynamicBuffer& buffer,
1025     std::size_t limit,
1026     ReadHandler&& handler)
1027 {
1028     static_assert(is_async_stream<next_layer_type>::value,
1029         "AsyncStream type requirements not met");
1030     static_assert(
1031         net::is_dynamic_buffer<DynamicBuffer>::value,
1032         "DynamicBuffer type requirements not met");
1033     return net::async_initiate<
1034         ReadHandler,
1035         void(error_code, std::size_t)>(
1036             run_read_op{impl_},
1037             handler,
1038             &buffer,
1039             limit,
1040             true);
1041 }
1042 
1043 //------------------------------------------------------------------------------
1044 
1045 template<class NextLayer, bool deflateSupported>
1046 template<class MutableBufferSequence>
1047 std::size_t
1048 stream<NextLayer, deflateSupported>::
1049 read_some(
1050     MutableBufferSequence const& buffers)
1051 {
1052     static_assert(is_sync_stream<next_layer_type>::value,
1053         "SyncStream type requirements not met");
1054     static_assert(net::is_mutable_buffer_sequence<
1055             MutableBufferSequence>::value,
1056         "MutableBufferSequence type requirements not met");
1057     error_code ec;
1058     auto const bytes_written = read_some(buffers, ec);
1059     if(ec)
1060         BOOST_THROW_EXCEPTION(system_error{ec});
1061     return bytes_written;
1062 }
1063 
/*  Synchronously read some message data into a buffer sequence.

    Parses a frame header when one is needed, handles ping, pong and
    close control frames inline, then delivers payload bytes into
    `buffers` (copied or read directly for uncompressed messages,
    inflated for compressed ones). Text payloads are checked for
    valid UTF-8 as they are delivered.

    Returns the number of bytes placed into `buffers`. `ec` is
    cleared on entry and set when the stream must stop or when the
    connection is failed through do_fail().
*/
1064 template<class NextLayer, bool deflateSupported>
1065 template<class MutableBufferSequence>
1066 std::size_t
1067 stream<NextLayer, deflateSupported>::
1068 read_some(
1069     MutableBufferSequence const& buffers,
1070     error_code& ec)
1071 {
1072     static_assert(is_sync_stream<next_layer_type>::value,
1073         "SyncStream type requirements not met");
1074     static_assert(net::is_mutable_buffer_sequence<
1075             MutableBufferSequence>::value,
1076         "MutableBufferSequence type requirements not met");
1077     using beast::detail::clamp;
1078     auto& impl = *impl_;
1079     close_code code{};
1080     std::size_t bytes_written = 0;
1081     ec = {};
1082     // Make sure the stream is open
1083     if(impl.check_stop_now(ec))
1084         return bytes_written;
     // Ping and pong frames and empty non-final frames jump back
     // here to look for the next frame header.
1085 loop:
1086     // See if we need to read a frame header. This
1087     // condition is structured to give the decompressor
1088     // a chance to emit the final empty deflate block
1089     //
1090     if(impl.rd_remain == 0 && (
1091         ! impl.rd_fh.fin || impl.rd_done))
1092     {
1093         // Read frame header
1094         error_code result;
1095         while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1096         {
                 // parse_fh returned false: either it reported an
                 // error through `result`, or (presumably) it needs
                 // more input than rd_buf currently holds.
1097             if(result)
1098             {
1099                 // _Fail the WebSocket Connection_
1100                 if(result == error::message_too_big)
1101                     code = close_code::too_big;
1102                 else
1103                     code = close_code::protocol_error;
1104                 do_fail(code, result, ec);
1105                 return bytes_written;
1106             }
                 // No error: pull more bytes from the next layer
                 // into rd_buf and retry the parse.
1107             auto const bytes_transferred =
1108                 impl.stream().read_some(
1109                     impl.rd_buf.prepare(read_size(
1110                         impl.rd_buf, impl.rd_buf.max_size())),
1111                     ec);
1112             impl.rd_buf.commit(bytes_transferred);
1113             if(impl.check_stop_now(ec))
1114                 return bytes_written;
1115         }
1116         // Immediately apply the mask to the portion
1117         // of the buffer holding payload data.
1118         if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1119             detail::mask_inplace(buffers_prefix(
1120                 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1121                     impl.rd_key);
1122         if(detail::is_control(impl.rd_fh.op))
1123         {
1124             // Get control frame payload
1125             auto const b = buffers_prefix(
1126                 clamp(impl.rd_fh.len), impl.rd_buf.data());
1127             auto const len = buffer_bytes(b);
                 // The whole control payload is expected to be in
                 // rd_buf already.
1128             BOOST_ASSERT(len == impl.rd_fh.len);
1129 
1130             // Clear this otherwise the next
1131             // frame will be considered final.
1132             impl.rd_fh.fin = false;
1133 
1134             // Handle ping frame
1135             if(impl.rd_fh.op == detail::opcode::ping)
1136             {
1137                 ping_data payload;
1138                 detail::read_ping(payload, b);
1139                 impl.rd_buf.consume(len);
1140                 if(impl.wr_close)
1141                 {
1142                     // Ignore ping when closing
1143                     goto loop;
1144                 }
1145                 if(impl.ctrl_cb)
1146                     impl.ctrl_cb(frame_type::ping, to_string_view(payload));
                     // Answer with a pong carrying the same payload,
                     // written synchronously to the next layer.
1147                 detail::frame_buffer fb;
1148                 impl.template write_ping<flat_static_buffer_base>(fb,
1149                     detail::opcode::pong, payload);
1150                 net::write(impl.stream(), fb.data(), ec);
1151                 if(impl.check_stop_now(ec))
1152                     return bytes_written;
1153                 goto loop;
1154             }
1155             // Handle pong frame
1156             if(impl.rd_fh.op == detail::opcode::pong)
1157             {
1158                 ping_data payload;
1159                 detail::read_ping(payload, b);
1160                 impl.rd_buf.consume(len);
1161                 if(impl.ctrl_cb)
1162                     impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1163                 goto loop;
1164             }
1165             // Handle close frame
1166             BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1167             {
1168                 BOOST_ASSERT(! impl.rd_close);
1169                 impl.rd_close = true;
1170                 close_reason cr;
1171                 detail::read_close(cr, b, result);
1172                 if(result)
1173                 {
1174                     // _Fail the WebSocket Connection_
1175                     do_fail(close_code::protocol_error,
1176                         result, ec);
1177                     return bytes_written;
1178                 }
1179                 impl.cr = cr;
1180                 impl.rd_buf.consume(len);
1181                 if(impl.ctrl_cb)
1182                     impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1183                 BOOST_ASSERT(! impl.wr_close);
1184                 // _Start the WebSocket Closing Handshake_
                     // A close frame without a code is answered with
                     // close_code::normal; otherwise the peer's code
                     // is passed on. The result is error::closed.
1185                 do_fail(
1186                     cr.code == close_code::none ?
1187                         close_code::normal :
1188                         static_cast<close_code>(cr.code),
1189                     error::closed, ec);
1190                 return bytes_written;
1191             }
1192         }
1193         if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1194         {
1195             // Empty non-final frame
1196             goto loop;
1197         }
             // A data frame header was parsed, so the message is
             // not complete at this point.
1198         impl.rd_done = false;
1199     }
1200     else
1201     {
1202         ec = {};
1203     }
1204     if(! impl.rd_deflated())
1205     {
1206         if(impl.rd_remain > 0)
1207         {
1208             if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1209                 (std::min)(clamp(impl.rd_remain),
1210                     buffer_bytes(buffers)))
1211             {
1212                 // Fill the read buffer first, otherwise we
1213                 // get fewer bytes at the cost of one I/O.
1214                 impl.rd_buf.commit(impl.stream().read_some(
1215                     impl.rd_buf.prepare(read_size(impl.rd_buf,
1216                         impl.rd_buf.max_size())), ec));
1217                 if(impl.check_stop_now(ec))
1218                     return bytes_written;
                     // Unmask only the part of rd_buf that belongs to
                     // the current frame's remaining payload.
1219                 if(impl.rd_fh.mask)
1220                     detail::mask_inplace(
1221                         buffers_prefix(clamp(impl.rd_remain),
1222                             impl.rd_buf.data()), impl.rd_key);
1223             }
1224             if(impl.rd_buf.size() > 0)
1225             {
1226                 // Copy from the read buffer.
1227                 // The mask was already applied.
1228                 auto const bytes_transferred = net::buffer_copy(
1229                     buffers, impl.rd_buf.data(),
1230                         clamp(impl.rd_remain));
1231                 auto const mb = buffers_prefix(
1232                     bytes_transferred, buffers);
1233                 impl.rd_remain -= bytes_transferred;
1234                 if(impl.rd_op == detail::opcode::text)
1235                 {
                         // Validate the delivered bytes as UTF-8, and
                         // finish the check once the final frame's
                         // payload is exhausted.
1236                     if(! impl.rd_utf8.write(mb) ||
1237                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1238                             ! impl.rd_utf8.finish()))
1239                     {
1240                         // _Fail the WebSocket Connection_
1241                         do_fail(close_code::bad_payload,
1242                             error::bad_frame_payload, ec);
1243                         return bytes_written;
1244                     }
1245                 }
1246                 bytes_written += bytes_transferred;
1247                 impl.rd_size += bytes_transferred;
1248                 impl.rd_buf.consume(bytes_transferred);
1249             }
1250             else
1251             {
1252                 // Read into caller's buffer
1253                 BOOST_ASSERT(impl.rd_remain > 0);
1254                 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1255                 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1256                     clamp(impl.rd_remain), buffers)) > 0);
1257                 auto const bytes_transferred =
1258                     impl.stream().read_some(buffers_prefix(
1259                         clamp(impl.rd_remain), buffers), ec);
1260                 // VFALCO What if some bytes were written?
1261                 if(impl.check_stop_now(ec))
1262                     return bytes_written;
1263                 BOOST_ASSERT(bytes_transferred > 0);
1264                 auto const mb = buffers_prefix(
1265                     bytes_transferred, buffers);
1266                 impl.rd_remain -= bytes_transferred;
                     // These bytes bypassed rd_buf, so the mask is
                     // applied directly in the caller's buffers.
1267                 if(impl.rd_fh.mask)
1268                     detail::mask_inplace(mb, impl.rd_key);
1269                 if(impl.rd_op == detail::opcode::text)
1270                 {
1271                     if(! impl.rd_utf8.write(mb) ||
1272                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1273                             ! impl.rd_utf8.finish()))
1274                     {
1275                         // _Fail the WebSocket Connection_
1276                         do_fail(close_code::bad_payload,
1277                             error::bad_frame_payload, ec);
1278                         return bytes_written;
1279                     }
1280                 }
1281                 bytes_written += bytes_transferred;
1282                 impl.rd_size += bytes_transferred;
1283             }
1284         }
             // The message is complete once the final frame has no
             // payload left.
1285         BOOST_ASSERT( ! impl.rd_done );
1286         if( impl.rd_remain == 0 && impl.rd_fh.fin )
1287             impl.rd_done = true;
1288     }
1289     else
1290     {
1291         // Read compressed message frame payload:
1292         // inflate even if rd_fh_.len == 0, otherwise we
1293         // never emit the end-of-stream deflate block.
1294         //
             // did_read allows at most one read from the next layer
             // per call: when rd_buf is empty a second time, the
             // loop below is left instead of reading again.
1295         bool did_read = false;
1296         buffers_suffix<MutableBufferSequence> cb(buffers);
1297         while(buffer_bytes(cb) > 0)
1298         {
1299             zlib::z_params zs;
1300             {
                     // Inflate into the first buffer of the output
                     // space that is still unused.
1301                 auto const out = beast::buffers_front(cb);
1302                 zs.next_out = out.data();
1303                 zs.avail_out = out.size();
1304                 BOOST_ASSERT(zs.avail_out > 0);
1305             }
1306             // boolean to track the end of the message.
1307             bool fin = false;
1308             if(impl.rd_remain > 0)
1309             {
1310                 if(impl.rd_buf.size() > 0)
1311                 {
1312                     // use what's there
1313                     auto const in = buffers_prefix(
1314                         clamp(impl.rd_remain), beast::buffers_front(
1315                             impl.rd_buf.data()));
1316                     zs.avail_in = in.size();
1317                     zs.next_in = in.data();
1318                 }
1319                 else if(! did_read)
1320                 {
1321                     // read new
1322                     auto const bytes_transferred =
1323                         impl.stream().read_some(
1324                             impl.rd_buf.prepare(read_size(
1325                                 impl.rd_buf, impl.rd_buf.max_size())),
1326                             ec);
1327                     if(impl.check_stop_now(ec))
1328                         return bytes_written;
1329                     BOOST_ASSERT(bytes_transferred > 0);
1330                     impl.rd_buf.commit(bytes_transferred);
1331                     if(impl.rd_fh.mask)
1332                         detail::mask_inplace(
1333                             buffers_prefix(clamp(impl.rd_remain),
1334                                 impl.rd_buf.data()), impl.rd_key);
1335                     auto const in = buffers_prefix(
1336                         clamp(impl.rd_remain), buffers_front(
1337                             impl.rd_buf.data()));
1338                     zs.avail_in = in.size();
1339                     zs.next_in = in.data();
1340                     did_read = true;
1341                 }
1342                 else
1343                 {
1344                     break;
1345                 }
1346             }
1347             else if(impl.rd_fh.fin)
1348             {
1349                 // append the empty block codes
1350                 static std::uint8_t constexpr
1351                     empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1352                 zs.next_in = empty_block;
1353                 zs.avail_in = sizeof(empty_block);
1354                 fin = true;
1355             }
1356             else
1357             {
1358                 break;
1359             }
1360             impl.inflate(zs, zlib::Flush::sync, ec);
1361             if(impl.check_stop_now(ec))
1362                 return bytes_written;
                 // The trailing empty block produced no further
                 // output: the message is finished.
1363             if (fin && zs.total_out == 0) {
1364                 impl.do_context_takeover_read(impl.role);
1365                 impl.rd_done = true;
1366                 break;
1367             }
                 // Enforce the message size limit when one is set
                 // (rd_msg_max != 0).
1368             if(impl.rd_msg_max && beast::detail::sum_exceeds(
1369                 impl.rd_size, zs.total_out, impl.rd_msg_max))
1370             {
1371                 do_fail(close_code::too_big,
1372                     error::message_too_big, ec);
1373                 return bytes_written;
1374             }
1375             cb.consume(zs.total_out);
1376             impl.rd_size += zs.total_out;
                 // Input is consumed from rd_buf only when it came
                 // from there, not from the synthetic empty block.
1377             if (! fin) {
1378                 impl.rd_remain -= zs.total_in;
1379                 impl.rd_buf.consume(zs.total_in);
1380             }
1381             bytes_written += zs.total_out;
1382         }
1383         if(impl.rd_op == detail::opcode::text)
1384         {
1385             // check utf8
1386             if(! impl.rd_utf8.write(beast::buffers_prefix(
1387                 bytes_written, buffers)) || (
1388                     impl.rd_done && ! impl.rd_utf8.finish()))
1389             {
1390                 // _Fail the WebSocket Connection_
1391                 do_fail(close_code::bad_payload,
1392                     error::bad_frame_payload, ec);
1393                 return bytes_written;
1394             }
1395         }
1396     }
1397     return bytes_written;
1398 }
1399 
1400 template<class NextLayer, bool deflateSupported>
1401 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1402 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1403 stream<NextLayer, deflateSupported>::
1404 async_read_some(
1405     MutableBufferSequence const& buffers,
1406     ReadHandler&& handler)
1407 {
1408     static_assert(is_async_stream<next_layer_type>::value,
1409         "AsyncStream type requirements not met");
1410     static_assert(net::is_mutable_buffer_sequence<
1411             MutableBufferSequence>::value,
1412         "MutableBufferSequence type requirements not met");
1413     return net::async_initiate<
1414         ReadHandler,
1415         void(error_code, std::size_t)>(
1416             run_read_some_op{impl_},
1417             handler,
1418             buffers);
1419 }
1420 
1421 } // websocket
1422 } // beast
1423 } // boost
1424 
1425 #endif