File indexing completed on 2025-09-17 08:24:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0012
0013 #include <boost/beast/websocket/teardown.hpp>
0014 #include <boost/beast/websocket/detail/mask.hpp>
0015 #include <boost/beast/websocket/impl/stream_impl.hpp>
0016 #include <boost/beast/core/async_base.hpp>
0017 #include <boost/beast/core/flat_static_buffer.hpp>
0018 #include <boost/beast/core/stream_traits.hpp>
0019 #include <boost/beast/core/detail/bind_continuation.hpp>
0020 #include <boost/asio/coroutine.hpp>
0021 #include <boost/asio/dispatch.hpp>
0022 #include <boost/throw_exception.hpp>
0023 #include <memory>
0024
0025 namespace boost {
0026 namespace beast {
0027 namespace websocket {
0028
0029 // close_op: composed asynchronous operation started by run_close_op.
0030 //
0031 // As written below it (1) writes the close frame that the constructor
0032 // serialized from the caller's close_reason, (2) unless impl.rd_close is
0033 // already set, reads and discards incoming frames until a close frame is
0034 // parsed, and (3) runs async_teardown on the stream before completing.
0035 // The steps are sequenced by an asio::coroutine re-entered via operator().
0036 template<class NextLayer, bool deflateSupported>
0037 template<class Handler>
0038 class stream<NextLayer, deflateSupported>::close_op
0039 : public beast::stable_async_base<
0040 Handler, beast::executor_type<stream>>
0041 , public asio::coroutine
0042 {
0043 boost::weak_ptr<impl_type> wp_; // non-owning; locked at the top of every resumption
0044 error_code ev_; // error from header / close-payload parsing; reported after teardown
0045 detail::frame_buffer& fb_; // close frame bytes; storage obtained from allocate_stable
0046
0047 public:
0048 static constexpr int id = 5; // NOTE(review): presumably identifies this op to wr_block / rd_block - confirm
0049
0050 template<class Handler_>
0051 close_op(
0052 Handler_&& h,
0053 boost::shared_ptr<impl_type> const& sp,
0054 close_reason const& cr)
0055 : stable_async_base<Handler,
0056 beast::executor_type<stream>>(
0057 std::forward<Handler_>(h),
0058 sp->stream().get_executor())
0059 , wp_(sp)
0060 , fb_(beast::allocate_stable<
0061 detail::frame_buffer>(*this))
0062 {
0063 // Fill fb_ from cr up front, then start the coroutine with cont == false.
0064 sp->template write_close<
0065 flat_static_buffer_base>(fb_, cr);
0066 (*this)({}, 0, false);
0067 }
0068 // Resumption entry point: ec / bytes_transferred carry the result of the step just awaited.
0069 void
0070 operator()(
0071 error_code ec = {},
0072 std::size_t bytes_transferred = 0,
0073 bool cont = true)
0074 {
0075 using beast::detail::clamp;
0076 auto sp = wp_.lock();
0077 if(! sp) // impl no longer exists: finish with operation_aborted
0078 {
0079 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0080 return this->complete(cont, ec);
0081 }
0082 auto& impl = *sp;
0083 BOOST_ASIO_CORO_REENTER(*this)
0084 {
0085 // Take the write block; if try_lock fails, park this op in impl.op_close.
0086 if(! impl.wr_block.try_lock(this))
0087 {
0088 BOOST_ASIO_CORO_YIELD
0089 {
0090 BOOST_ASIO_HANDLER_LOCATION((
0091 __FILE__, __LINE__,
0092 "websocket::async_close"));
0093 this->set_allowed_cancellation(net::cancellation_type::all);
0094 impl.op_close.emplace(std::move(*this),
0095 net::cancellation_type::all);
0096 }
0097 // Resumed with operation_aborted while parked: complete without writing anything.
0098 if (ec == net::error::operation_aborted)
0099 return this->complete(cont, ec);
0100 this->set_allowed_cancellation(net::cancellation_type::terminal);
0101 // Take the write block, then resume through the immediate executor.
0102 impl.wr_block.lock(this);
0103 BOOST_ASIO_CORO_YIELD
0104 {
0105 BOOST_ASIO_HANDLER_LOCATION((
0106 __FILE__, __LINE__,
0107 "websocket::async_close"));
0108
0109 const auto ex = this->get_immediate_executor();
0110 net::dispatch(ex, std::move(*this));
0111 }
0112 BOOST_ASSERT(impl.wr_block.is_locked(this));
0113 }
0114 if(impl.check_stop_now(ec))
0115 goto upcall;
0116
0117 // Precondition (BOOST_ASSERT only): the close frame has not been sent yet.
0118 BOOST_ASSERT(! impl.wr_close);
0119
0120 // Record that close is being sent, switch to status::closing, write fb_.
0121 impl.wr_close = true;
0122 impl.change_status(status::closing);
0123 impl.update_timer(this->get_executor());
0124 BOOST_ASIO_CORO_YIELD
0125 {
0126 BOOST_ASIO_HANDLER_LOCATION((
0127 __FILE__, __LINE__,
0128 "websocket::async_close"));
0129
0130 net::async_write(impl.stream(), fb_.data(),
0131 beast::detail::bind_continuation(std::move(*this)));
0132 }
0133 if(impl.check_stop_now(ec))
0134 goto upcall;
0135
0136 if(impl.rd_close)
0137 {
0138 // rd_close is already set (it is set below when a close frame is
0139 // parsed; presumably also by a concurrent read op - confirm), so
0140 // there is nothing left to read: skip directly to teardown.
0141 goto teardown;
0142 }
0143
0144 // Take the read block; if try_lock fails, park this op in impl.op_r_close.
0145 if(! impl.rd_block.try_lock(this))
0146 {
0147 BOOST_ASIO_CORO_YIELD
0148 {
0149 BOOST_ASIO_HANDLER_LOCATION((
0150 __FILE__, __LINE__,
0151 "websocket::async_close"));
0152
0153 impl.op_r_close.emplace(std::move(*this));
0154 }
0155 if (ec == net::error::operation_aborted)
0156 {
0157 // Aborted while parked: mark the stream closed and close the lowest-layer socket directly.
0158 impl.change_status(status::closed);
0159 close_socket(get_lowest_layer(impl.stream()));
0160 return this->complete(cont, ec); // NOTE(review): returns without reaching upcall, so wr_block is not unlocked here - confirm intended
0161 }
0162
0163 impl.rd_block.lock(this);
0164 BOOST_ASIO_CORO_YIELD
0165 {
0166 BOOST_ASIO_HANDLER_LOCATION((
0167 __FILE__, __LINE__,
0168 "websocket::async_close"));
0169
0170 const auto ex = this->get_immediate_executor();
0171 net::dispatch(ex, std::move(*this));
0172 }
0173 BOOST_ASSERT(impl.rd_block.is_locked(this));
0174 if(impl.check_stop_now(ec))
0175 goto upcall;
0176 BOOST_ASSERT(! impl.rd_close);
0177 }
0178
0179 // Drain frames until a close frame is parsed. If rd_remain > 0, jump into the loop to discard that payload first.
0180 if(impl.rd_remain > 0)
0181 goto read_payload;
0182 for(;;)
0183 {
0184 // Parse a frame header; while parse_fh returns false without setting ev_, read more bytes and retry.
0185 while(! impl.parse_fh(
0186 impl.rd_fh, impl.rd_buf, ev_))
0187 {
0188 if(ev_)
0189 goto teardown;
0190 BOOST_ASIO_CORO_YIELD
0191 {
0192 BOOST_ASIO_HANDLER_LOCATION((
0193 __FILE__, __LINE__,
0194 "websocket::async_close"));
0195
0196 impl.stream().async_read_some(
0197 impl.rd_buf.prepare(read_size(
0198 impl.rd_buf, impl.rd_buf.max_size())),
0199 beast::detail::bind_continuation(std::move(*this)));
0200 }
0201 impl.rd_buf.commit(bytes_transferred);
0202 if(impl.check_stop_now(ec))
0203 goto upcall;
0204 }
0205 if(detail::is_control(impl.rd_fh.op))
0206 {
0207 // Control frames other than close are dropped and the loop continues.
0208 if(impl.rd_fh.op != detail::opcode::close)
0209 {
0210 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0211 continue;
0212 }
0213
0214 // Close frame: set rd_close, unmask the payload in place when the
0215 // header says it is masked, and decode it into impl.cr via read_close.
0216 BOOST_ASSERT(! impl.rd_close);
0217 impl.rd_close = true;
0218 auto const mb = buffers_prefix(
0219 clamp(impl.rd_fh.len),
0220 impl.rd_buf.data());
0221 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0222 detail::mask_inplace(mb, impl.rd_key);
0223 detail::read_close(impl.cr, mb, ev_);
0224 if(ev_)
0225 goto teardown;
0226 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0227 goto teardown;
0228 }
0229
0230 read_payload:
0231 // Discard rd_remain bytes of frame payload, reading more whenever rd_buf holds fewer than that.
0232 while(impl.rd_buf.size() < impl.rd_remain)
0233 {
0234 impl.rd_remain -= impl.rd_buf.size();
0235 impl.rd_buf.consume(impl.rd_buf.size());
0236 BOOST_ASIO_CORO_YIELD
0237 {
0238 BOOST_ASIO_HANDLER_LOCATION((
0239 __FILE__, __LINE__,
0240 "websocket::async_close"));
0241
0242 impl.stream().async_read_some(
0243 impl.rd_buf.prepare(read_size(
0244 impl.rd_buf, impl.rd_buf.max_size())),
0245 beast::detail::bind_continuation(std::move(*this)));
0246 }
0247 impl.rd_buf.commit(bytes_transferred);
0248 if(impl.check_stop_now(ec))
0249 goto upcall;
0250 }
0251 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
0252 impl.rd_buf.consume(clamp(impl.rd_remain));
0253 impl.rd_remain = 0;
0254 }
0255
0256 teardown:
0257 // Run async_teardown on the stream; the write block is asserted held before and after.
0258 BOOST_ASSERT(impl.wr_block.is_locked(this));
0259 using beast::websocket::async_teardown;
0260 BOOST_ASIO_CORO_YIELD
0261 {
0262 BOOST_ASIO_HANDLER_LOCATION((
0263 __FILE__, __LINE__,
0264 "websocket::async_close"));
0265
0266 async_teardown(impl.role, impl.stream(),
0267 beast::detail::bind_continuation(std::move(*this)));
0268 }
0269 BOOST_ASSERT(impl.wr_block.is_locked(this));
0270 if(ec == net::error::eof)
0271 {
0272 // eof reported by teardown is
0273 // not treated as a failure.
0274 ec = {};
0275 }
0276 if(! ec) // teardown succeeded: report any parse error saved in ev_
0277 {
0278 BOOST_BEAST_ASSIGN_EC(ec, ev_);
0279 }
0280 if(ec)
0281 impl.change_status(status::failed);
0282 else
0283 impl.change_status(status::closed);
0284 impl.close();
0285
0286 // Common exit: release the blocks, give parked operations a chance to run, then complete.
0287 upcall:
0288 impl.wr_block.unlock(this);
0289 impl.rd_block.try_unlock(this) // op_r_rd is only tried when try_unlock returns true
0290 && impl.op_r_rd.maybe_invoke();
0291 impl.op_rd.maybe_invoke() // the || chain stops at the first maybe_invoke() returning true
0292 || impl.op_idle_ping.maybe_invoke()
0293 || impl.op_ping.maybe_invoke()
0294 || impl.op_wr.maybe_invoke();
0295 this->complete(cont, ec);
0296 }
0297 }
0298 };
0299 // run_close_op: initiation function object that async_close passes to net::async_initiate.
0300 template<class NextLayer, bool deflateSupported>
0301 struct stream<NextLayer, deflateSupported>::
0302 run_close_op
0303 {
0304 boost::shared_ptr<impl_type> const& self; // held by reference; async_close binds it to the stream's impl_
0305
0306 using executor_type = typename stream::executor_type;
0307
0308 executor_type
0309 get_executor() const noexcept
0310 {
0311 return self->stream().get_executor();
0312 }
0313
0314 template<class CloseHandler>
0315 void
0316 operator()(
0317 CloseHandler&& h,
0318 close_reason const& cr)
0319 {
0320 // Compile-time check: the handler must be invocable
0321 // as void(error_code). A failure on the static_assert
0322 // below means the caller's handler does not satisfy
0323 // that requirement.
0324 static_assert(
0325 beast::detail::is_invocable<CloseHandler,
0326 void(error_code)>::value,
0327 "CloseHandler type requirements not met");
0328 // Constructing the close_op temporary is what starts the operation: its constructor invokes (*this)({}, 0, false).
0329 close_op<
0330 typename std::decay<CloseHandler>::type>(
0331 std::forward<CloseHandler>(h),
0332 self,
0333 cr);
0334 }
0335 };
0336
0337 // Synchronous close, throwing overload: forwards to close(cr, ec) and, if ec
0338 // is set afterwards, throws system_error{ec} through BOOST_THROW_EXCEPTION.
0339 template<class NextLayer, bool deflateSupported>
0340 void
0341 stream<NextLayer, deflateSupported>::
0342 close(close_reason const& cr)
0343 {
0344 static_assert(is_sync_stream<next_layer_type>::value,
0345 "SyncStream type requirements not met");
0346 error_code ec;
0347 close(cr, ec);
0348 if(ec)
0349 BOOST_THROW_EXCEPTION(system_error{ec});
0350 }
0351 // Synchronous close, error_code overload: writes the close frame built from cr, then reads and discards frames until a close frame is parsed; the outcome is reported through ec.
0352 template<class NextLayer, bool deflateSupported>
0353 void
0354 stream<NextLayer, deflateSupported>::
0355 close(close_reason const& cr, error_code& ec)
0356 {
0357 static_assert(is_sync_stream<next_layer_type>::value,
0358 "SyncStream type requirements not met");
0359 using beast::detail::clamp;
0360 auto& impl = *impl_;
0361 ec = {};
0362 if(impl.check_stop_now(ec))
0363 return;
0364 BOOST_ASSERT(! impl.rd_close);
0365
0366 // Precondition (BOOST_ASSERT only): the close
0367 // frame has not been sent on this stream yet.
0368 BOOST_ASSERT(! impl.wr_close);
0369
0370 // Serialize the close frame into a local buffer and write it with net::write.
0371 {
0372 impl.wr_close = true;
0373 impl.change_status(status::closing);
0374 detail::frame_buffer fb;
0375 impl.template write_close<flat_static_buffer_base>(fb, cr);
0376 net::write(impl.stream(), fb.data(), ec);
0377 if(impl.check_stop_now(ec))
0378 return;
0379 }
0380
0381 // Drain frames until a close frame is parsed. If rd_remain > 0, jump into the loop to discard that payload first.
0382 error_code ev;
0383 if(impl.rd_remain > 0)
0384 goto read_payload;
0385 for(;;)
0386 {
0387 // Parse a frame header; while parse_fh returns false without setting ev, read more bytes and retry.
0388 while(! impl.parse_fh(
0389 impl.rd_fh, impl.rd_buf, ev))
0390 {
0391 if(ev)
0392 {
0393 // Header parsing reported an error: hand it to do_fail and return its result.
0394 return do_fail(close_code::none, ev, ec);
0395 }
0396 impl.rd_buf.commit(impl.stream().read_some(
0397 impl.rd_buf.prepare(read_size(
0398 impl.rd_buf, impl.rd_buf.max_size())), ec));
0399 if(impl.check_stop_now(ec))
0400 return;
0401 }
0402
0403 if(detail::is_control(impl.rd_fh.op))
0404 {
0405 // Control frames other than close are dropped and the loop continues.
0406 if(impl.rd_fh.op != detail::opcode::close)
0407 {
0408 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0409 continue;
0410 }
0411
0412 // Close frame: set rd_close, unmask the payload in place when the
0413 // header says it is masked, and decode it into impl.cr via read_close.
0414 BOOST_ASSERT(! impl.rd_close);
0415 impl.rd_close = true;
0416 auto const mb = buffers_prefix(
0417 clamp(impl.rd_fh.len),
0418 impl.rd_buf.data());
0419 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0420 detail::mask_inplace(mb, impl.rd_key);
0421 detail::read_close(impl.cr, mb, ev);
0422 if(ev)
0423 {
0424 // The close payload failed to decode: hand the error to do_fail.
0425 return do_fail(close_code::none, ev, ec);
0426 }
0427 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0428 break;
0429 }
0430
0431 read_payload:
0432 // Discard rd_remain bytes of frame payload, reading more whenever rd_buf holds fewer than that.
0433 while(impl.rd_buf.size() < impl.rd_remain)
0434 {
0435 impl.rd_remain -= impl.rd_buf.size();
0436 impl.rd_buf.consume(impl.rd_buf.size());
0437 impl.rd_buf.commit(
0438 impl.stream().read_some(
0439 impl.rd_buf.prepare(
0440 read_size(
0441 impl.rd_buf,
0442 impl.rd_buf.max_size())),
0443 ec));
0444 if(impl.check_stop_now(ec))
0445 return;
0446 }
0447 BOOST_ASSERT(
0448 impl.rd_buf.size() >= impl.rd_remain);
0449 impl.rd_buf.consume(clamp(impl.rd_remain));
0450 impl.rd_remain = 0;
0451 }
0452 // Reached only via the break after a close frame. NOTE(review): do_fail presumably performs the teardown - confirm. An ec equal to error::closed is mapped to success.
0453 do_fail(close_code::none, error::closed, ec);
0454 if(ec == error::closed)
0455 ec = {};
0456 }
0457 // Asynchronous close: hands run_close_op, the completion token and cr to net::async_initiate and returns its result.
0458 template<class NextLayer, bool deflateSupported>
0459 template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
0460 BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
0461 stream<NextLayer, deflateSupported>::
0462 async_close(close_reason const& cr, CloseHandler&& handler)
0463 {
0464 static_assert(is_async_stream<next_layer_type>::value,
0465 "AsyncStream type requirements not met");
0466 return net::async_initiate<
0467 CloseHandler,
0468 void(error_code)>( // completion signature: the handler receives only an error_code
0469 run_close_op{impl_}, // binds run_close_op::self to this stream's impl_
0470 handler, // passed as an lvalue, not std::forward'ed
0471 cr);
0472 }
0473
0474 }
0475 }
0476 }
0477
0478 #endif