TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/shutdown_type.hpp>
15 : #include <boost/corosio/wait_type.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
17 : #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
18 : #include <boost/corosio/detail/dispatch_coro.hpp>
19 : #include <boost/capy/buffers.hpp>
20 :
21 : #include <coroutine>
22 :
23 : #include <errno.h>
24 : #include <sys/socket.h>
25 : #include <sys/uio.h>
26 :
27 : namespace boost::corosio::detail {
28 :
29 : /** CRTP base for reactor-backed stream socket implementations.
30 :
31 : Inherits shared data members and cancel/close/register logic
32 : from reactor_basic_socket. Adds the stream-specific remote
33 : endpoint, shutdown, and I/O dispatch (connect, read, write, wait).
34 :
35 : @tparam Derived The concrete socket type (CRTP).
36 : @tparam Service The backend's socket service type.
37 : @tparam ConnOp The backend's connect op type.
38 : @tparam ReadOp The backend's read op type.
39 : @tparam WriteOp The backend's write op type.
40 : @tparam WaitOp The backend's wait op type.
41 : @tparam DescState The backend's descriptor_state type.
42 : @tparam ImplBase The public vtable base
43 : (tcp_socket::implementation or
44 : local_stream_socket::implementation).
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class Service,
50 : class ConnOp,
51 : class ReadOp,
52 : class WriteOp,
53 : class WaitOp,
54 : class DescState,
55 : class ImplBase = tcp_socket::implementation,
56 : class Endpoint = endpoint>
57 : class reactor_stream_socket
58 : : public reactor_basic_socket<
59 : Derived,
60 : ImplBase,
61 : Service,
62 : DescState,
63 : Endpoint>
64 : {
65 : using base_type = reactor_basic_socket<
66 : Derived,
67 : ImplBase,
68 : Service,
69 : DescState,
70 : Endpoint>;
71 : using self_type = reactor_stream_socket<
72 : Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp,
73 : DescState, ImplBase, Endpoint>;
74 : friend base_type;
75 : friend Derived;
76 :
77 : protected:
78 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
79 HIT 13584 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
80 :
81 : protected:
82 : Endpoint remote_endpoint_;
83 :
84 : public:
85 : /// Pending connect operation slot.
86 : ConnOp conn_;
87 :
88 : /// Pending read operation slot.
89 : ReadOp rd_;
90 :
91 : /// Pending write operation slot.
92 : WriteOp wr_;
93 :
94 : /// Pending wait-for-read operation slot.
95 : WaitOp wait_rd_;
96 :
97 : /// Pending wait-for-write operation slot.
98 : WaitOp wait_wr_;
99 :
100 : /// Pending wait-for-error operation slot.
101 : WaitOp wait_er_;
102 :
103 13584 : ~reactor_stream_socket() override = default;
104 :
105 : /// Return the cached remote endpoint.
106 50 : Endpoint remote_endpoint() const noexcept override
107 : {
108 50 : return remote_endpoint_;
109 : }
110 :
111 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
112 :
113 4478 : std::coroutine_handle<> connect(
114 : std::coroutine_handle<> h,
115 : capy::executor_ref ex,
116 : Endpoint ep,
117 : std::stop_token token,
118 : std::error_code* ec) override
119 : {
120 4478 : return do_connect(h, ex, ep, token, ec);
121 : }
122 :
123 216989 : std::coroutine_handle<> read_some(
124 : std::coroutine_handle<> h,
125 : capy::executor_ref ex,
126 : buffer_param param,
127 : std::stop_token token,
128 : std::error_code* ec,
129 : std::size_t* bytes_out) override
130 : {
131 216989 : return do_read_some(h, ex, param, token, ec, bytes_out);
132 : }
133 :
134 216511 : std::coroutine_handle<> write_some(
135 : std::coroutine_handle<> h,
136 : capy::executor_ref ex,
137 : buffer_param param,
138 : std::stop_token token,
139 : std::error_code* ec,
140 : std::size_t* bytes_out) override
141 : {
142 216511 : return do_write_some(h, ex, param, token, ec, bytes_out);
143 : }
144 :
145 26 : std::coroutine_handle<> wait(
146 : std::coroutine_handle<> h,
147 : capy::executor_ref ex,
148 : wait_type w,
149 : std::stop_token token,
150 : std::error_code* ec) override
151 : {
152 26 : return do_wait(h, ex, w, token, ec);
153 : }
154 :
155 : std::error_code
156 14 : shutdown(corosio::shutdown_type what) noexcept override
157 : {
158 14 : return do_shutdown(static_cast<int>(what));
159 : }
160 :
161 193 : void cancel() noexcept override
162 : {
163 193 : this->do_cancel();
164 193 : }
165 :
166 : // --- End virtual overrides ---
167 :
168 : /// Close the socket (non-virtual, called by the service).
169 : void close_socket() noexcept
170 : {
171 : this->do_close_socket();
172 : }
173 :
174 : /** Shut down part or all of the full-duplex connection.
175 :
176 : @param what 0 = receive, 1 = send, 2 = both.
177 : */
178 14 : std::error_code do_shutdown(int what) noexcept
179 : {
180 : int how;
181 14 : switch (what)
182 : {
183 4 : case 0: // shutdown_receive
184 4 : how = SHUT_RD;
185 4 : break;
186 6 : case 1: // shutdown_send
187 6 : how = SHUT_WR;
188 6 : break;
189 4 : case 2: // shutdown_both
190 4 : how = SHUT_RDWR;
191 4 : break;
192 MIS 0 : default:
193 0 : return make_err(EINVAL);
194 : }
195 HIT 14 : if (::shutdown(this->fd_, how) != 0)
196 MIS 0 : return make_err(errno);
197 HIT 14 : return {};
198 : }
199 :
200 : /// Cache local and remote endpoints.
201 8942 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
202 : {
203 8942 : this->local_endpoint_ = std::move(local);
204 8942 : remote_endpoint_ = std::move(remote);
205 8942 : }
206 :
207 : /** Shared connect dispatch.
208 :
209 : Tries the connect syscall speculatively. On synchronous
210 : completion, returns via inline budget or posts through queue.
211 : On EINPROGRESS, registers with the reactor.
212 : */
213 : std::coroutine_handle<> do_connect(
214 : std::coroutine_handle<>,
215 : capy::executor_ref,
216 : Endpoint const&,
217 : std::stop_token const&,
218 : std::error_code*);
219 :
220 : /** Shared scatter-read dispatch.
221 :
222 : Tries readv() speculatively. On success or hard error,
223 : returns via inline budget or posts through queue.
224 : On EAGAIN, registers with the reactor.
225 : */
226 : std::coroutine_handle<> do_read_some(
227 : std::coroutine_handle<>,
228 : capy::executor_ref,
229 : buffer_param,
230 : std::stop_token const&,
231 : std::error_code*,
232 : std::size_t*);
233 :
234 : /** Shared gather-write dispatch.
235 :
236 : Tries the write via WriteOp::write_policy speculatively.
237 : On success or hard error, returns via inline budget or
238 : posts through queue. On EAGAIN, registers with the reactor.
239 : */
240 : std::coroutine_handle<> do_write_some(
241 : std::coroutine_handle<>,
242 : capy::executor_ref,
243 : buffer_param,
244 : std::stop_token const&,
245 : std::error_code*,
246 : std::size_t*);
247 :
248 : /** Shared readiness-wait dispatch.
249 :
250 : Registers a wait op for the requested direction. Does not
251 : perform any I/O syscall — completion is signalled when the
252 : reactor delivers the matching edge event.
253 : */
254 : std::coroutine_handle<> do_wait(
255 : std::coroutine_handle<>,
256 : capy::executor_ref,
257 : wait_type,
258 : std::stop_token const&,
259 : std::error_code*);
260 :
261 : /** Close the socket and cancel pending operations.
262 :
263 : Extends the base do_close_socket() to also reset
264 : the remote endpoint.
265 : */
266 40785 : void do_close_socket() noexcept
267 : {
268 40785 : base_type::do_close_socket();
269 40785 : remote_endpoint_ = Endpoint{};
270 40785 : }
271 :
272 : private:
273 : // CRTP callbacks for reactor_basic_socket cancel/close
274 :
275 : template<class Op>
276 194 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
277 : {
278 194 : if (&op == static_cast<void*>(&conn_))
279 MIS 0 : return &this->desc_state_.connect_op;
280 HIT 194 : if (&op == static_cast<void*>(&rd_))
281 190 : return &this->desc_state_.read_op;
282 4 : if (&op == static_cast<void*>(&wr_))
283 MIS 0 : return &this->desc_state_.write_op;
284 HIT 4 : if (&op == static_cast<void*>(&wait_rd_))
285 2 : return &this->desc_state_.wait_read_op;
286 2 : if (&op == static_cast<void*>(&wait_wr_))
287 MIS 0 : return &this->desc_state_.wait_write_op;
288 HIT 2 : if (&op == static_cast<void*>(&wait_er_))
289 2 : return &this->desc_state_.wait_error_op;
290 MIS 0 : return nullptr;
291 : }
292 :
293 : template<class Op>
294 0 : bool* op_to_cancel_flag(Op& op) noexcept
295 : {
296 0 : if (&op == static_cast<void*>(&conn_))
297 0 : return &this->desc_state_.connect_cancel_pending;
298 0 : if (&op == static_cast<void*>(&rd_))
299 0 : return &this->desc_state_.read_cancel_pending;
300 0 : if (&op == static_cast<void*>(&wr_))
301 0 : return &this->desc_state_.write_cancel_pending;
302 0 : if (&op == static_cast<void*>(&wait_rd_))
303 0 : return &this->desc_state_.wait_read_cancel_pending;
304 0 : if (&op == static_cast<void*>(&wait_wr_))
305 0 : return &this->desc_state_.wait_write_cancel_pending;
306 0 : if (&op == static_cast<void*>(&wait_er_))
307 0 : return &this->desc_state_.wait_error_cancel_pending;
308 0 : return nullptr;
309 : }
310 :
311 : template<class Fn>
312 HIT 40980 : void for_each_op(Fn fn) noexcept
313 : {
314 40980 : fn(conn_);
315 40980 : fn(rd_);
316 40980 : fn(wr_);
317 40980 : fn(wait_rd_);
318 40980 : fn(wait_wr_);
319 40980 : fn(wait_er_);
320 40980 : }
321 :
322 : template<class Fn>
323 40980 : void for_each_desc_entry(Fn fn) noexcept
324 : {
325 40980 : fn(conn_, this->desc_state_.connect_op);
326 40980 : fn(rd_, this->desc_state_.read_op);
327 40980 : fn(wr_, this->desc_state_.write_op);
328 40980 : fn(wait_rd_, this->desc_state_.wait_read_op);
329 40980 : fn(wait_wr_, this->desc_state_.wait_write_op);
330 40980 : fn(wait_er_, this->desc_state_.wait_error_op);
331 40980 : }
332 : };
333 :
334 : template<
335 : class Derived,
336 : class Service,
337 : class ConnOp,
338 : class ReadOp,
339 : class WriteOp,
340 : class WaitOp,
341 : class DescState,
342 : class ImplBase,
343 : class Endpoint>
344 : std::coroutine_handle<>
345 4478 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
346 : do_connect(
347 : std::coroutine_handle<> h,
348 : capy::executor_ref ex,
349 : Endpoint const& ep,
350 : std::stop_token const& token,
351 : std::error_code* ec)
352 : {
353 4478 : auto& op = conn_;
354 :
355 4478 : sockaddr_storage storage{};
356 4478 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
357 : int result =
358 4478 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
359 :
360 4478 : if (result == 0)
361 : {
362 18 : sockaddr_storage local_storage{};
363 18 : socklen_t local_len = sizeof(local_storage);
364 18 : if (::getsockname(
365 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
366 18 : &local_len) == 0)
367 MIS 0 : this->local_endpoint_ =
368 HIT 18 : from_sockaddr_as(local_storage, local_len, Endpoint{});
369 18 : remote_endpoint_ = ep;
370 : }
371 :
372 4478 : if (result == 0 || errno != EINPROGRESS)
373 : {
374 22 : int err = (result < 0) ? errno : 0;
375 22 : if (this->svc_.scheduler().try_consume_inline_budget())
376 : {
377 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
378 0 : op.cont_op.cont.h = h;
379 0 : return dispatch_coro(ex, op.cont_op.cont);
380 : }
381 HIT 22 : op.reset();
382 22 : op.h = h;
383 22 : op.ex = ex;
384 22 : op.ec_out = ec;
385 22 : op.fd = this->fd_;
386 22 : op.target_endpoint = ep;
387 22 : op.start(token, static_cast<Derived*>(this));
388 22 : op.impl_ptr = this->shared_from_this();
389 22 : op.complete(err, 0);
390 22 : this->svc_.post(&op);
391 22 : return std::noop_coroutine();
392 : }
393 :
394 : // EINPROGRESS — register with reactor
395 4456 : op.reset();
396 4456 : op.h = h;
397 4456 : op.ex = ex;
398 4456 : op.ec_out = ec;
399 4456 : op.fd = this->fd_;
400 4456 : op.target_endpoint = ep;
401 4456 : op.start(token, static_cast<Derived*>(this));
402 4456 : op.impl_ptr = this->shared_from_this();
403 :
404 4456 : this->register_op(
405 4456 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
406 4456 : this->desc_state_.connect_cancel_pending, true);
407 4456 : return std::noop_coroutine();
408 : }
409 :
410 : template<
411 : class Derived,
412 : class Service,
413 : class ConnOp,
414 : class ReadOp,
415 : class WriteOp,
416 : class WaitOp,
417 : class DescState,
418 : class ImplBase,
419 : class Endpoint>
420 : std::coroutine_handle<>
421 216989 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
422 : do_read_some(
423 : std::coroutine_handle<> h,
424 : capy::executor_ref ex,
425 : buffer_param param,
426 : std::stop_token const& token,
427 : std::error_code* ec,
428 : std::size_t* bytes_out)
429 : {
430 216989 : auto& op = rd_;
431 216989 : op.reset();
432 :
433 216989 : capy::mutable_buffer bufs[ReadOp::max_buffers];
434 216989 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
435 :
436 216989 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
437 : {
438 2 : op.empty_buffer_read = true;
439 2 : op.h = h;
440 2 : op.ex = ex;
441 2 : op.ec_out = ec;
442 2 : op.bytes_out = bytes_out;
443 2 : op.start(token, static_cast<Derived*>(this));
444 2 : op.impl_ptr = this->shared_from_this();
445 2 : op.complete(0, 0);
446 2 : this->svc_.post(&op);
447 2 : return std::noop_coroutine();
448 : }
449 :
450 433982 : for (int i = 0; i < op.iovec_count; ++i)
451 : {
452 216995 : op.iovecs[i].iov_base = bufs[i].data();
453 216995 : op.iovecs[i].iov_len = bufs[i].size();
454 : }
455 :
456 : // Speculative read; for the single-buffer case use recv() so the
457 : // kernel skips the readv iov_iter setup.
458 : ssize_t n;
459 216987 : if (op.iovec_count == 1)
460 : {
461 : do
462 : {
463 216983 : n = ::recv(this->fd_, bufs[0].data(), bufs[0].size(), 0);
464 : }
465 216983 : while (n < 0 && errno == EINTR);
466 : }
467 : else
468 : {
469 : do
470 : {
471 4 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
472 : }
473 4 : while (n < 0 && errno == EINTR);
474 : }
475 :
476 216987 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
477 : {
478 216423 : int err = (n < 0) ? errno : 0;
479 216423 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
480 :
481 216423 : if (this->svc_.scheduler().try_consume_inline_budget())
482 : {
483 173169 : if (err)
484 MIS 0 : *ec = make_err(err);
485 HIT 173169 : else if (n == 0)
486 10 : *ec = capy::error::eof;
487 : else
488 173159 : *ec = {};
489 173169 : *bytes_out = bytes;
490 173169 : op.cont_op.cont.h = h;
491 173169 : return dispatch_coro(ex, op.cont_op.cont);
492 : }
493 43254 : op.h = h;
494 43254 : op.ex = ex;
495 43254 : op.ec_out = ec;
496 43254 : op.bytes_out = bytes_out;
497 43254 : op.start(token, static_cast<Derived*>(this));
498 43254 : op.impl_ptr = this->shared_from_this();
499 43254 : op.complete(err, bytes);
500 43254 : this->svc_.post(&op);
501 43254 : return std::noop_coroutine();
502 : }
503 :
504 : // EAGAIN — register with reactor
505 564 : op.h = h;
506 564 : op.ex = ex;
507 564 : op.ec_out = ec;
508 564 : op.bytes_out = bytes_out;
509 564 : op.fd = this->fd_;
510 564 : op.start(token, static_cast<Derived*>(this));
511 564 : op.impl_ptr = this->shared_from_this();
512 :
513 564 : this->register_op(
514 564 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
515 564 : this->desc_state_.read_cancel_pending);
516 564 : return std::noop_coroutine();
517 : }
518 :
519 : template<
520 : class Derived,
521 : class Service,
522 : class ConnOp,
523 : class ReadOp,
524 : class WriteOp,
525 : class WaitOp,
526 : class DescState,
527 : class ImplBase,
528 : class Endpoint>
529 : std::coroutine_handle<>
530 216511 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
531 : do_write_some(
532 : std::coroutine_handle<> h,
533 : capy::executor_ref ex,
534 : buffer_param param,
535 : std::stop_token const& token,
536 : std::error_code* ec,
537 : std::size_t* bytes_out)
538 : {
539 216511 : auto& op = wr_;
540 216511 : op.reset();
541 :
542 216511 : capy::mutable_buffer bufs[WriteOp::max_buffers];
543 216511 : op.iovec_count =
544 216511 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
545 :
546 216511 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
547 : {
548 2 : op.h = h;
549 2 : op.ex = ex;
550 2 : op.ec_out = ec;
551 2 : op.bytes_out = bytes_out;
552 2 : op.start(token, static_cast<Derived*>(this));
553 2 : op.impl_ptr = this->shared_from_this();
554 2 : op.complete(0, 0);
555 2 : this->svc_.post(&op);
556 2 : return std::noop_coroutine();
557 : }
558 :
559 433026 : for (int i = 0; i < op.iovec_count; ++i)
560 : {
561 216517 : op.iovecs[i].iov_base = bufs[i].data();
562 216517 : op.iovecs[i].iov_len = bufs[i].size();
563 : }
564 :
565 : // Speculative write; the single-buffer case dispatches to a
566 : // backend-specific fast path so the kernel skips msghdr/iov_iter
567 : // setup (and so each backend can pick the right SIGPIPE strategy).
568 : ssize_t n;
569 216509 : if (op.iovec_count == 1)
570 : {
571 433010 : n = WriteOp::write_policy::write_one(
572 216505 : this->fd_, bufs[0].data(), bufs[0].size());
573 : }
574 : else
575 : {
576 4 : n = WriteOp::write_policy::write(
577 4 : this->fd_, op.iovecs, op.iovec_count);
578 : }
579 :
580 216509 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
581 : {
582 216496 : int err = (n < 0) ? errno : 0;
583 216496 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
584 :
585 216496 : if (this->svc_.scheduler().try_consume_inline_budget())
586 : {
587 173199 : *ec = err ? make_err(err) : std::error_code{};
588 173199 : *bytes_out = bytes;
589 173199 : op.cont_op.cont.h = h;
590 173199 : return dispatch_coro(ex, op.cont_op.cont);
591 : }
592 43297 : op.h = h;
593 43297 : op.ex = ex;
594 43297 : op.ec_out = ec;
595 43297 : op.bytes_out = bytes_out;
596 43297 : op.start(token, static_cast<Derived*>(this));
597 43297 : op.impl_ptr = this->shared_from_this();
598 43297 : op.complete(err, bytes);
599 43297 : this->svc_.post(&op);
600 43297 : return std::noop_coroutine();
601 : }
602 :
603 : // EAGAIN — register with reactor
604 13 : op.h = h;
605 13 : op.ex = ex;
606 13 : op.ec_out = ec;
607 13 : op.bytes_out = bytes_out;
608 13 : op.fd = this->fd_;
609 13 : op.start(token, static_cast<Derived*>(this));
610 13 : op.impl_ptr = this->shared_from_this();
611 :
612 13 : this->register_op(
613 13 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
614 13 : this->desc_state_.write_cancel_pending, true);
615 13 : return std::noop_coroutine();
616 : }
617 :
618 : template<
619 : class Derived,
620 : class Service,
621 : class ConnOp,
622 : class ReadOp,
623 : class WriteOp,
624 : class WaitOp,
625 : class DescState,
626 : class ImplBase,
627 : class Endpoint>
628 : std::coroutine_handle<>
629 26 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
630 : do_wait(
631 : std::coroutine_handle<> h,
632 : capy::executor_ref ex,
633 : wait_type w,
634 : std::stop_token const& token,
635 : std::error_code* ec)
636 : {
637 : // wait_type::write completes immediately on a connected socket,
638 : // matching asio's behavior on IOCP. Corosio's reactor backends use
639 : // edge-triggered EPOLLOUT, which would never fire on an already-
640 : // writable socket; an immediate completion is also a more useful
641 : // contract than parking until a non-writable -> writable transition.
642 26 : if (w == wait_type::write)
643 : {
644 8 : auto& op = wait_wr_;
645 8 : if (this->svc_.scheduler().try_consume_inline_budget())
646 : {
647 MIS 0 : *ec = std::error_code{};
648 0 : op.cont_op.cont.h = h;
649 0 : return dispatch_coro(ex, op.cont_op.cont);
650 : }
651 HIT 8 : op.reset();
652 8 : op.wait_event = reactor_event_write;
653 8 : op.h = h;
654 8 : op.ex = ex;
655 8 : op.ec_out = ec;
656 8 : op.fd = this->fd_;
657 8 : op.start(token, static_cast<Derived*>(this));
658 8 : op.impl_ptr = this->shared_from_this();
659 8 : op.complete(0, 0);
660 8 : this->svc_.post(&op);
661 8 : return std::noop_coroutine();
662 : }
663 :
664 : // Pick refs up-front to avoid duplicating the register_op call.
665 : WaitOp* op_ptr;
666 : reactor_op_base** desc_slot_ptr;
667 : bool* ready_flag_ptr;
668 : bool* cancel_flag_ptr;
669 : std::uint32_t event;
670 :
671 18 : bool dummy_ready = false; // placeholder for error waits (no cached edge)
672 :
673 18 : if (w == wait_type::read)
674 : {
675 8 : op_ptr = &wait_rd_;
676 8 : desc_slot_ptr = &this->desc_state_.wait_read_op;
677 8 : ready_flag_ptr = &this->desc_state_.read_ready;
678 8 : cancel_flag_ptr = &this->desc_state_.wait_read_cancel_pending;
679 8 : event = reactor_event_read;
680 : }
681 : else // wait_type::error
682 : {
683 10 : op_ptr = &wait_er_;
684 10 : desc_slot_ptr = &this->desc_state_.wait_error_op;
685 10 : ready_flag_ptr = &dummy_ready;
686 10 : cancel_flag_ptr = &this->desc_state_.wait_error_cancel_pending;
687 10 : event = reactor_event_error;
688 : }
689 :
690 18 : auto& op = *op_ptr;
691 18 : op.reset();
692 18 : op.wait_event = event;
693 18 : op.h = h;
694 18 : op.ex = ex;
695 18 : op.ec_out = ec;
696 18 : op.fd = this->fd_;
697 18 : op.start(token, static_cast<Derived*>(this));
698 18 : op.impl_ptr = this->shared_from_this();
699 :
700 18 : this->register_op(op, *desc_slot_ptr, *ready_flag_ptr, *cancel_flag_ptr,
701 : false);
702 18 : return std::noop_coroutine();
703 : }
704 :
705 : } // namespace boost::corosio::detail
706 :
707 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|