TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 : #include <boost/corosio/native/detail/make_err.hpp>
16 : #include <boost/corosio/io/io_object.hpp>
17 :
18 : #include <coroutine>
19 : #include <mutex>
20 : #include <utility>
21 :
22 : #include <netinet/in.h>
23 : #include <sys/socket.h>
24 : #include <unistd.h>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Complete a base read/write operation.
29 :
30 : Translates the recorded errno and cancellation state into
31 : an error_code, stores the byte count, then resumes the
32 : caller via symmetric transfer.
33 :
34 : @tparam Op The concrete operation type.
35 : @param op The operation to complete.
36 : */
37 : template<typename Op>
38 : void
39 HIT 87184 : complete_io_op(Op& op)
40 : {
41 87184 : op.stop_cb.reset();
42 87184 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43 :
44 87184 : if (op.cancelled.load(std::memory_order_acquire))
45 303 : *op.ec_out = capy::error::canceled;
46 86881 : else if (op.errn != 0)
47 MIS 0 : *op.ec_out = make_err(op.errn);
48 HIT 86881 : else if (op.is_read_operation() && op.bytes_transferred == 0)
49 3 : *op.ec_out = capy::error::eof;
50 : else
51 86878 : *op.ec_out = {};
52 :
53 87184 : *op.bytes_out = op.bytes_transferred;
54 :
55 87184 : op.cont_op.cont.h = op.h;
56 87184 : capy::executor_ref saved_ex(op.ex);
57 87184 : auto prevent = std::move(op.impl_ptr);
58 87184 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 87184 : }
60 :
61 : /** Complete a datagram recv operation (connected mode).
62 :
63 : Like complete_io_op but does not translate zero bytes into
64 : EOF. Zero-length datagrams are valid and should be reported
65 : as success with 0 bytes transferred.
66 :
67 : @param op The operation to complete.
68 : */
69 : template<typename Op>
70 : void
71 : complete_dgram_recv_op(Op& op)
72 : {
73 : op.stop_cb.reset();
74 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
75 :
76 : if (op.cancelled.load(std::memory_order_acquire))
77 : *op.ec_out = capy::error::canceled;
78 : else if (op.errn != 0)
79 : *op.ec_out = make_err(op.errn);
80 : else
81 : *op.ec_out = {};
82 :
83 : *op.bytes_out = op.bytes_transferred;
84 :
85 : op.cont_op.cont.h = op.h;
86 : capy::executor_ref saved_ex(op.ex);
87 : auto prevent = std::move(op.impl_ptr);
88 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
89 : }
90 :
91 : /** Complete a wait operation.
92 :
93 : Wait operations report only an error_code — no bytes_transferred,
94 : no EOF translation. Used for socket and acceptor wait() awaitables;
95 : picks the impl pointer set by start() to reach the scheduler.
96 :
97 : @tparam Op The concrete wait operation type.
98 : @param op The operation to complete.
99 : */
100 : template<typename Op>
101 : void
102 56 : complete_wait_op(Op& op)
103 : {
104 56 : op.stop_cb.reset();
105 56 : if (op.socket_impl_)
106 44 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
107 : else
108 12 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
109 :
110 56 : if (op.cancelled.load(std::memory_order_acquire))
111 26 : *op.ec_out = capy::error::canceled;
112 30 : else if (op.errn != 0)
113 MIS 0 : *op.ec_out = make_err(op.errn);
114 : else
115 HIT 30 : *op.ec_out = {};
116 :
117 56 : op.cont_op.cont.h = op.h;
118 56 : capy::executor_ref saved_ex(op.ex);
119 56 : auto prevent = std::move(op.impl_ptr);
120 56 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
121 56 : }
122 :
123 : /** Complete a connect operation with endpoint caching.
124 :
125 : On success, queries the local endpoint via getsockname and
126 : caches both endpoints in the socket impl. Then resumes the
127 : caller via symmetric transfer.
128 :
129 : @tparam Op The concrete connect operation type.
130 : @param op The operation to complete.
131 : */
132 : template<typename Op>
133 : void
134 4502 : complete_connect_op(Op& op)
135 : {
136 4502 : op.stop_cb.reset();
137 4502 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
138 :
139 4502 : bool success =
140 4502 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
141 :
142 4502 : if (success && op.socket_impl_)
143 : {
144 : using ep_type = decltype(op.target_endpoint);
145 4475 : ep_type local_ep;
146 4475 : sockaddr_storage local_storage{};
147 4475 : socklen_t local_len = sizeof(local_storage);
148 4475 : if (::getsockname(
149 : op.fd, reinterpret_cast<sockaddr*>(&local_storage),
150 4475 : &local_len) == 0)
151 4455 : local_ep =
152 4475 : from_sockaddr_as(local_storage, local_len, ep_type{});
153 4475 : op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
154 : }
155 :
156 4502 : if (op.cancelled.load(std::memory_order_acquire))
157 2 : *op.ec_out = capy::error::canceled;
158 4500 : else if (op.errn != 0)
159 25 : *op.ec_out = make_err(op.errn);
160 : else
161 4475 : *op.ec_out = {};
162 :
163 4502 : op.cont_op.cont.h = op.h;
164 4502 : capy::executor_ref saved_ex(op.ex);
165 4502 : auto prevent = std::move(op.impl_ptr);
166 4502 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
167 4502 : }
168 :
169 : /** Construct and register a peer socket from an accepted fd.
170 :
171 : Creates a new socket impl via the acceptor's associated
172 : socket service, registers it with the scheduler, and caches
173 : the local and remote endpoints.
174 :
175 : @tparam SocketImpl The concrete socket implementation type.
176 : @tparam AcceptorImpl The concrete acceptor implementation type.
177 : @param acceptor_impl The acceptor that accepted the connection.
178 : @param accepted_fd The accepted file descriptor (set to -1 on success).
179 : @param peer_storage The peer address from accept().
180 : @param impl_out Output pointer for the new socket impl.
181 : @param ec_out Output pointer for any error.
182 : @return True on success, false on failure.
183 : */
184 : template<typename SocketImpl, typename AcceptorImpl>
185 : bool
186 4445 : setup_accepted_socket(
187 : AcceptorImpl* acceptor_impl,
188 : int& accepted_fd,
189 : sockaddr_storage const& peer_storage,
190 : socklen_t peer_addrlen,
191 : io_object::implementation** impl_out,
192 : std::error_code* ec_out)
193 : {
194 4445 : auto* socket_svc = acceptor_impl->service().stream_service();
195 4445 : if (!socket_svc)
196 : {
197 MIS 0 : *ec_out = make_err(ENOENT);
198 0 : return false;
199 : }
200 :
201 HIT 4445 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
202 4445 : impl.set_socket(accepted_fd);
203 :
204 4445 : impl.desc_state_.fd = accepted_fd;
205 : {
206 4445 : std::lock_guard lock(impl.desc_state_.mutex);
207 4445 : impl.desc_state_.read_op = nullptr;
208 4445 : impl.desc_state_.write_op = nullptr;
209 4445 : impl.desc_state_.connect_op = nullptr;
210 4445 : }
211 4445 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
212 :
213 : using ep_type = decltype(acceptor_impl->local_endpoint());
214 4445 : impl.set_endpoints(
215 : acceptor_impl->local_endpoint(),
216 4445 : from_sockaddr_as(
217 : peer_storage,
218 : peer_addrlen,
219 : ep_type{}));
220 :
221 4445 : if (impl_out)
222 4445 : *impl_out = &impl;
223 4445 : accepted_fd = -1;
224 4445 : return true;
225 : }
226 :
227 : /** Complete an accept operation.
228 :
229 : Sets up the peer socket on success, or closes the accepted
230 : fd on failure. Then resumes the caller via symmetric transfer.
231 :
232 : @tparam SocketImpl The concrete socket implementation type.
233 : @tparam Op The concrete accept operation type.
234 : @param op The operation to complete.
235 : */
236 : template<typename SocketImpl, typename Op>
237 : void
238 4465 : complete_accept_op(Op& op)
239 : {
240 4465 : op.stop_cb.reset();
241 4465 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
242 :
243 4465 : bool success =
244 4465 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
245 :
246 4465 : if (op.cancelled.load(std::memory_order_acquire))
247 20 : *op.ec_out = capy::error::canceled;
248 4445 : else if (op.errn != 0)
249 MIS 0 : *op.ec_out = make_err(op.errn);
250 : else
251 HIT 4445 : *op.ec_out = {};
252 :
253 4465 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
254 : {
255 4445 : if (!setup_accepted_socket<SocketImpl>(
256 4445 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
257 : op.peer_addrlen, op.impl_out, op.ec_out))
258 MIS 0 : success = false;
259 : }
260 :
261 HIT 4465 : if (!success || !op.acceptor_impl_)
262 : {
263 20 : if (op.accepted_fd >= 0)
264 : {
265 MIS 0 : ::close(op.accepted_fd);
266 0 : op.accepted_fd = -1;
267 : }
268 HIT 20 : if (op.impl_out)
269 20 : *op.impl_out = nullptr;
270 : }
271 :
272 4465 : op.cont_op.cont.h = op.h;
273 4465 : capy::executor_ref saved_ex(op.ex);
274 4465 : auto prevent = std::move(op.impl_ptr);
275 4465 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
276 4465 : }
277 :
278 : /** Complete a datagram operation (send_to or recv_from).
279 :
280 : For recv_from operations, writes the source endpoint from the
281 : recorded sockaddr_storage into the caller's endpoint pointer.
282 : Then resumes the caller via symmetric transfer.
283 :
284 : @tparam Op The concrete datagram operation type.
285 : @param op The operation to complete.
286 : */
287 : template<typename Op>
288 : void
289 18 : complete_datagram_op(Op& op)
290 : {
291 18 : op.stop_cb.reset();
292 18 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
293 :
294 18 : if (op.cancelled.load(std::memory_order_acquire))
295 6 : *op.ec_out = capy::error::canceled;
296 12 : else if (op.errn != 0)
297 MIS 0 : *op.ec_out = make_err(op.errn);
298 : else
299 HIT 12 : *op.ec_out = {};
300 :
301 18 : *op.bytes_out = op.bytes_transferred;
302 :
303 18 : op.cont_op.cont.h = op.h;
304 18 : capy::executor_ref saved_ex(op.ex);
305 18 : auto prevent = std::move(op.impl_ptr);
306 18 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
307 18 : }
308 :
309 : /** Complete a datagram operation with source endpoint capture.
310 :
311 : For recv_from operations, writes the source endpoint from the
312 : recorded sockaddr_storage into the caller's endpoint pointer.
313 : Then resumes the caller via symmetric transfer.
314 :
315 : @tparam Op The concrete datagram operation type.
316 : @param op The operation to complete.
317 : @param source_out Optional pointer to store source endpoint
318 : (non-null for recv_from, null for send_to).
319 : */
320 : template<typename Op, typename Endpoint>
321 : void
322 26 : complete_datagram_op(Op& op, Endpoint* source_out)
323 : {
324 26 : op.stop_cb.reset();
325 26 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
326 :
327 26 : if (op.cancelled.load(std::memory_order_acquire))
328 12 : *op.ec_out = capy::error::canceled;
329 14 : else if (op.errn != 0)
330 MIS 0 : *op.ec_out = make_err(op.errn);
331 : else
332 HIT 14 : *op.ec_out = {};
333 :
334 26 : *op.bytes_out = op.bytes_transferred;
335 :
336 36 : if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
337 10 : op.errn == 0)
338 20 : *source_out = from_sockaddr_as(
339 10 : op.source_storage,
340 : op.source_addrlen,
341 : Endpoint{});
342 :
343 26 : op.cont_op.cont.h = op.h;
344 26 : capy::executor_ref saved_ex(op.ex);
345 26 : auto prevent = std::move(op.impl_ptr);
346 26 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
347 26 : }
348 :
349 : } // namespace boost::corosio::detail
350 :
351 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|