File indexing completed on 2025-01-18 09:29:33
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0011 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
0012
0013 #include <boost/beast/websocket/teardown.hpp>
0014 #include <boost/beast/websocket/detail/mask.hpp>
0015 #include <boost/beast/websocket/impl/stream_impl.hpp>
0016 #include <boost/beast/core/async_base.hpp>
0017 #include <boost/beast/core/flat_static_buffer.hpp>
0018 #include <boost/beast/core/stream_traits.hpp>
0019 #include <boost/beast/core/detail/bind_continuation.hpp>
0020 #include <boost/asio/coroutine.hpp>
0021 #include <boost/asio/dispatch.hpp>
0022 #include <boost/throw_exception.hpp>
0023 #include <memory>
0024
0025 namespace boost {
0026 namespace beast {
0027 namespace websocket {
0028
0029 // close_op: composed asynchronous operation started by run_close_op on behalf of async_close.
0030 // The constructor serializes the close frame into a stable frame_buffer; the
0031 // coroutine then writes that frame, reads and discards incoming frames until a
0032 // close frame is received (or a parse error is stored in ev_), and finally
0033 // calls async_teardown on the stream. Only a weak_ptr to the impl is held: if
0034 // it cannot be locked on resumption the op completes with operation_aborted.
0035
0036 template<class NextLayer, bool deflateSupported>
0037 template<class Handler>
0038 class stream<NextLayer, deflateSupported>::close_op
0039 : public beast::stable_async_base<
0040 Handler, beast::executor_type<stream>>
0041 , public asio::coroutine
0042 {
0043 boost::weak_ptr<impl_type> wp_; // weak reference to the stream impl, locked on each resumption
0044 error_code ev_; // error from header/close-frame parsing, reported after teardown
0045 detail::frame_buffer& fb_; // holds the serialized close frame (allocate_stable storage)
0046
0047 public:
0048 static constexpr int id = 5; // NOTE(review): presumably an identifier consumed by the lock objects — confirm
0049
0050 template<class Handler_>
0051 close_op(
0052 Handler_&& h,
0053 boost::shared_ptr<impl_type> const& sp,
0054 close_reason const& cr)
0055 : stable_async_base<Handler,
0056 beast::executor_type<stream>>(
0057 std::forward<Handler_>(h),
0058 sp->stream().get_executor())
0059 , wp_(sp)
0060 , fb_(beast::allocate_stable<
0061 detail::frame_buffer>(*this))
0062 {
0063 // Serialize the close frame for cr into fb_, then start the coroutine with cont == false.
0064 sp->template write_close<
0065 flat_static_buffer_base>(fb_, cr);
0066 (*this)({}, 0, false);
0067 }
0068 // Coroutine body; it is also passed (moved) as the completion handler of every intermediate step.
0069 void
0070 operator()(
0071 error_code ec = {}, // result of the step that just finished
0072 std::size_t bytes_transferred = 0, // byte count of that step; committed to rd_buf after reads
0073 bool cont = true) // false only for the initial call made from the constructor
0074 {
0075 using beast::detail::clamp;
0076 auto sp = wp_.lock();
0077 if(! sp)
0078 {
0079 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
0080 return this->complete(cont, ec);
0081 }
0082 auto& impl = *sp;
0083 BOOST_ASIO_CORO_REENTER(*this)
0084 {
0085 // Acquire the write lock. If it is taken, store this op in op_close and yield.
0086 if(! impl.wr_block.try_lock(this))
0087 {
0088 BOOST_ASIO_CORO_YIELD
0089 {
0090 BOOST_ASIO_HANDLER_LOCATION((
0091 __FILE__, __LINE__,
0092 "websocket::async_close"));
0093 this->set_allowed_cancellation(net::cancellation_type::all);
0094 impl.op_close.emplace(std::move(*this),
0095 net::cancellation_type::all);
0096 }
0097 // Aborted while waiting for the write lock: complete before anything has been written.
0098 if (ec == net::error::operation_aborted)
0099 return this->complete(cont, ec);
0100 this->set_allowed_cancellation(net::cancellation_type::terminal);
0101
0102 impl.wr_block.lock(this);
0103 BOOST_ASIO_CORO_YIELD
0104 {
0105 BOOST_ASIO_HANDLER_LOCATION((
0106 __FILE__, __LINE__,
0107 "websocket::async_close"));
0108
0109 const auto ex = this->get_immediate_executor();
0110 net::dispatch(ex, std::move(*this));
0111 }
0112 BOOST_ASSERT(impl.wr_block.is_locked(this));
0113 }
0114 if(impl.check_stop_now(ec))
0115 goto upcall;
0116
0117 // Precondition: no close frame has been written on this stream yet (wr_close is false).
0118
0119 BOOST_ASSERT(! impl.wr_close);
0120
0121 // Mark the stream as closing, update the timer, and write the serialized close frame.
0122 impl.wr_close = true;
0123 impl.change_status(status::closing);
0124 impl.update_timer(this->get_executor());
0125 BOOST_ASIO_CORO_YIELD
0126 {
0127 BOOST_ASIO_HANDLER_LOCATION((
0128 __FILE__, __LINE__,
0129 "websocket::async_close"));
0130
0131 net::async_write(impl.stream(), fb_.data(),
0132 beast::detail::bind_continuation(std::move(*this)));
0133 }
0134 if(impl.check_stop_now(ec))
0135 goto upcall;
0136
0137 if(impl.rd_close)
0138 {
0139 // rd_close is already set, i.e. a close frame was received before this
0140 // point, so there is nothing to read; proceed directly to teardown.
0141
0142 goto teardown;
0143 }
0144
0145 // Acquire the read lock. If it is taken, store this op in op_r_close and yield.
0146 if(! impl.rd_block.try_lock(this))
0147 {
0148 BOOST_ASIO_CORO_YIELD
0149 {
0150 BOOST_ASIO_HANDLER_LOCATION((
0151 __FILE__, __LINE__,
0152 "websocket::async_close"));
0153
0154 impl.op_r_close.emplace(std::move(*this));
0155 }
0156 if (ec == net::error::operation_aborted)
0157 {
0158 // Aborted while waiting for the read lock: mark the stream closed and close the socket directly, skipping teardown.
0159 impl.change_status(status::closed);
0160 close_socket(get_lowest_layer(impl.stream()));
0161 return this->complete(cont, ec);
0162 }
0163
0164 impl.rd_block.lock(this);
0165 BOOST_ASIO_CORO_YIELD
0166 {
0167 BOOST_ASIO_HANDLER_LOCATION((
0168 __FILE__, __LINE__,
0169 "websocket::async_close"));
0170
0171 const auto ex = this->get_immediate_executor();
0172 net::dispatch(ex, std::move(*this));
0173 }
0174 BOOST_ASSERT(impl.rd_block.is_locked(this));
0175 if(impl.check_stop_now(ec))
0176 goto upcall;
0177 BOOST_ASSERT(! impl.rd_close);
0178 }
0179
0180 // Read and discard frames until a close frame arrives.
0181 // A non-zero rd_remain means payload bytes are still outstanding; discard those first.
0182 if(impl.rd_remain > 0)
0183 goto read_payload;
0184 for(;;)
0185 {
0186 // Parse the next frame header, reading from the stream until parse_fh returns true.
0187 while(! impl.parse_fh(
0188 impl.rd_fh, impl.rd_buf, ev_))
0189 {
0190 if(ev_)
0191 goto teardown;
0192 BOOST_ASIO_CORO_YIELD
0193 {
0194 BOOST_ASIO_HANDLER_LOCATION((
0195 __FILE__, __LINE__,
0196 "websocket::async_close"));
0197
0198 impl.stream().async_read_some(
0199 impl.rd_buf.prepare(read_size(
0200 impl.rd_buf, impl.rd_buf.max_size())),
0201 beast::detail::bind_continuation(std::move(*this)));
0202 }
0203 impl.rd_buf.commit(bytes_transferred);
0204 if(impl.check_stop_now(ec))
0205 goto upcall;
0206 }
0207 if(detail::is_control(impl.rd_fh.op))
0208 {
0209 // Control frames other than close are skipped: consume the payload and continue.
0210 if(impl.rd_fh.op != detail::opcode::close)
0211 {
0212 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0213 continue;
0214 }
0215
0216 // Close frame: set rd_close, unmask the payload when the frame is masked,
0217 // and let read_close fill impl.cr (a failure is stored in ev_).
0218 BOOST_ASSERT(! impl.rd_close);
0219 impl.rd_close = true;
0220 auto const mb = buffers_prefix(
0221 clamp(impl.rd_fh.len),
0222 impl.rd_buf.data());
0223 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0224 detail::mask_inplace(mb, impl.rd_key);
0225 detail::read_close(impl.cr, mb, ev_);
0226 if(ev_)
0227 goto teardown;
0228 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0229 goto teardown;
0230 }
0231
0232 read_payload:
0233 // Discard rd_remain payload bytes, reading more from the stream whenever rd_buf runs short.
0234 while(impl.rd_buf.size() < impl.rd_remain)
0235 {
0236 impl.rd_remain -= impl.rd_buf.size();
0237 impl.rd_buf.consume(impl.rd_buf.size());
0238 BOOST_ASIO_CORO_YIELD
0239 {
0240 BOOST_ASIO_HANDLER_LOCATION((
0241 __FILE__, __LINE__,
0242 "websocket::async_close"));
0243
0244 impl.stream().async_read_some(
0245 impl.rd_buf.prepare(read_size(
0246 impl.rd_buf, impl.rd_buf.max_size())),
0247 beast::detail::bind_continuation(std::move(*this)));
0248 }
0249 impl.rd_buf.commit(bytes_transferred);
0250 if(impl.check_stop_now(ec))
0251 goto upcall;
0252 }
0253 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
0254 impl.rd_buf.consume(clamp(impl.rd_remain));
0255 impl.rd_remain = 0;
0256 }
0257
0258 teardown:
0259 // Tear down the underlying stream; the asserts check the write lock is held before and after.
0260 BOOST_ASSERT(impl.wr_block.is_locked(this));
0261 using beast::websocket::async_teardown;
0262 BOOST_ASIO_CORO_YIELD
0263 {
0264 BOOST_ASIO_HANDLER_LOCATION((
0265 __FILE__, __LINE__,
0266 "websocket::async_close"));
0267
0268 async_teardown(impl.role, impl.stream(),
0269 beast::detail::bind_continuation(std::move(*this)));
0270 }
0271 BOOST_ASSERT(impl.wr_block.is_locked(this));
0272 if(ec == net::error::eof)
0273 {
0274 // eof from teardown is not reported as an error.
0275
0276 ec = {};
0277 }
0278 if(! ec) // teardown reported no error: surface any error saved in ev_
0279 {
0280 BOOST_BEAST_ASSIGN_EC(ec, ev_);
0281 }
0282 if(ec)
0283 impl.change_status(status::failed);
0284 else
0285 impl.change_status(status::closed);
0286 impl.close();
0287 // Release the locks, offer other stored operations a chance to run via maybe_invoke(), then complete with ec.
0288 upcall:
0289 impl.wr_block.unlock(this);
0290 impl.rd_block.try_unlock(this)
0291 && impl.op_r_rd.maybe_invoke();
0292 impl.op_rd.maybe_invoke()
0293 || impl.op_idle_ping.maybe_invoke()
0294 || impl.op_ping.maybe_invoke()
0295 || impl.op_wr.maybe_invoke();
0296 this->complete(cont, ec);
0297 }
0298 }
0299 };
0300 // Initiation function object for async_close: validates the handler type and launches a close_op.
0301 template<class NextLayer, bool deflateSupported>
0302 struct stream<NextLayer, deflateSupported>::
0303     run_close_op
0304 {
0305     template<class CloseHandler>
0306     void
0307     operator()(
0308         CloseHandler&& h,
0309         boost::shared_ptr<impl_type> const& sp,
0310         close_reason const& cr)
0311     {
0312         // The handler must be invocable as void(error_code); a failure
0313         // here means the supplied handler has the wrong signature.
0314         using handler_is_valid = beast::detail::is_invocable<
0315             CloseHandler, void(error_code)>;
0316         static_assert(handler_is_valid::value,
0317             "CloseHandler type requirements not met");
0318
0319         // Constructing the temporary close_op is what starts the
0320         // operation; the handler is stored under its decayed type.
0321         using handler_type = typename std::decay<CloseHandler>::type;
0322         close_op<handler_type>(
0323             std::forward<CloseHandler>(h),
0324             sp,
0325             cr);
0326     }
0327 };
0328
0329 // Synchronous close, throwing overload: runs close(cr, ec) and
0330 // throws system_error when that call leaves an error in ec.
0331 template<class NextLayer, bool deflateSupported>
0332 void
0333 stream<NextLayer, deflateSupported>::
0334 close(close_reason const& cr)
0335 {
0336     static_assert(is_sync_stream<next_layer_type>::value,
0337         "SyncStream type requirements not met");
0338     error_code failure;
0339     close(cr, failure);
0340     if(! failure) return; // success: nothing to report
0341     BOOST_THROW_EXCEPTION(system_error{failure});
0342 }
0343 // Synchronous close (error_code overload): writes a close frame, then reads and discards frames until a close frame is received; failures are reported through ec.
0344 template<class NextLayer, bool deflateSupported>
0345 void
0346 stream<NextLayer, deflateSupported>::
0347 close(close_reason const& cr, error_code& ec)
0348 {
0349 static_assert(is_sync_stream<next_layer_type>::value,
0350 "SyncStream type requirements not met");
0351 using beast::detail::clamp;
0352 auto& impl = *impl_;
0353 ec = {};
0354 if(impl.check_stop_now(ec))
0355 return;
0356 BOOST_ASSERT(! impl.rd_close);
0357
0358 // Precondition: no close frame has been written on this stream yet (wr_close is false).
0359
0360 BOOST_ASSERT(! impl.wr_close);
0361
0362 // Serialize the close frame for cr into a local buffer and write it to the stream.
0363 {
0364 impl.wr_close = true;
0365 impl.change_status(status::closing);
0366 detail::frame_buffer fb;
0367 impl.template write_close<flat_static_buffer_base>(fb, cr);
0368 net::write(impl.stream(), fb.data(), ec);
0369 if(impl.check_stop_now(ec))
0370 return;
0371 }
0372
0373 // Read and discard frames until a close frame arrives; ev collects parse errors separately from ec.
0374 error_code ev;
0375 if(impl.rd_remain > 0)
0376 goto read_payload;
0377 for(;;)
0378 {
0379 // Parse the next frame header, reading from the stream until parse_fh returns true.
0380 while(! impl.parse_fh(
0381 impl.rd_fh, impl.rd_buf, ev))
0382 {
0383 if(ev)
0384 {
0385 // Header parsing set ev: hand it to do_fail and return.
0386 return do_fail(close_code::none, ev, ec);
0387 }
0388 impl.rd_buf.commit(impl.stream().read_some(
0389 impl.rd_buf.prepare(read_size(
0390 impl.rd_buf, impl.rd_buf.max_size())), ec));
0391 if(impl.check_stop_now(ec))
0392 return;
0393 }
0394
0395 if(detail::is_control(impl.rd_fh.op))
0396 {
0397 // Control frames other than close are skipped: consume the payload and continue.
0398 if(impl.rd_fh.op != detail::opcode::close)
0399 {
0400 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0401 continue;
0402 }
0403
0404 // Close frame: set rd_close, unmask the payload when the frame is
0405 // masked, and let read_close fill impl.cr.
0406 BOOST_ASSERT(! impl.rd_close);
0407 impl.rd_close = true;
0408 auto const mb = buffers_prefix(
0409 clamp(impl.rd_fh.len),
0410 impl.rd_buf.data());
0411 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
0412 detail::mask_inplace(mb, impl.rd_key);
0413 detail::read_close(impl.cr, mb, ev);
0414 if(ev)
0415 {
0416 // read_close set ev for the close payload: hand it to do_fail and return.
0417 return do_fail(close_code::none, ev, ec);
0418 }
0419 impl.rd_buf.consume(clamp(impl.rd_fh.len));
0420 break;
0421 }
0422
0423 read_payload:
0424 // Discard rd_remain payload bytes, reading more from the stream whenever rd_buf runs short.
0425 while(impl.rd_buf.size() < impl.rd_remain)
0426 {
0427 impl.rd_remain -= impl.rd_buf.size();
0428 impl.rd_buf.consume(impl.rd_buf.size());
0429 impl.rd_buf.commit(
0430 impl.stream().read_some(
0431 impl.rd_buf.prepare(
0432 read_size(
0433 impl.rd_buf,
0434 impl.rd_buf.max_size())),
0435 ec));
0436 if(impl.check_stop_now(ec))
0437 return;
0438 }
0439 BOOST_ASSERT(
0440 impl.rd_buf.size() >= impl.rd_remain);
0441 impl.rd_buf.consume(clamp(impl.rd_remain));
0442 impl.rd_remain = 0;
0443 }
0444 // Close frame received. error::closed coming back in ec is mapped to success. NOTE(review): do_fail is defined elsewhere; presumably it shuts the connection down — confirm.
0445 do_fail(close_code::none, error::closed, ec);
0446 if(ec == error::closed)
0447 ec = {};
0448 }
0449 // Starts an asynchronous close through net::async_initiate, with run_close_op as the initiation that constructs and starts the close_op.
0450 template<class NextLayer, bool deflateSupported>
0451 template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
0452 BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
0453 stream<NextLayer, deflateSupported>::
0454 async_close(close_reason const& cr, CloseHandler&& handler)
0455 {
0456 static_assert(is_async_stream<next_layer_type>::value,
0457 "AsyncStream type requirements not met");
0458 return net::async_initiate<
0459 CloseHandler,
0460 void(error_code)>( // completion signature
0461 run_close_op{}, // initiation object
0462 handler, // completion token, passed as an lvalue
0463 impl_, // becomes the sp argument of run_close_op
0464 cr); // becomes the cr argument of run_close_op
0465 }
0466
0467 }
0468 }
0469 }
0470
0471 #endif