File indexing completed on 2025-07-15 08:29:01
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0012
0013 #include <boost/beast/websocket/teardown.hpp>
0014 #include <boost/beast/websocket/detail/mask.hpp>
0015 #include <boost/beast/websocket/impl/stream_impl.hpp>
0016 #include <boost/beast/core/async_base.hpp>
0017 #include <boost/beast/core/flat_static_buffer.hpp>
0018 #include <boost/beast/core/stream_traits.hpp>
0019 #include <boost/beast/core/detail/bind_continuation.hpp>
0020 #include <boost/asio/coroutine.hpp>
0021 #include <boost/asio/dispatch.hpp>
0022 #include <boost/throw_exception.hpp>
0023 #include <memory>
0024
0025 namespace boost {
0026 namespace beast {
0027 namespace websocket {
0028
0029
0030
0031
0032
0033
0034
0035
0036 template<class NextLayer, bool deflateSupported>
0037 template<class Handler>
0038 class stream<NextLayer, deflateSupported>::close_op
0039 : public beast::stable_async_base<
0040 Handler, beast::executor_type<stream>>
0041 , public asio::coroutine
0042 {
0043 boost::weak_ptr<impl_type> wp_;
0044 error_code ev_;
0045 detail::frame_buffer& fb_;
0046
0047 public:
0048 static constexpr int id = 5;
0049
0050 template<class Handler_>
0051 close_op(
0052 Handler_&& h,
0053 boost::shared_ptr<impl_type> const& sp,
0054 close_reason const& cr)
0055 : stable_async_base<Handler,
0056 beast::executor_type<stream>>(
0057 std::forward<Handler_>(h),
0058 sp->stream().get_executor())
0059 , wp_(sp)
0060 , fb_(beast::allocate_stable<
0061 detail::frame_buffer>(*this))
0062 {
0063
0064 sp->template write_close<
0065 flat_static_buffer_base>(fb_, cr);
0066 (*this)({}, 0, false);
0067 }
0068
0069 void
0070 operator()(
0071 error_code ec = {},
0072 std::size_t bytes_transferred = 0,
0073 bool cont = true)
0074 {
0075 using beast::detail::clamp;
0076 auto sp = wp_.lock();
0077 if(! sp)
0078 {
0079 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0080 return this->complete(cont, ec);
0081 }
0082 auto& impl = *sp;
0083 BOOST_ASIO_CORO_REENTER(*this)
0084 {
0085
0086 if(! impl.wr_block.try_lock(this))
0087 {
0088 BOOST_ASIO_CORO_YIELD
0089 {
0090 BOOST_ASIO_HANDLER_LOCATION((
0091 __FILE__, __LINE__,
0092 "websocket::async_close"));
0093 this->set_allowed_cancellation(net::cancellation_type::all);
0094 impl.op_close.emplace(std::move(*this),
0095 net::cancellation_type::all);
0096 }
0097
0098 if (ec == net::error::operation_aborted)
0099 return this->complete(cont, ec);
0100 this->set_allowed_cancellation(net::cancellation_type::terminal);
0101
0102 impl.wr_block.lock(this);
0103 BOOST_ASIO_CORO_YIELD
0104 {
0105 BOOST_ASIO_HANDLER_LOCATION((
0106 __FILE__, __LINE__,
0107 "websocket::async_close"));
0108
0109 const auto ex = this->get_immediate_executor();
0110 net::dispatch(ex, std::move(*this));
0111 }
0112 BOOST_ASSERT(impl.wr_block.is_locked(this));
0113 }
0114 if(impl.check_stop_now(ec))
0115 goto upcall;
0116
0117
0118
0119 BOOST_ASSERT(! impl.wr_close);
0120
0121
0122 impl.wr_close = true;
0123 impl.change_status(status::closing);
0124 impl.update_timer(this->get_executor());
0125 BOOST_ASIO_CORO_YIELD
0126 {
0127 BOOST_ASIO_HANDLER_LOCATION((
0128 __FILE__, __LINE__,
0129 "websocket::async_close"));
0130
0131 net::async_write(impl.stream(), fb_.data(),
0132 beast::detail::bind_continuation(std::move(*this)));
0133 }
0134 if(impl.check_stop_now(ec))
0135 goto upcall;
0136
0137 if(impl.rd_close)
0138 {
0139
0140
0141
0142 goto teardown;
0143 }
0144
0145
0146 if(! impl.rd_block.try_lock(this))
0147 {
0148 BOOST_ASIO_CORO_YIELD
0149 {
0150 BOOST_ASIO_HANDLER_LOCATION((
0151 __FILE__, __LINE__,
0152 "websocket::async_close"));
0153
0154 impl.op_r_close.emplace(std::move(*this));
0155 }
0156 if (ec == net::error::operation_aborted)
0157 {
0158
0159 impl.change_status(status::closed);
0160 close_socket(get_lowest_layer(impl.stream()));
0161 return this->complete(cont, ec);
0162 }
0163
0164 impl.rd_block.lock(this);
0165 BOOST_ASIO_CORO_YIELD
0166 {
0167 BOOST_ASIO_HANDLER_LOCATION((
0168 __FILE__, __LINE__,
0169 "websocket::async_close"));
0170
0171 const auto ex = this->get_immediate_executor();
0172 net::dispatch(ex, std::move(*this));
0173 }
0174 BOOST_ASSERT(impl.rd_block.is_locked(this));
0175 if(impl.check_stop_now(ec))
0176 goto upcall;
0177 BOOST_ASSERT(! impl.rd_close);
0178 }
0179
0180
0181
0182 if(impl.rd_remain > 0)
0183 goto read_payload;
0184 for(;;)
0185 {
0186
0187 while(! impl.parse_fh(
0188 impl.rd_fh, impl.rd_buf, ev_))
0189 {
0190 if(ev_)
0191 goto teardown;
0192 BOOST_ASIO_CORO_YIELD
0193 {
0194 BOOST_ASIO_HANDLER_LOCATION((
0195 __FILE__, __LINE__,
0196 "websocket::async_close"));
0197
0198 impl.stream().async_read_some(
0199 impl.rd_buf.prepare(read_size(
0200 impl.rd_buf, impl.rd_buf.max_size())),
0201 beast::detail::bind_continuation(std::move(*this)));
0202 }
0203 impl.rd_buf.commit(bytes_transferred);
0204 if(impl.check_stop_now(ec))
0205 goto upcall;
0206 }
0207 if(detail::is_control(impl.rd_fh.op))
0208 {
0209
0210 if(impl.rd_fh.op != detail::opcode::close)
0211 {
0212 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0213 continue;
0214 }
0215
0216
0217
0218 BOOST_ASSERT(! impl.rd_close);
0219 impl.rd_close = true;
0220 auto const mb = buffers_prefix(
0221 clamp(impl.rd_fh.len),
0222 impl.rd_buf.data());
0223 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0224 detail::mask_inplace(mb, impl.rd_key);
0225 detail::read_close(impl.cr, mb, ev_);
0226 if(ev_)
0227 goto teardown;
0228 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0229 goto teardown;
0230 }
0231
0232 read_payload:
0233
0234 while(impl.rd_buf.size() < impl.rd_remain)
0235 {
0236 impl.rd_remain -= impl.rd_buf.size();
0237 impl.rd_buf.consume(impl.rd_buf.size());
0238 BOOST_ASIO_CORO_YIELD
0239 {
0240 BOOST_ASIO_HANDLER_LOCATION((
0241 __FILE__, __LINE__,
0242 "websocket::async_close"));
0243
0244 impl.stream().async_read_some(
0245 impl.rd_buf.prepare(read_size(
0246 impl.rd_buf, impl.rd_buf.max_size())),
0247 beast::detail::bind_continuation(std::move(*this)));
0248 }
0249 impl.rd_buf.commit(bytes_transferred);
0250 if(impl.check_stop_now(ec))
0251 goto upcall;
0252 }
0253 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
0254 impl.rd_buf.consume(clamp(impl.rd_remain));
0255 impl.rd_remain = 0;
0256 }
0257
0258 teardown:
0259
0260 BOOST_ASSERT(impl.wr_block.is_locked(this));
0261 using beast::websocket::async_teardown;
0262 BOOST_ASIO_CORO_YIELD
0263 {
0264 BOOST_ASIO_HANDLER_LOCATION((
0265 __FILE__, __LINE__,
0266 "websocket::async_close"));
0267
0268 async_teardown(impl.role, impl.stream(),
0269 beast::detail::bind_continuation(std::move(*this)));
0270 }
0271 BOOST_ASSERT(impl.wr_block.is_locked(this));
0272 if(ec == net::error::eof)
0273 {
0274
0275
0276 ec = {};
0277 }
0278 if(! ec)
0279 {
0280 BOOST_BEAST_ASSIGN_EC(ec, ev_);
0281 }
0282 if(ec)
0283 impl.change_status(status::failed);
0284 else
0285 impl.change_status(status::closed);
0286 impl.close();
0287
0288 upcall:
0289 impl.wr_block.unlock(this);
0290 impl.rd_block.try_unlock(this)
0291 && impl.op_r_rd.maybe_invoke();
0292 impl.op_rd.maybe_invoke()
0293 || impl.op_idle_ping.maybe_invoke()
0294 || impl.op_ping.maybe_invoke()
0295 || impl.op_wr.maybe_invoke();
0296 this->complete(cont, ec);
0297 }
0298 }
0299 };
0300
0301 template<class NextLayer, bool deflateSupported>
0302 struct stream<NextLayer, deflateSupported>::
0303 run_close_op
0304 {
0305 boost::shared_ptr<impl_type> const& self;
0306
0307 using executor_type = typename stream::executor_type;
0308
0309 executor_type
0310 get_executor() const noexcept
0311 {
0312 return self->stream().get_executor();
0313 }
0314
0315 template<class CloseHandler>
0316 void
0317 operator()(
0318 CloseHandler&& h,
0319 close_reason const& cr)
0320 {
0321
0322
0323
0324
0325 static_assert(
0326 beast::detail::is_invocable<CloseHandler,
0327 void(error_code)>::value,
0328 "CloseHandler type requirements not met");
0329
0330 close_op<
0331 typename std::decay<CloseHandler>::type>(
0332 std::forward<CloseHandler>(h),
0333 self,
0334 cr);
0335 }
0336 };
0337
0338
0339
0340 template<class NextLayer, bool deflateSupported>
0341 void
0342 stream<NextLayer, deflateSupported>::
0343 close(close_reason const& cr)
0344 {
0345 static_assert(is_sync_stream<next_layer_type>::value,
0346 "SyncStream type requirements not met");
0347 error_code ec;
0348 close(cr, ec);
0349 if(ec)
0350 BOOST_THROW_EXCEPTION(system_error{ec});
0351 }
0352
0353 template<class NextLayer, bool deflateSupported>
0354 void
0355 stream<NextLayer, deflateSupported>::
0356 close(close_reason const& cr, error_code& ec)
0357 {
0358 static_assert(is_sync_stream<next_layer_type>::value,
0359 "SyncStream type requirements not met");
0360 using beast::detail::clamp;
0361 auto& impl = *impl_;
0362 ec = {};
0363 if(impl.check_stop_now(ec))
0364 return;
0365 BOOST_ASSERT(! impl.rd_close);
0366
0367
0368
0369 BOOST_ASSERT(! impl.wr_close);
0370
0371
0372 {
0373 impl.wr_close = true;
0374 impl.change_status(status::closing);
0375 detail::frame_buffer fb;
0376 impl.template write_close<flat_static_buffer_base>(fb, cr);
0377 net::write(impl.stream(), fb.data(), ec);
0378 if(impl.check_stop_now(ec))
0379 return;
0380 }
0381
0382
0383 error_code ev;
0384 if(impl.rd_remain > 0)
0385 goto read_payload;
0386 for(;;)
0387 {
0388
0389 while(! impl.parse_fh(
0390 impl.rd_fh, impl.rd_buf, ev))
0391 {
0392 if(ev)
0393 {
0394
0395 return do_fail(close_code::none, ev, ec);
0396 }
0397 impl.rd_buf.commit(impl.stream().read_some(
0398 impl.rd_buf.prepare(read_size(
0399 impl.rd_buf, impl.rd_buf.max_size())), ec));
0400 if(impl.check_stop_now(ec))
0401 return;
0402 }
0403
0404 if(detail::is_control(impl.rd_fh.op))
0405 {
0406
0407 if(impl.rd_fh.op != detail::opcode::close)
0408 {
0409 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0410 continue;
0411 }
0412
0413
0414
0415 BOOST_ASSERT(! impl.rd_close);
0416 impl.rd_close = true;
0417 auto const mb = buffers_prefix(
0418 clamp(impl.rd_fh.len),
0419 impl.rd_buf.data());
0420 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0421 detail::mask_inplace(mb, impl.rd_key);
0422 detail::read_close(impl.cr, mb, ev);
0423 if(ev)
0424 {
0425
0426 return do_fail(close_code::none, ev, ec);
0427 }
0428 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0429 break;
0430 }
0431
0432 read_payload:
0433
0434 while(impl.rd_buf.size() < impl.rd_remain)
0435 {
0436 impl.rd_remain -= impl.rd_buf.size();
0437 impl.rd_buf.consume(impl.rd_buf.size());
0438 impl.rd_buf.commit(
0439 impl.stream().read_some(
0440 impl.rd_buf.prepare(
0441 read_size(
0442 impl.rd_buf,
0443 impl.rd_buf.max_size())),
0444 ec));
0445 if(impl.check_stop_now(ec))
0446 return;
0447 }
0448 BOOST_ASSERT(
0449 impl.rd_buf.size() >= impl.rd_remain);
0450 impl.rd_buf.consume(clamp(impl.rd_remain));
0451 impl.rd_remain = 0;
0452 }
0453
0454 do_fail(close_code::none, error::closed, ec);
0455 if(ec == error::closed)
0456 ec = {};
0457 }
0458
0459 template<class NextLayer, bool deflateSupported>
0460 template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
0461 BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
0462 stream<NextLayer, deflateSupported>::
0463 async_close(close_reason const& cr, CloseHandler&& handler)
0464 {
0465 static_assert(is_async_stream<next_layer_type>::value,
0466 "AsyncStream type requirements not met");
0467 return net::async_initiate<
0468 CloseHandler,
0469 void(error_code)>(
0470 run_close_op{impl_},
0471 handler,
0472 cr);
0473 }
0474
0475 }
0476 }
0477 }
0478
0479 #endif