Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:29:34

0001 //
0002 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
0003 //
0004 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 // Official repository: https://github.com/boostorg/beast
0008 //
0009 
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
0012 
0013 #include <boost/beast/core/buffer_traits.hpp>
0014 #include <boost/beast/websocket/teardown.hpp>
0015 #include <boost/beast/websocket/detail/mask.hpp>
0016 #include <boost/beast/websocket/impl/stream_impl.hpp>
0017 #include <boost/beast/core/async_base.hpp>
0018 #include <boost/beast/core/bind_handler.hpp>
0019 #include <boost/beast/core/buffers_prefix.hpp>
0020 #include <boost/beast/core/buffers_suffix.hpp>
0021 #include <boost/beast/core/flat_static_buffer.hpp>
0022 #include <boost/beast/core/read_size.hpp>
0023 #include <boost/beast/core/stream_traits.hpp>
0024 #include <boost/beast/core/detail/bind_continuation.hpp>
0025 #include <boost/beast/core/detail/buffer.hpp>
0026 #include <boost/beast/core/detail/clamp.hpp>
0027 #include <boost/beast/core/detail/config.hpp>
0028 #include <boost/asio/coroutine.hpp>
0029 #include <boost/assert.hpp>
0030 #include <boost/config.hpp>
0031 #include <boost/optional.hpp>
0032 #include <boost/throw_exception.hpp>
0033 #include <algorithm>
0034 #include <limits>
0035 #include <memory>
0036 
0037 namespace boost {
0038 namespace beast {
0039 namespace websocket {
0040 
0041 /*  Read some message data into a buffer sequence.
0042 
0043     Also reads and handles control frames.
0044 */
0045 template<class NextLayer, bool deflateSupported>
0046 template<class Handler, class MutableBufferSequence>
0047 class stream<NextLayer, deflateSupported>::read_some_op
0048     : public beast::async_base<
0049         Handler, beast::executor_type<stream>>
0050     , public asio::coroutine
0051 {
0052     boost::weak_ptr<impl_type> wp_;
0053     MutableBufferSequence bs_;
0054     buffers_suffix<MutableBufferSequence> cb_;
0055     std::size_t bytes_written_ = 0;
0056     error_code result_;
0057     close_code code_;
0058     bool did_read_ = false;
0059 
0060 public:
0061     static constexpr int id = 1; // for soft_mutex
0062 
0063     template<class Handler_>
         // Constructs the read operation and starts it immediately: the
         // constructor body makes the first call to operator().
         //
         // h   completion handler, perfectly forwarded to the async_base
         //     base class.
         // sp  shared pointer to the stream implementation. Its next-layer
         //     executor is handed to the base class; the operation itself
         //     keeps only a weak reference (wp_), which operator() locks
         //     again on every invocation.
         // bs  caller's mutable buffer sequence. It is copied into bs_ and
         //     also used to initialise the buffers_suffix view cb_.
0064     read_some_op(
0065         Handler_&& h,
0066         boost::shared_ptr<impl_type> const& sp,
0067         MutableBufferSequence const& bs)
0068         : async_base<
0069             Handler, beast::executor_type<stream>>(
0070                 std::forward<Handler_>(h),
0071                     sp->stream().get_executor())
0072         , wp_(sp)
0073         , bs_(bs)
0074         , cb_(bs)
0075         , code_(close_code::none)
0076     {
             // First invocation: default-constructed error_code, zero bytes
             // transferred, and cont == false (operator() defaults cont to
             // true for every later invocation).
0077         (*this)({}, 0, false);
0078     }
0079 
0080     void operator()(
0081         error_code ec = {},
0082         std::size_t bytes_transferred = 0,
0083         bool cont = true)
0084     {
0085         using beast::detail::clamp;
0086         auto sp = wp_.lock();
0087         if(! sp)
0088         {
0089             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0090             bytes_written_ = 0;
0091             return this->complete(cont, ec, bytes_written_);
0092         }
0093         auto& impl = *sp;
0094         BOOST_ASIO_CORO_REENTER(*this)
0095         {
0096             impl.update_timer(this->get_executor());
0097 
0098         acquire_read_lock:
0099             // Acquire the read lock
0100             if(! impl.rd_block.try_lock(this))
0101             {
0102             do_suspend:
0103                 BOOST_ASIO_CORO_YIELD
0104                 {
0105                     BOOST_ASIO_HANDLER_LOCATION((
0106                         __FILE__, __LINE__,
0107                         "websocket::async_read_some"));
0108 
0109                     this->set_allowed_cancellation(net::cancellation_type::all);
0110                     impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
0111                 }
0112                 if (ec)
0113                     return this->complete(cont, ec, bytes_written_);
0114 
0115                 this->set_allowed_cancellation(net::cancellation_type::terminal);
0116 
0117                 impl.rd_block.lock(this);
0118                 BOOST_ASIO_CORO_YIELD
0119                 {
0120                     BOOST_ASIO_HANDLER_LOCATION((
0121                         __FILE__, __LINE__,
0122                         "websocket::async_read_some"));
0123 
0124                     const auto ex = this->get_immediate_executor();
0125                     net::dispatch(ex, std::move(*this));
0126                 }
0127                 BOOST_ASSERT(impl.rd_block.is_locked(this));
0128 
0129                 BOOST_ASSERT(!ec);
0130                 if(impl.check_stop_now(ec))
0131                 {
0132                     // Issue 2264 - There is no guarantee that the next
0133                     // error will be operation_aborted.
0134                     // The error could be a result of the peer resetting the 
0135                     // connection
0136                     // BOOST_ASSERT(ec == net::error::operation_aborted);
0137                     goto upcall;
0138                 }
0139                 // VFALCO Should never get here
0140 
0141                 // The only way to get read blocked is if
0142                 // a `close_op` wrote a close frame
0143                 BOOST_ASSERT(impl.wr_close);
0144                 BOOST_ASSERT(impl.status_ != status::open);
0145                 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0146                 goto upcall;
0147             }
0148             else
0149             {
0150                 // Make sure the stream is not closed
0151                 if( impl.status_ == status::closed ||
0152                     impl.status_ == status::failed)
0153                 {
0154                     BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0155                     goto upcall;
0156                 }
0157             }
0158 
0159             // if status_ == status::closing, we want to suspend
0160             // the read operation until the close completes,
0161             // then finish the read with operation_aborted.
0162 
0163         loop:
0164             BOOST_ASSERT(impl.rd_block.is_locked(this));
0165             // See if we need to read a frame header. This
0166             // condition is structured to give the decompressor
0167             // a chance to emit the final empty deflate block
0168             //
0169             if(impl.rd_remain == 0 &&
0170                 (! impl.rd_fh.fin || impl.rd_done))
0171             {
0172                 // Read frame header
0173                 while(! impl.parse_fh(
0174                     impl.rd_fh, impl.rd_buf, result_))
0175                 {
0176                     if(result_)
0177                     {
0178                         // _Fail the WebSocket Connection_
0179                         if(result_ == error::message_too_big)
0180                             code_ = close_code::too_big;
0181                         else
0182                             code_ = close_code::protocol_error;
0183                         goto close;
0184                     }
0185                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0186                     BOOST_ASIO_CORO_YIELD
0187                     {
0188                         BOOST_ASIO_HANDLER_LOCATION((
0189                             __FILE__, __LINE__,
0190                             "websocket::async_read_some"));
0191 
0192                         impl.stream().async_read_some(
0193                             impl.rd_buf.prepare(read_size(
0194                                 impl.rd_buf, impl.rd_buf.max_size())),
0195                                     std::move(*this));
0196                     }
0197                     BOOST_ASSERT(impl.rd_block.is_locked(this));
0198                     impl.rd_buf.commit(bytes_transferred);
0199                     if(impl.check_stop_now(ec))
0200                         goto upcall;
0201                     impl.reset_idle();
0202 
0203                     // Allow a close operation
0204                     // to acquire the read block
0205                     impl.rd_block.unlock(this);
0206                     if( impl.op_r_close.maybe_invoke())
0207                     {
0208                         // Suspend
0209                         BOOST_ASSERT(impl.rd_block.is_locked());
0210                         goto do_suspend;
0211                     }
0212                     // Acquire read block
0213                     impl.rd_block.lock(this);
0214                 }
0215                 // Immediately apply the mask to the portion
0216                 // of the buffer holding payload data.
0217                 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0218                     detail::mask_inplace(buffers_prefix(
0219                         clamp(impl.rd_fh.len),
0220                             impl.rd_buf.data()),
0221                                 impl.rd_key);
0222                 if(detail::is_control(impl.rd_fh.op))
0223                 {
0224                     // Clear this otherwise the next
0225                     // frame will be considered final.
0226                     impl.rd_fh.fin = false;
0227 
0228                     // Handle ping frame
0229                     if(impl.rd_fh.op == detail::opcode::ping)
0230                     {
0231                         if(impl.ctrl_cb)
0232                         {
0233                             if(! cont)
0234                             {
0235                                 BOOST_ASIO_CORO_YIELD
0236                                 {
0237                                     BOOST_ASIO_HANDLER_LOCATION((
0238                                         __FILE__, __LINE__,
0239                                         "websocket::async_read_some"));
0240 
0241                                     const auto ex = this->get_immediate_executor();
0242                                     net::dispatch(ex, std::move(*this));
0243                                 }
0244                                 BOOST_ASSERT(cont);
0245                                 // VFALCO call check_stop_now() here?
0246                             }
0247                         }
0248                         {
0249                             auto const b = buffers_prefix(
0250                                 clamp(impl.rd_fh.len),
0251                                     impl.rd_buf.data());
0252                             auto const len = buffer_bytes(b);
0253                             BOOST_ASSERT(len == impl.rd_fh.len);
0254                             ping_data payload;
0255                             detail::read_ping(payload, b);
0256                             impl.rd_buf.consume(len);
0257                             // Ignore ping when closing
0258                             if(impl.status_ == status::closing)
0259                                 goto loop;
0260                             if(impl.ctrl_cb)
0261                                 impl.ctrl_cb(
0262                                     frame_type::ping, to_string_view(payload));
0263                             impl.rd_fb.clear();
0264                             impl.template write_ping<
0265                                 flat_static_buffer_base>(impl.rd_fb,
0266                                     detail::opcode::pong, payload);
0267                         }
0268 
0269                         // Allow a close operation
0270                         // to acquire the read block
0271                         impl.rd_block.unlock(this);
0272                         impl.op_r_close.maybe_invoke();
0273 
0274                         // Acquire the write lock
0275                         if(! impl.wr_block.try_lock(this))
0276                         {
0277                             BOOST_ASIO_CORO_YIELD
0278                             {
0279                                 BOOST_ASIO_HANDLER_LOCATION((
0280                                     __FILE__, __LINE__,
0281                                     "websocket::async_read_some"));
0282 
0283                                 impl.op_rd.emplace(std::move(*this));
0284                             }
0285                             if (ec)
0286                                 return this->complete(cont, ec, bytes_written_);
0287 
0288                             impl.wr_block.lock(this);
0289                             BOOST_ASIO_CORO_YIELD
0290                             {
0291                                 BOOST_ASIO_HANDLER_LOCATION((
0292                                     __FILE__, __LINE__,
0293                                     "websocket::async_read_some"));
0294 
0295                                 const auto ex = this->get_immediate_executor();
0296                                 net::dispatch(ex, std::move(*this));
0297                             }
0298                             BOOST_ASSERT(impl.wr_block.is_locked(this));
0299                             if(impl.check_stop_now(ec))
0300                                 goto upcall;
0301                         }
0302 
0303                         // Send pong
0304                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0305                         BOOST_ASIO_CORO_YIELD
0306                         {
0307                             BOOST_ASIO_HANDLER_LOCATION((
0308                                 __FILE__, __LINE__,
0309                                 "websocket::async_read_some"));
0310 
0311                             net::async_write(
0312                                 impl.stream(), net::const_buffer(impl.rd_fb.data()),
0313                                 beast::detail::bind_continuation(std::move(*this)));
0314                         }
0315                         BOOST_ASSERT(impl.wr_block.is_locked(this));
0316                         if(impl.check_stop_now(ec))
0317                             goto upcall;
0318                         impl.wr_block.unlock(this);
0319                         impl.op_close.maybe_invoke()
0320                             || impl.op_idle_ping.maybe_invoke()
0321                             || impl.op_ping.maybe_invoke()
0322                             || impl.op_wr.maybe_invoke();
0323                         goto acquire_read_lock;
0324                     }
0325 
0326                     // Handle pong frame
0327                     if(impl.rd_fh.op == detail::opcode::pong)
0328                     {
0329                         // Ignore pong when closing
0330                         if(! impl.wr_close && impl.ctrl_cb)
0331                         {
0332                             if(! cont)
0333                             {
0334                                 BOOST_ASIO_CORO_YIELD
0335                                 {
0336                                     BOOST_ASIO_HANDLER_LOCATION((
0337                                         __FILE__, __LINE__,
0338                                         "websocket::async_read_some"));
0339 
0340                                     const auto ex = this->get_immediate_executor();
0341                                     net::dispatch(ex, std::move(*this));
0342                                 }
0343                                 BOOST_ASSERT(cont);
0344                             }
0345                         }
0346                         auto const cb = buffers_prefix(clamp(
0347                             impl.rd_fh.len), impl.rd_buf.data());
0348                         auto const len = buffer_bytes(cb);
0349                         BOOST_ASSERT(len == impl.rd_fh.len);
0350                         ping_data payload;
0351                         detail::read_ping(payload, cb);
0352                         impl.rd_buf.consume(len);
0353                         // Ignore pong when closing
0354                         if(! impl.wr_close && impl.ctrl_cb)
0355                             impl.ctrl_cb(frame_type::pong, to_string_view(payload));
0356                         goto loop;
0357                     }
0358 
0359                     // Handle close frame
0360                     BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
0361                     {
0362                         if(impl.ctrl_cb)
0363                         {
0364                             if(! cont)
0365                             {
0366                                 BOOST_ASIO_CORO_YIELD
0367                                 {
0368                                     BOOST_ASIO_HANDLER_LOCATION((
0369                                         __FILE__, __LINE__,
0370                                         "websocket::async_read_some"));
0371 
0372                                     const auto ex = this->get_immediate_executor();
0373                                     net::dispatch(ex, std::move(*this));
0374                                 }
0375                                 BOOST_ASSERT(cont);
0376                             }
0377                         }
0378                         auto const cb = buffers_prefix(clamp(
0379                             impl.rd_fh.len), impl.rd_buf.data());
0380                         auto const len = buffer_bytes(cb);
0381                         BOOST_ASSERT(len == impl.rd_fh.len);
0382                         BOOST_ASSERT(! impl.rd_close);
0383                         impl.rd_close = true;
0384                         close_reason cr;
0385                         detail::read_close(cr, cb, result_);
0386                         if(result_)
0387                         {
0388                             // _Fail the WebSocket Connection_
0389                             code_ = close_code::protocol_error;
0390                             goto close;
0391                         }
0392                         impl.cr = cr;
0393                         impl.rd_buf.consume(len);
0394                         if(impl.ctrl_cb)
0395                             impl.ctrl_cb(frame_type::close,
0396                                 to_string_view(impl.cr.reason));
0397                         // See if we are already closing
0398                         if(impl.status_ == status::closing)
0399                         {
0400                             // _Close the WebSocket Connection_
0401                             BOOST_ASSERT(impl.wr_close);
0402                             code_ = close_code::none;
0403                             result_ = error::closed;
0404                             goto close;
0405                         }
0406                         // _Start the WebSocket Closing Handshake_
0407                         code_ = cr.code == close_code::none ?
0408                             close_code::normal :
0409                             static_cast<close_code>(cr.code);
0410                         result_ = error::closed;
0411                         goto close;
0412                     }
0413                 }
0414                 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
0415                 {
0416                     // Empty non-final frame
0417                     goto loop;
0418                 }
0419                 impl.rd_done = false;
0420             }
0421             if(! impl.rd_deflated())
0422             {
0423                 if(impl.rd_remain > 0)
0424                 {
0425                     if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
0426                         (std::min)(clamp(impl.rd_remain),
0427                             buffer_bytes(cb_)))
0428                     {
0429                         // Fill the read buffer first, otherwise we
0430                         // get fewer bytes at the cost of one I/O.
0431                         BOOST_ASIO_CORO_YIELD
0432                         {
0433                             BOOST_ASIO_HANDLER_LOCATION((
0434                                 __FILE__, __LINE__,
0435                                 "websocket::async_read_some"));
0436 
0437                             impl.stream().async_read_some(
0438                                 impl.rd_buf.prepare(read_size(
0439                                     impl.rd_buf, impl.rd_buf.max_size())),
0440                                         std::move(*this));
0441                         }
0442                         impl.rd_buf.commit(bytes_transferred);
0443                         if(impl.check_stop_now(ec))
0444                             goto upcall;
0445                         impl.reset_idle();
0446                         if(impl.rd_fh.mask)
0447                             detail::mask_inplace(buffers_prefix(clamp(
0448                                 impl.rd_remain), impl.rd_buf.data()),
0449                                     impl.rd_key);
0450                     }
0451                     if(impl.rd_buf.size() > 0)
0452                     {
0453                         // Copy from the read buffer.
0454                         // The mask was already applied.
0455                         bytes_transferred = net::buffer_copy(cb_,
0456                             impl.rd_buf.data(), clamp(impl.rd_remain));
0457                         auto const mb = buffers_prefix(
0458                             bytes_transferred, cb_);
0459                         impl.rd_remain -= bytes_transferred;
0460                         if(impl.rd_op == detail::opcode::text)
0461                         {
0462                             if(! impl.rd_utf8.write(mb) ||
0463                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0464                                     ! impl.rd_utf8.finish()))
0465                             {
0466                                 // _Fail the WebSocket Connection_
0467                                 code_ = close_code::bad_payload;
0468                                 result_ = error::bad_frame_payload;
0469                                 goto close;
0470                             }
0471                         }
0472                         bytes_written_ += bytes_transferred;
0473                         impl.rd_size += bytes_transferred;
0474                         impl.rd_buf.consume(bytes_transferred);
0475                     }
0476                     else
0477                     {
0478                         // Read into caller's buffer
0479                         BOOST_ASSERT(impl.rd_remain > 0);
0480                         BOOST_ASSERT(buffer_bytes(cb_) > 0);
0481                         BOOST_ASSERT(buffer_bytes(buffers_prefix(
0482                             clamp(impl.rd_remain), cb_)) > 0);
0483                         BOOST_ASIO_CORO_YIELD
0484                         {
0485                             BOOST_ASIO_HANDLER_LOCATION((
0486                                 __FILE__, __LINE__,
0487                                 "websocket::async_read_some"));
0488 
0489                             impl.stream().async_read_some(buffers_prefix(
0490                                 clamp(impl.rd_remain), cb_), std::move(*this));
0491                         }
0492                         if(impl.check_stop_now(ec))
0493                             goto upcall;
0494                         impl.reset_idle();
0495                         BOOST_ASSERT(bytes_transferred > 0);
0496                         auto const mb = buffers_prefix(
0497                             bytes_transferred, cb_);
0498                         impl.rd_remain -= bytes_transferred;
0499                         if(impl.rd_fh.mask)
0500                             detail::mask_inplace(mb, impl.rd_key);
0501                         if(impl.rd_op == detail::opcode::text)
0502                         {
0503                             if(! impl.rd_utf8.write(mb) ||
0504                                 (impl.rd_remain == 0 && impl.rd_fh.fin &&
0505                                     ! impl.rd_utf8.finish()))
0506                             {
0507                                 // _Fail the WebSocket Connection_
0508                                 code_ = close_code::bad_payload;
0509                                 result_ = error::bad_frame_payload;
0510                                 goto close;
0511                             }
0512                         }
0513                         bytes_written_ += bytes_transferred;
0514                         impl.rd_size += bytes_transferred;
0515                     }
0516                 }
0517                 BOOST_ASSERT( ! impl.rd_done );
0518                 if( impl.rd_remain == 0 && impl.rd_fh.fin )
0519                     impl.rd_done = true;
0520             }
0521             else
0522             {
0523                 // Read compressed message frame payload:
0524                 // inflate even if rd_fh.len == 0, otherwise we
0525                 // never emit the end-of-stream deflate block.
0526                 while(buffer_bytes(cb_) > 0)
0527                 {
0528                     if( impl.rd_remain > 0 &&
0529                         impl.rd_buf.size() == 0 &&
0530                         ! did_read_)
0531                     {
0532                         // read new
0533                         BOOST_ASIO_CORO_YIELD
0534                         {
0535                             BOOST_ASIO_HANDLER_LOCATION((
0536                                 __FILE__, __LINE__,
0537                                 "websocket::async_read_some"));
0538 
0539                             impl.stream().async_read_some(
0540                                 impl.rd_buf.prepare(read_size(
0541                                     impl.rd_buf, impl.rd_buf.max_size())),
0542                                         std::move(*this));
0543                         }
0544                         if(impl.check_stop_now(ec))
0545                             goto upcall;
0546                         impl.reset_idle();
0547                         BOOST_ASSERT(bytes_transferred > 0);
0548                         impl.rd_buf.commit(bytes_transferred);
0549                         if(impl.rd_fh.mask)
0550                             detail::mask_inplace(
0551                                 buffers_prefix(clamp(impl.rd_remain),
0552                                     impl.rd_buf.data()), impl.rd_key);
0553                         did_read_ = true;
0554                     }
0555                     zlib::z_params zs;
0556                     {
0557                         auto const out = buffers_front(cb_);
0558                         zs.next_out = out.data();
0559                         zs.avail_out = out.size();
0560                         BOOST_ASSERT(zs.avail_out > 0);
0561                     }
0562                     // boolean to track the end of the message.
0563                     bool fin = false;
0564                     if(impl.rd_remain > 0)
0565                     {
0566                         if(impl.rd_buf.size() > 0)
0567                         {
0568                             // use what's there
0569                             auto const in = buffers_prefix(
0570                                 clamp(impl.rd_remain), buffers_front(
0571                                     impl.rd_buf.data()));
0572                             zs.avail_in = in.size();
0573                             zs.next_in = in.data();
0574                         }
0575                         else
0576                         {
0577                             break;
0578                         }
0579                     }
0580                     else if(impl.rd_fh.fin)
0581                     {
0582                         // append the empty block codes
0583                         static std::uint8_t constexpr
0584                             empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
0585                         zs.next_in = empty_block;
0586                         zs.avail_in = sizeof(empty_block);
0587                         fin = true;
0588                     }
0589                     else
0590                     {
0591                         break;
0592                     }
0593                     impl.inflate(zs, zlib::Flush::sync, ec);
0594                     if(impl.check_stop_now(ec))
0595                         goto upcall;
0596                     if(fin && zs.total_out == 0) {
0597                         impl.do_context_takeover_read(impl.role);
0598                         impl.rd_done = true;
0599                         break;
0600                     }
0601                     if(impl.rd_msg_max && beast::detail::sum_exceeds(
0602                         impl.rd_size, zs.total_out, impl.rd_msg_max))
0603                     {
0604                         // _Fail the WebSocket Connection_
0605                         code_ = close_code::too_big;
0606                         result_ = error::message_too_big;
0607                         goto close;
0608                     }
0609                     cb_.consume(zs.total_out);
0610                     impl.rd_size += zs.total_out;
0611                     if (! fin) {
0612                         impl.rd_remain -= zs.total_in;
0613                         impl.rd_buf.consume(zs.total_in);
0614                     }
0615                     bytes_written_ += zs.total_out;
0616                 }
0617                 if(impl.rd_op == detail::opcode::text)
0618                 {
0619                     // check utf8
0620                     if(! impl.rd_utf8.write(
0621                         buffers_prefix(bytes_written_, bs_)) || (
0622                             impl.rd_done && ! impl.rd_utf8.finish()))
0623                     {
0624                         // _Fail the WebSocket Connection_
0625                         code_ = close_code::bad_payload;
0626                         result_ = error::bad_frame_payload;
0627                         goto close;
0628                     }
0629                 }
0630             }
0631             goto upcall;
0632 
0633         close:
0634             // Acquire the write lock
0635             if(! impl.wr_block.try_lock(this))
0636             {
0637                 BOOST_ASIO_CORO_YIELD
0638                 {
0639                     BOOST_ASIO_HANDLER_LOCATION((
0640                         __FILE__, __LINE__,
0641                         "websocket::async_read_some"));
0642 
0643                     impl.op_rd.emplace(std::move(*this));
0644                 }
0645                 if (ec)
0646                     return this->complete(cont, ec, bytes_written_);
0647 
0648                 impl.wr_block.lock(this);
0649                 BOOST_ASIO_CORO_YIELD
0650                 {
0651                     BOOST_ASIO_HANDLER_LOCATION((
0652                         __FILE__, __LINE__,
0653                         "websocket::async_read_some"));
0654 
0655                     const auto ex = this->get_immediate_executor();
0656                     net::dispatch(ex, std::move(*this));
0657                 }
0658                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0659                 if(impl.check_stop_now(ec))
0660                     goto upcall;
0661             }
0662 
0663             impl.change_status(status::closing);
0664 
0665             if(! impl.wr_close)
0666             {
0667                 impl.wr_close = true;
0668 
0669                 // Serialize close frame
0670                 impl.rd_fb.clear();
0671                 impl.template write_close<
0672                     flat_static_buffer_base>(
0673                         impl.rd_fb, code_);
0674 
0675                 // Send close frame
0676                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0677                 BOOST_ASIO_CORO_YIELD
0678                 {
0679                     BOOST_ASIO_HANDLER_LOCATION((
0680                         __FILE__, __LINE__,
0681                         "websocket::async_read_some"));
0682 
0683                     net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
0684                         beast::detail::bind_continuation(std::move(*this)));
0685                 }
0686                 BOOST_ASSERT(impl.wr_block.is_locked(this));
0687                 if(impl.check_stop_now(ec))
0688                     goto upcall;
0689             }
0690 
0691             // Teardown
0692             using beast::websocket::async_teardown;
0693             BOOST_ASSERT(impl.wr_block.is_locked(this));
0694             BOOST_ASIO_CORO_YIELD
0695             {
0696                 BOOST_ASIO_HANDLER_LOCATION((
0697                     __FILE__, __LINE__,
0698                     "websocket::async_read_some"));
0699 
0700                 async_teardown(impl.role, impl.stream(),
0701                     beast::detail::bind_continuation(std::move(*this)));
0702             }
0703             BOOST_ASSERT(impl.wr_block.is_locked(this));
0704             if(ec == net::error::eof)
0705             {
0706                 // Rationale:
0707                 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
0708                 ec = {};
0709             }
0710             if(! ec)
0711             {
0712                 BOOST_BEAST_ASSIGN_EC(ec, result_);
0713             }
0714             if(ec && ec != error::closed)
0715                 impl.change_status(status::failed);
0716             else
0717                 impl.change_status(status::closed);
0718             impl.close();
0719 
0720         upcall:
0721             impl.rd_block.try_unlock(this);
0722             impl.op_r_close.maybe_invoke();
0723             if(impl.wr_block.try_unlock(this))
0724                 impl.op_close.maybe_invoke()
0725                     || impl.op_idle_ping.maybe_invoke()
0726                     || impl.op_ping.maybe_invoke()
0727                     || impl.op_wr.maybe_invoke();
0728             this->complete(cont, ec, bytes_written_);
0729         }
0730     }
0731 };
0732 
0733 //------------------------------------------------------------------------------
0734 
// Composed operation that reads message data into a DynamicBuffer.
//
// Each pass prepares writable space in the dynamic buffer, runs a
// `read_some_op` on that space, then commits the bytes reported by
// it. The loop repeats until an error occurs, `some_` is set, or
// `impl.rd_done` becomes true. The final handler receives the error
// code and the total number of bytes committed.
0735 template<class NextLayer, bool deflateSupported>
0736 template<class Handler,  class DynamicBuffer>
0737 class stream<NextLayer, deflateSupported>::read_op
0738     : public beast::async_base<
0739         Handler, beast::executor_type<stream>>
0740     , public asio::coroutine
0741 {
         // Weak reference to the stream implementation; it is locked
         // at the start of every invocation of operator().
0742     boost::weak_ptr<impl_type> wp_;
         // Caller-owned dynamic buffer receiving the data.
0743     DynamicBuffer& b_;
         // Upper bound on the size prepared per pass (never zero,
         // see the constructor).
0744     std::size_t limit_;
         // Running total of bytes committed to `b_`.
0745     std::size_t bytes_written_ = 0;
         // When true, the loop stops after a single pass.
0746     bool some_;
0747 
0748 public:
         // Stores the arguments and immediately starts the operation
         // by invoking operator() with `cont == false`. A `limit` of
         // zero is replaced by the largest `std::size_t` value.
0749     template<class Handler_>
0750     read_op(
0751         Handler_&& h,
0752         boost::shared_ptr<impl_type> const& sp,
0753         DynamicBuffer& b,
0754         std::size_t limit,
0755         bool some)
0756         : async_base<Handler,
0757             beast::executor_type<stream>>(
0758                 std::forward<Handler_>(h),
0759                     sp->stream().get_executor())
0760         , wp_(sp)
0761         , b_(b)
0762         , limit_(limit ? limit : (
0763             std::numeric_limits<std::size_t>::max)())
0764         , some_(some)
0765     {
0766         (*this)({}, 0, false);
0767     }
0768 
         // Coroutine body. It is entered once from the constructor and
         // again each time an inner `read_some_op` completes, in which
         // case `ec` and `bytes_transferred` carry that result.
0769     void operator()(
0770         error_code ec = {},
0771         std::size_t bytes_transferred = 0,
0772         bool cont = true)
0773     {
0774         using beast::detail::clamp;
0775         auto sp = wp_.lock();
0776         if(! sp)
0777         {
                 // The implementation no longer exists: complete with
                 // operation_aborted and report zero bytes.
0778             BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0779             bytes_written_ = 0;
0780             return this->complete(cont, ec, bytes_written_);
0781         }
0782         auto& impl = *sp;
0783         using mutable_buffers_type = typename
0784             DynamicBuffer::mutable_buffers_type;
0785         BOOST_ASIO_CORO_REENTER(*this)
0786         {
0787             do
0788             {
0789                 // VFALCO TODO use boost::beast::bind_continuation
0790                 BOOST_ASIO_CORO_YIELD
0791                 {
                         // Prepare writable space, sized by the stream's
                         // hint clamped to `limit_`. The call is given
                         // `ec` and error::buffer_overflow; if `ec` ends
                         // up set the operation completes without
                         // starting a read.
0792                     auto mb = beast::detail::dynamic_buffer_prepare(b_,
0793                         clamp(impl.read_size_hint_db(b_), limit_),
0794                             ec, error::buffer_overflow);
0795                     if(impl.check_stop_now(ec))
0796                         goto upcall;
0797 
0798                     BOOST_ASIO_HANDLER_LOCATION((
0799                         __FILE__, __LINE__,
0800                         "websocket::async_read"));
0801 
                         // This operation is moved into the inner
                         // read_some_op as its handler.
0802                     read_some_op<read_op, mutable_buffers_type>(
0803                         std::move(*this), sp, *mb);
0804                 }
0805 
                     // Make the bytes written by the inner read part of
                     // the readable area and add them to the total,
                     // before looking at the error code.
0806                 b_.commit(bytes_transferred);
0807                 bytes_written_ += bytes_transferred;
0808                 if(ec)
0809                     goto upcall;
0810             }
0811             while(! some_ && ! impl.rd_done);
0812 
0813         upcall:
0814             this->complete(cont, ec, bytes_written_);
0815         }
0816     }
0817 };
0818 
0819 template<class NextLayer, bool deflateSupported>
0820 struct stream<NextLayer, deflateSupported>::
0821     run_read_some_op
0822 {
0823     template<
0824         class ReadHandler,
0825         class MutableBufferSequence>
0826     void
0827     operator()(
0828         ReadHandler&& h,
0829         boost::shared_ptr<impl_type> const& sp,
0830         MutableBufferSequence const& b)
0831     {
0832         // If you get an error on the following line it means
0833         // that your handler does not meet the documented type
0834         // requirements for the handler.
0835 
0836         static_assert(
0837             beast::detail::is_invocable<ReadHandler,
0838                 void(error_code, std::size_t)>::value,
0839             "ReadHandler type requirements not met");
0840 
0841         read_some_op<
0842             typename std::decay<ReadHandler>::type,
0843             MutableBufferSequence>(
0844                 std::forward<ReadHandler>(h),
0845                 sp,
0846                 b);
0847     }
0848 };
0849 
0850 template<class NextLayer, bool deflateSupported>
0851 struct stream<NextLayer, deflateSupported>::
0852     run_read_op
0853 {
0854     template<
0855         class ReadHandler,
0856         class DynamicBuffer>
0857     void
0858     operator()(
0859         ReadHandler&& h,
0860         boost::shared_ptr<impl_type> const& sp,
0861         DynamicBuffer* b,
0862         std::size_t limit,
0863         bool some)
0864     {
0865         // If you get an error on the following line it means
0866         // that your handler does not meet the documented type
0867         // requirements for the handler.
0868 
0869         static_assert(
0870             beast::detail::is_invocable<ReadHandler,
0871                 void(error_code, std::size_t)>::value,
0872             "ReadHandler type requirements not met");
0873 
0874         read_op<
0875             typename std::decay<ReadHandler>::type,
0876             DynamicBuffer>(
0877                 std::forward<ReadHandler>(h),
0878                 sp,
0879                 *b,
0880                 limit,
0881                 some);
0882     }
0883 };
0884 
0885 //------------------------------------------------------------------------------
0886 
0887 template<class NextLayer, bool deflateSupported>
0888 template<class DynamicBuffer>
0889 std::size_t
0890 stream<NextLayer, deflateSupported>::
0891 read(DynamicBuffer& buffer)
0892 {
0893     static_assert(is_sync_stream<next_layer_type>::value,
0894         "SyncStream type requirements not met");
0895     static_assert(
0896         net::is_dynamic_buffer<DynamicBuffer>::value,
0897         "DynamicBuffer type requirements not met");
0898     error_code ec;
0899     auto const bytes_written = read(buffer, ec);
0900     if(ec)
0901         BOOST_THROW_EXCEPTION(system_error{ec});
0902     return bytes_written;
0903 }
0904 
0905 template<class NextLayer, bool deflateSupported>
0906 template<class DynamicBuffer>
0907 std::size_t
0908 stream<NextLayer, deflateSupported>::
0909 read(DynamicBuffer& buffer, error_code& ec)
0910 {
0911     static_assert(is_sync_stream<next_layer_type>::value,
0912         "SyncStream type requirements not met");
0913     static_assert(
0914         net::is_dynamic_buffer<DynamicBuffer>::value,
0915         "DynamicBuffer type requirements not met");
0916     std::size_t bytes_written = 0;
0917     do
0918     {
0919         bytes_written += read_some(buffer, 0, ec);
0920         if(ec)
0921             return bytes_written;
0922     }
0923     while(! is_message_done());
0924     return bytes_written;
0925 }
0926 
0927 template<class NextLayer, bool deflateSupported>
0928 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
0929 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
0930 stream<NextLayer, deflateSupported>::
0931 async_read(DynamicBuffer& buffer, ReadHandler&& handler)
0932 {
0933     static_assert(is_async_stream<next_layer_type>::value,
0934         "AsyncStream type requirements not met");
0935     static_assert(
0936         net::is_dynamic_buffer<DynamicBuffer>::value,
0937         "DynamicBuffer type requirements not met");
0938     return net::async_initiate<
0939         ReadHandler,
0940         void(error_code, std::size_t)>(
0941             run_read_op{},
0942             handler,
0943             impl_,
0944             &buffer,
0945             0,
0946             false);
0947 }
0948 
0949 //------------------------------------------------------------------------------
0950 
0951 template<class NextLayer, bool deflateSupported>
0952 template<class DynamicBuffer>
0953 std::size_t
0954 stream<NextLayer, deflateSupported>::
0955 read_some(
0956     DynamicBuffer& buffer,
0957     std::size_t limit)
0958 {
0959     static_assert(is_sync_stream<next_layer_type>::value,
0960         "SyncStream type requirements not met");
0961     static_assert(
0962         net::is_dynamic_buffer<DynamicBuffer>::value,
0963         "DynamicBuffer type requirements not met");
0964     error_code ec;
0965     auto const bytes_written =
0966         read_some(buffer, limit, ec);
0967     if(ec)
0968         BOOST_THROW_EXCEPTION(system_error{ec});
0969     return bytes_written;
0970 }
0971 
0972 template<class NextLayer, bool deflateSupported>
0973 template<class DynamicBuffer>
0974 std::size_t
0975 stream<NextLayer, deflateSupported>::
0976 read_some(
0977     DynamicBuffer& buffer,
0978     std::size_t limit,
0979     error_code& ec)
0980 {
0981     static_assert(is_sync_stream<next_layer_type>::value,
0982         "SyncStream type requirements not met");
0983     static_assert(
0984         net::is_dynamic_buffer<DynamicBuffer>::value,
0985         "DynamicBuffer type requirements not met");
0986     using beast::detail::clamp;
0987     if(! limit)
0988         limit = (std::numeric_limits<std::size_t>::max)();
0989     auto const size =
0990         clamp(read_size_hint(buffer), limit);
0991     BOOST_ASSERT(size > 0);
0992     auto mb = beast::detail::dynamic_buffer_prepare(
0993         buffer, size, ec, error::buffer_overflow);
0994     if(impl_->check_stop_now(ec))
0995         return 0;
0996     auto const bytes_written = read_some(*mb, ec);
0997     buffer.commit(bytes_written);
0998     return bytes_written;
0999 }
1000 
1001 template<class NextLayer, bool deflateSupported>
1002 template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1003 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1004 stream<NextLayer, deflateSupported>::
1005 async_read_some(
1006     DynamicBuffer& buffer,
1007     std::size_t limit,
1008     ReadHandler&& handler)
1009 {
1010     static_assert(is_async_stream<next_layer_type>::value,
1011         "AsyncStream type requirements not met");
1012     static_assert(
1013         net::is_dynamic_buffer<DynamicBuffer>::value,
1014         "DynamicBuffer type requirements not met");
1015     return net::async_initiate<
1016         ReadHandler,
1017         void(error_code, std::size_t)>(
1018             run_read_op{},
1019             handler,
1020             impl_,
1021             &buffer,
1022             limit,
1023             true);
1024 }
1025 
1026 //------------------------------------------------------------------------------
1027 
1028 template<class NextLayer, bool deflateSupported>
1029 template<class MutableBufferSequence>
1030 std::size_t
1031 stream<NextLayer, deflateSupported>::
1032 read_some(
1033     MutableBufferSequence const& buffers)
1034 {
1035     static_assert(is_sync_stream<next_layer_type>::value,
1036         "SyncStream type requirements not met");
1037     static_assert(net::is_mutable_buffer_sequence<
1038             MutableBufferSequence>::value,
1039         "MutableBufferSequence type requirements not met");
1040     error_code ec;
1041     auto const bytes_written = read_some(buffers, ec);
1042     if(ec)
1043         BOOST_THROW_EXCEPTION(system_error{ec});
1044     return bytes_written;
1045 }
1046 
// Synchronously read some message payload into `buffers`.
//
// Frame headers are parsed as needed and control frames are handled
// inline: a ping is answered with a pong (skipped when
// `impl.wr_close` is set), a pong only triggers the control callback,
// and a close frame is passed to `do_fail` with `error::closed`.
// Header parse errors, malformed close payloads, invalid UTF-8 in
// text messages and oversized inflated messages also go through
// `do_fail`.
//
// Errors are reported through `ec`. Every return hands back the
// number of payload bytes accumulated in `buffers` so far.
1047 template<class NextLayer, bool deflateSupported>
1048 template<class MutableBufferSequence>
1049 std::size_t
1050 stream<NextLayer, deflateSupported>::
1051 read_some(
1052     MutableBufferSequence const& buffers,
1053     error_code& ec)
1054 {
1055     static_assert(is_sync_stream<next_layer_type>::value,
1056         "SyncStream type requirements not met");
1057     static_assert(net::is_mutable_buffer_sequence<
1058             MutableBufferSequence>::value,
1059         "MutableBufferSequence type requirements not met");
1060     using beast::detail::clamp;
1061     auto& impl = *impl_;
1062     close_code code{};
1063     std::size_t bytes_written = 0;
1064     ec = {};
1065     // Make sure the stream is open
1066     if(impl.check_stop_now(ec))
1067         return bytes_written;
     // Control frames and empty non-final frames jump back here
     // so that the next frame header gets parsed.
1068 loop:
1069     // See if we need to read a frame header. This
1070     // condition is structured to give the decompressor
1071     // a chance to emit the final empty deflate block
1072     //
1073     if(impl.rd_remain == 0 && (
1074         ! impl.rd_fh.fin || impl.rd_done))
1075     {
1076         // Read frame header
1077         error_code result;
             // Keep reading from the next layer into rd_buf until
             // parse_fh returns true or reports an error in `result`.
1078         while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1079         {
1080             if(result)
1081             {
1082                 // _Fail the WebSocket Connection_
1083                 if(result == error::message_too_big)
1084                     code = close_code::too_big;
1085                 else
1086                     code = close_code::protocol_error;
1087                 do_fail(code, result, ec);
1088                 return bytes_written;
1089             }
1090             auto const bytes_transferred =
1091                 impl.stream().read_some(
1092                     impl.rd_buf.prepare(read_size(
1093                         impl.rd_buf, impl.rd_buf.max_size())),
1094                     ec);
1095             impl.rd_buf.commit(bytes_transferred);
1096             if(impl.check_stop_now(ec))
1097                 return bytes_written;
1098         }
1099         // Immediately apply the mask to the portion
1100         // of the buffer holding payload data.
1101         if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1102             detail::mask_inplace(buffers_prefix(
1103                 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1104                     impl.rd_key);
1105         if(detail::is_control(impl.rd_fh.op))
1106         {
1107             // Get control frame payload
1108             auto const b = buffers_prefix(
1109                 clamp(impl.rd_fh.len), impl.rd_buf.data());
1110             auto const len = buffer_bytes(b);
1111             BOOST_ASSERT(len == impl.rd_fh.len);
1112 
1113             // Clear this otherwise the next
1114             // frame will be considered final.
1115             impl.rd_fh.fin = false;
1116 
1117             // Handle ping frame
1118             if(impl.rd_fh.op == detail::opcode::ping)
1119             {
1120                 ping_data payload;
1121                 detail::read_ping(payload, b);
1122                 impl.rd_buf.consume(len);
1123                 if(impl.wr_close)
1124                 {
1125                     // Ignore ping when closing
1126                     goto loop;
1127                 }
1128                 if(impl.ctrl_cb)
1129                     impl.ctrl_cb(frame_type::ping, to_string_view(payload));
                     // Reply with a pong carrying the same payload,
                     // written synchronously to the next layer.
1130                 detail::frame_buffer fb;
1131                 impl.template write_ping<flat_static_buffer_base>(fb,
1132                     detail::opcode::pong, payload);
1133                 net::write(impl.stream(), fb.data(), ec);
1134                 if(impl.check_stop_now(ec))
1135                     return bytes_written;
1136                 goto loop;
1137             }
1138             // Handle pong frame
1139             if(impl.rd_fh.op == detail::opcode::pong)
1140             {
1141                 ping_data payload;
1142                 detail::read_ping(payload, b);
1143                 impl.rd_buf.consume(len);
1144                 if(impl.ctrl_cb)
1145                     impl.ctrl_cb(frame_type::pong, to_string_view(payload));
1146                 goto loop;
1147             }
1148             // Handle close frame
1149             BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1150             {
1151                 BOOST_ASSERT(! impl.rd_close);
1152                 impl.rd_close = true;
1153                 close_reason cr;
1154                 detail::read_close(cr, b, result);
1155                 if(result)
1156                 {
1157                     // _Fail the WebSocket Connection_
1158                     do_fail(close_code::protocol_error,
1159                         result, ec);
1160                     return bytes_written;
1161                 }
1162                 impl.cr = cr;
1163                 impl.rd_buf.consume(len);
1164                 if(impl.ctrl_cb)
1165                     impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1166                 BOOST_ASSERT(! impl.wr_close);
1167                 // _Start the WebSocket Closing Handshake_
                     // A received code of `none` is answered with
                     // `normal`; any other code is passed through.
1168                 do_fail(
1169                     cr.code == close_code::none ?
1170                         close_code::normal :
1171                         static_cast<close_code>(cr.code),
1172                     error::closed, ec);
1173                 return bytes_written;
1174             }
1175         }
1176         if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1177         {
1178             // Empty non-final frame
1179             goto loop;
1180         }
1181         impl.rd_done = false;
1182     }
1183     else
1184     {
1185         ec = {};
1186     }
         // Payload that does not need inflating is handled in the
         // first branch; the else branch handles compressed payload.
1187     if(! impl.rd_deflated())
1188     {
1189         if(impl.rd_remain > 0)
1190         {
1191             if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1192                 (std::min)(clamp(impl.rd_remain),
1193                     buffer_bytes(buffers)))
1194             {
1195                 // Fill the read buffer first, otherwise we
1196                 // get fewer bytes at the cost of one I/O.
1197                 impl.rd_buf.commit(impl.stream().read_some(
1198                     impl.rd_buf.prepare(read_size(impl.rd_buf,
1199                         impl.rd_buf.max_size())), ec));
1200                 if(impl.check_stop_now(ec))
1201                     return bytes_written;
1202                 if(impl.rd_fh.mask)
1203                     detail::mask_inplace(
1204                         buffers_prefix(clamp(impl.rd_remain),
1205                             impl.rd_buf.data()), impl.rd_key);
1206             }
1207             if(impl.rd_buf.size() > 0)
1208             {
1209                 // Copy from the read buffer.
1210                 // The mask was already applied.
1211                 auto const bytes_transferred = net::buffer_copy(
1212                     buffers, impl.rd_buf.data(),
1213                         clamp(impl.rd_remain));
1214                 auto const mb = buffers_prefix(
1215                     bytes_transferred, buffers);
1216                 impl.rd_remain -= bytes_transferred;
1217                 if(impl.rd_op == detail::opcode::text)
1218                 {
                         // Validate the copied bytes as UTF-8; finish()
                         // is only consulted once the final frame's
                         // payload has been fully consumed.
1219                     if(! impl.rd_utf8.write(mb) ||
1220                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1221                             ! impl.rd_utf8.finish()))
1222                     {
1223                         // _Fail the WebSocket Connection_
1224                         do_fail(close_code::bad_payload,
1225                             error::bad_frame_payload, ec);
1226                         return bytes_written;
1227                     }
1228                 }
1229                 bytes_written += bytes_transferred;
1230                 impl.rd_size += bytes_transferred;
1231                 impl.rd_buf.consume(bytes_transferred);
1232             }
1233             else
1234             {
1235                 // Read into caller's buffer
1236                 BOOST_ASSERT(impl.rd_remain > 0);
1237                 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1238                 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1239                     clamp(impl.rd_remain), buffers)) > 0);
1240                 auto const bytes_transferred =
1241                     impl.stream().read_some(buffers_prefix(
1242                         clamp(impl.rd_remain), buffers), ec);
1243                 // VFALCO What if some bytes were written?
1244                 if(impl.check_stop_now(ec))
1245                     return bytes_written;
1246                 BOOST_ASSERT(bytes_transferred > 0);
1247                 auto const mb = buffers_prefix(
1248                     bytes_transferred, buffers);
1249                 impl.rd_remain -= bytes_transferred;
                     // These bytes bypassed rd_buf, so the mask is
                     // applied here, directly in the caller's buffer.
1250                 if(impl.rd_fh.mask)
1251                     detail::mask_inplace(mb, impl.rd_key);
1252                 if(impl.rd_op == detail::opcode::text)
1253                 {
1254                     if(! impl.rd_utf8.write(mb) ||
1255                         (impl.rd_remain == 0 && impl.rd_fh.fin &&
1256                             ! impl.rd_utf8.finish()))
1257                     {
1258                         // _Fail the WebSocket Connection_
1259                         do_fail(close_code::bad_payload,
1260                             error::bad_frame_payload, ec);
1261                         return bytes_written;
1262                     }
1263                 }
1264                 bytes_written += bytes_transferred;
1265                 impl.rd_size += bytes_transferred;
1266             }
1267         }
             // The message is done once `rd_remain` has reached zero
             // on a frame whose `fin` flag is set.
1268         BOOST_ASSERT( ! impl.rd_done );
1269         if( impl.rd_remain == 0 && impl.rd_fh.fin )
1270             impl.rd_done = true;
1271     }
1272     else
1273     {
1274         // Read compressed message frame payload:
1275         // inflate even if rd_fh_.len == 0, otherwise we
1276         // never emit the end-of-stream deflate block.
1277         //
             // `did_read` allows at most one read from the next layer
             // per call: once it is set and rd_buf is empty again, the
             // loop below exits instead of reading a second time.
1278         bool did_read = false;
1279         buffers_suffix<MutableBufferSequence> cb(buffers);
1280         while(buffer_bytes(cb) > 0)
1281         {
1282             zlib::z_params zs;
1283             {
1284                 auto const out = beast::buffers_front(cb);
1285                 zs.next_out = out.data();
1286                 zs.avail_out = out.size();
1287                 BOOST_ASSERT(zs.avail_out > 0);
1288             }
1289             // boolean to track the end of the message.
1290             bool fin = false;
1291             if(impl.rd_remain > 0)
1292             {
1293                 if(impl.rd_buf.size() > 0)
1294                 {
1295                     // use what's there
1296                     auto const in = buffers_prefix(
1297                         clamp(impl.rd_remain), beast::buffers_front(
1298                             impl.rd_buf.data()));
1299                     zs.avail_in = in.size();
1300                     zs.next_in = in.data();
1301                 }
1302                 else if(! did_read)
1303                 {
1304                     // read new
1305                     auto const bytes_transferred =
1306                         impl.stream().read_some(
1307                             impl.rd_buf.prepare(read_size(
1308                                 impl.rd_buf, impl.rd_buf.max_size())),
1309                             ec);
1310                     if(impl.check_stop_now(ec))
1311                         return bytes_written;
1312                     BOOST_ASSERT(bytes_transferred > 0);
1313                     impl.rd_buf.commit(bytes_transferred);
1314                     if(impl.rd_fh.mask)
1315                         detail::mask_inplace(
1316                             buffers_prefix(clamp(impl.rd_remain),
1317                                 impl.rd_buf.data()), impl.rd_key);
1318                     auto const in = buffers_prefix(
1319                         clamp(impl.rd_remain), buffers_front(
1320                             impl.rd_buf.data()));
1321                     zs.avail_in = in.size();
1322                     zs.next_in = in.data();
1323                     did_read = true;
1324                 }
1325                 else
1326                 {
1327                     break;
1328                 }
1329             }
1330             else if(impl.rd_fh.fin)
1331             {
1332                 // append the empty block codes
1333                 static std::uint8_t constexpr
1334                     empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1335                 zs.next_in = empty_block;
1336                 zs.avail_in = sizeof(empty_block);
1337                 fin = true;
1338             }
1339             else
1340             {
1341                 break;
1342             }
1343             impl.inflate(zs, zlib::Flush::sync, ec);
1344             if(impl.check_stop_now(ec))
1345                 return bytes_written;
                 // Feeding the trailing empty block produced no
                 // output, so the message is marked as done.
1346             if (fin && zs.total_out == 0) {
1347                 impl.do_context_takeover_read(impl.role);
1348                 impl.rd_done = true;
1349                 break;
1350             }
1351             if(impl.rd_msg_max && beast::detail::sum_exceeds(
1352                 impl.rd_size, zs.total_out, impl.rd_msg_max))
1353             {
1354                 do_fail(close_code::too_big,
1355                     error::message_too_big, ec);
1356                 return bytes_written;
1357             }
1358             cb.consume(zs.total_out);
1359             impl.rd_size += zs.total_out;
                 // When `fin` is set the input was the local
                 // empty_block rather than rd_buf, so nothing is
                 // consumed from rd_buf or rd_remain.
1360             if (! fin) {
1361                 impl.rd_remain -= zs.total_in;
1362                 impl.rd_buf.consume(zs.total_in);
1363             }
1364             bytes_written += zs.total_out;
1365         }
1366         if(impl.rd_op == detail::opcode::text)
1367         {
1368             // check utf8
1369             if(! impl.rd_utf8.write(beast::buffers_prefix(
1370                 bytes_written, buffers)) || (
1371                     impl.rd_done && ! impl.rd_utf8.finish()))
1372             {
1373                 // _Fail the WebSocket Connection_
1374                 do_fail(close_code::bad_payload,
1375                     error::bad_frame_payload, ec);
1376                 return bytes_written;
1377             }
1378         }
1379     }
1380     return bytes_written;
1381 }
1382 
1383 template<class NextLayer, bool deflateSupported>
1384 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1385 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1386 stream<NextLayer, deflateSupported>::
1387 async_read_some(
1388     MutableBufferSequence const& buffers,
1389     ReadHandler&& handler)
1390 {
1391     static_assert(is_async_stream<next_layer_type>::value,
1392         "AsyncStream type requirements not met");
1393     static_assert(net::is_mutable_buffer_sequence<
1394             MutableBufferSequence>::value,
1395         "MutableBufferSequence type requirements not met");
1396     return net::async_initiate<
1397         ReadHandler,
1398         void(error_code, std::size_t)>(
1399             run_read_some_op{},
1400             handler,
1401             impl_,
1402             buffers);
1403 }
1404 
1405 } // websocket
1406 } // beast
1407 } // boost
1408 
1409 #endif