TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : /** CRTP base for reactor-backed socket implementations.
32 :
33 : Extracts the shared data members, virtual overrides, and
34 : cancel/close/register logic that is identical across TCP
35 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
36 :
37 : Derived classes provide CRTP callbacks that enumerate their
38 : specific op slots so cancel/close can iterate them generically.
39 :
40 : @tparam Derived The concrete socket type (CRTP).
41 : @tparam ImplBase The public vtable base (tcp_socket::implementation
42 : or udp_socket::implementation).
43 : @tparam Service The backend's service type.
44 : @tparam DescState The backend's descriptor_state type.
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class ImplBase,
50 : class Service,
51 : class DescState,
52 : class Endpoint = endpoint>
53 : class reactor_basic_socket
54 : : public ImplBase
55 : , public std::enable_shared_from_this<Derived>
56 : , public intrusive_list<Derived>::node
57 : {
58 : friend Derived;
59 :
60 : template<class, class, class, class, class, class, class, class, class>
61 : friend class reactor_stream_socket;
62 :
63 : template<class, class, class, class, class, class, class, class, class, class, class>
64 : friend class reactor_datagram_socket;
65 :
66 HIT 13896 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : Service& svc_;
70 : int fd_ = -1;
71 : Endpoint local_endpoint_;
72 :
73 : public:
74 : /// Per-descriptor state for persistent reactor registration.
75 : DescState desc_state_;
76 :
77 13896 : ~reactor_basic_socket() override = default;
78 :
79 : /// Return the underlying file descriptor.
80 42573 : native_handle_type native_handle() const noexcept override
81 : {
82 42573 : return fd_;
83 : }
84 :
85 : /// Return the cached local endpoint.
86 106 : Endpoint local_endpoint() const noexcept override
87 : {
88 106 : return local_endpoint_;
89 : }
90 :
91 : /// Return true if the socket has an open file descriptor.
92 : bool is_open() const noexcept
93 : {
94 : return fd_ >= 0;
95 : }
96 :
97 : /// Set a socket option.
98 66 : std::error_code set_option(
99 : int level,
100 : int optname,
101 : void const* data,
102 : std::size_t size) noexcept override
103 : {
104 66 : if (::setsockopt(
105 66 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
106 2 : return make_err(errno);
107 64 : return {};
108 : }
109 :
110 : /// Get a socket option.
111 : std::error_code
112 102 : get_option(int level, int optname, void* data, std::size_t* size)
113 : const noexcept override
114 : {
115 102 : socklen_t len = static_cast<socklen_t>(*size);
116 102 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
117 MIS 0 : return make_err(errno);
118 HIT 102 : *size = static_cast<std::size_t>(len);
119 102 : return {};
120 : }
121 :
122 : /// Assign the file descriptor.
123 4449 : void set_socket(int fd) noexcept
124 : {
125 4449 : fd_ = fd;
126 4449 : }
127 :
128 : /// Cache the local endpoint.
129 : void set_local_endpoint(Endpoint ep) noexcept
130 : {
131 : local_endpoint_ = ep;
132 : }
133 :
134 : /** Bind the socket to a local endpoint.
135 :
136 : Calls ::bind() and caches the resulting local endpoint
137 : via getsockname().
138 :
139 : @param ep The endpoint to bind to.
140 : @return Error code on failure, empty on success.
141 : */
142 136 : std::error_code do_bind(Endpoint const& ep) noexcept
143 : {
144 136 : sockaddr_storage storage{};
145 136 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
146 136 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
147 10 : return make_err(errno);
148 :
149 126 : sockaddr_storage local_storage{};
150 126 : socklen_t local_len = sizeof(local_storage);
151 126 : if (::getsockname(
152 126 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
153 : 0)
154 96 : local_endpoint_ =
155 126 : from_sockaddr_as(local_storage, local_len, Endpoint{});
156 :
157 126 : return {};
158 : }
159 :
160 : /// Assign the fd, initialize descriptor state, and register with the reactor.
161 4850 : void init_and_register(int fd) noexcept
162 : {
163 4850 : fd_ = fd;
164 4850 : desc_state_.fd = fd;
165 : {
166 4850 : std::lock_guard lock(desc_state_.mutex);
167 4850 : desc_state_.read_op = nullptr;
168 4850 : desc_state_.write_op = nullptr;
169 4850 : desc_state_.connect_op = nullptr;
170 4850 : }
171 4850 : svc_.scheduler().register_descriptor(fd, &desc_state_);
172 4850 : }
173 :
174 : /** Register an op with the reactor.
175 :
176 : Handles cached edge events and deferred cancellation.
177 : Called on the EAGAIN/EINPROGRESS path when speculative
178 : I/O failed.
179 : */
180 : template<class Op>
181 : void register_op(
182 : Op& op,
183 : reactor_op_base*& desc_slot,
184 : bool& ready_flag,
185 : bool& cancel_flag,
186 : bool is_write_direction = false) noexcept;
187 :
188 : /** Cancel a single pending operation.
189 :
190 : Claims the operation from its descriptor_state slot under
191 : the mutex and posts it to the scheduler as cancelled.
192 : Derived must implement:
193 : op_to_desc_slot(Op&) -> reactor_op_base**
194 : op_to_cancel_flag(Op&) -> bool*
195 : */
196 : template<class Op>
197 : void cancel_single_op(Op& op) noexcept;
198 :
199 : /** Cancel all pending operations.
200 :
201 : Invoked by the derived class's cancel() override.
202 : Derived must implement:
203 : for_each_op(auto fn)
204 : for_each_desc_entry(auto fn)
205 : */
206 : void do_cancel() noexcept;
207 :
208 : /** Close the socket and cancel pending operations.
209 :
210 : Invoked by the derived class's close_socket(). The
211 : derived class may add backend-specific cleanup after
212 : calling this method.
213 : Derived must implement:
214 : for_each_op(auto fn)
215 : for_each_desc_entry(auto fn)
216 : */
217 : void do_close_socket() noexcept;
218 :
219 : /** Release the socket without closing the fd.
220 :
221 : Like do_close_socket() but does not call ::close().
222 : Returns the fd so the caller can take ownership.
223 : */
224 : native_handle_type do_release_socket() noexcept;
225 : };
226 :
227 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
228 : template<class Op>
229 : void
230 5085 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
231 : Op& op,
232 : reactor_op_base*& desc_slot,
233 : bool& ready_flag,
234 : bool& cancel_flag,
235 : bool is_write_direction) noexcept
236 : {
237 5085 : svc_.work_started();
238 :
239 5085 : std::lock_guard lock(desc_state_.mutex);
240 5085 : bool io_done = false;
241 5085 : if (ready_flag)
242 : {
243 185 : ready_flag = false;
244 185 : op.perform_io();
245 185 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 185 : if (!io_done)
247 185 : op.errn = 0;
248 : }
249 :
250 5085 : if (cancel_flag)
251 : {
252 MIS 0 : cancel_flag = false;
253 0 : op.cancelled.store(true, std::memory_order_relaxed);
254 : }
255 :
256 HIT 5085 : if (io_done || op.cancelled.load(std::memory_order_acquire))
257 : {
258 MIS 0 : svc_.post(&op);
259 0 : svc_.work_finished();
260 : }
261 : else
262 : {
263 HIT 5085 : desc_slot = &op;
264 :
265 : // Select must rebuild its fd_sets when a write-direction op
266 : // is parked, so select() watches for writability. Compiled
267 : // away to nothing for epoll and kqueue.
268 : if constexpr (requires { Service::needs_write_notification; })
269 : {
270 : if constexpr (Service::needs_write_notification)
271 : {
272 2476 : if (is_write_direction)
273 2177 : svc_.scheduler().notify_reactor();
274 : }
275 : }
276 : }
277 5085 : }
278 :
279 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
280 : template<class Op>
281 : void
282 200 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
283 : Op& op) noexcept
284 : {
285 200 : auto self = this->weak_from_this().lock();
286 200 : if (!self)
287 MIS 0 : return;
288 :
289 HIT 200 : op.request_cancel();
290 :
291 200 : auto* d = static_cast<Derived*>(this);
292 200 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
293 :
294 200 : if (desc_op_ptr)
295 : {
296 200 : reactor_op_base* claimed = nullptr;
297 : {
298 200 : std::lock_guard lock(desc_state_.mutex);
299 200 : if (*desc_op_ptr == &op)
300 200 : claimed = std::exchange(*desc_op_ptr, nullptr);
301 : else
302 : {
303 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
304 0 : if (cflag)
305 0 : *cflag = true;
306 : }
307 HIT 200 : }
308 200 : if (claimed)
309 : {
310 200 : op.impl_ptr = self;
311 200 : svc_.post(&op);
312 200 : svc_.work_finished();
313 : }
314 : }
315 200 : }
316 :
317 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
318 : void
319 207 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
320 : do_cancel() noexcept
321 : {
322 207 : auto self = this->weak_from_this().lock();
323 207 : if (!self)
324 MIS 0 : return;
325 :
326 HIT 207 : auto* d = static_cast<Derived*>(this);
327 :
328 1477 : d->for_each_op([](auto& op) { op.request_cancel(); });
329 :
330 : // Claim ops under a single lock acquisition
331 : struct claimed_entry
332 : {
333 : reactor_op_base* op = nullptr;
334 : reactor_op_base* base = nullptr;
335 : };
336 : // Max 8 ops: conn, rd, wr, wait_rd, wait_wr, wait_er, recv_rd, send_wr
337 207 : claimed_entry claimed[8];
338 207 : int count = 0;
339 :
340 : {
341 207 : std::lock_guard lock(desc_state_.mutex);
342 2747 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
343 1270 : if (desc_slot == &op)
344 : {
345 121 : claimed[count].op = std::exchange(desc_slot, nullptr);
346 121 : claimed[count].base = &op;
347 121 : ++count;
348 : }
349 : });
350 207 : }
351 :
352 328 : for (int i = 0; i < count; ++i)
353 : {
354 121 : claimed[i].base->impl_ptr = self;
355 121 : svc_.post(claimed[i].base);
356 121 : svc_.work_finished();
357 : }
358 207 : }
359 :
360 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
361 : void
362 41941 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
363 : do_close_socket() noexcept
364 : {
365 41941 : auto self = this->weak_from_this().lock();
366 41941 : if (self)
367 : {
368 41941 : auto* d = static_cast<Derived*>(this);
369 :
370 295899 : d->for_each_op([](auto& op) { op.request_cancel(); });
371 :
372 : struct claimed_entry
373 : {
374 : reactor_op_base* base = nullptr;
375 : };
376 41941 : claimed_entry claimed[8];
377 41941 : int count = 0;
378 :
379 : {
380 41941 : std::lock_guard lock(desc_state_.mutex);
381 41941 : d->for_each_desc_entry(
382 507916 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
383 253958 : auto* c = std::exchange(desc_slot, nullptr);
384 253958 : if (c)
385 : {
386 8 : claimed[count].base = c;
387 8 : ++count;
388 : }
389 : });
390 41941 : desc_state_.read_ready = false;
391 41941 : desc_state_.write_ready = false;
392 41941 : desc_state_.read_cancel_pending = false;
393 41941 : desc_state_.write_cancel_pending = false;
394 41941 : desc_state_.connect_cancel_pending = false;
395 41941 : desc_state_.wait_read_cancel_pending = false;
396 41941 : desc_state_.wait_write_cancel_pending = false;
397 41941 : desc_state_.wait_error_cancel_pending = false;
398 :
399 41941 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
400 406 : desc_state_.impl_ref_ = self;
401 41941 : }
402 :
403 41949 : for (int i = 0; i < count; ++i)
404 : {
405 8 : claimed[i].base->impl_ptr = self;
406 8 : svc_.post(claimed[i].base);
407 8 : svc_.work_finished();
408 : }
409 : }
410 :
411 41941 : if (fd_ >= 0)
412 : {
413 9295 : if (desc_state_.registered_events != 0)
414 9295 : svc_.scheduler().deregister_descriptor(fd_);
415 9295 : ::close(fd_);
416 9295 : fd_ = -1;
417 : }
418 :
419 41941 : desc_state_.fd = -1;
420 41941 : desc_state_.registered_events = 0;
421 :
422 41941 : local_endpoint_ = Endpoint{};
423 41941 : }
424 :
425 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
426 : native_handle_type
427 4 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
428 : do_release_socket() noexcept
429 : {
430 : // Cancel pending ops (same as do_close_socket)
431 4 : auto self = this->weak_from_this().lock();
432 4 : if (self)
433 : {
434 4 : auto* d = static_cast<Derived*>(this);
435 :
436 32 : d->for_each_op([](auto& op) { op.request_cancel(); });
437 :
438 : struct claimed_entry
439 : {
440 : reactor_op_base* base = nullptr;
441 : };
442 4 : claimed_entry claimed[8];
443 4 : int count = 0;
444 :
445 : {
446 4 : std::lock_guard lock(desc_state_.mutex);
447 4 : d->for_each_desc_entry(
448 56 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
449 28 : auto* c = std::exchange(desc_slot, nullptr);
450 28 : if (c)
451 : {
452 MIS 0 : claimed[count].base = c;
453 0 : ++count;
454 : }
455 : });
456 HIT 4 : desc_state_.read_ready = false;
457 4 : desc_state_.write_ready = false;
458 4 : desc_state_.read_cancel_pending = false;
459 4 : desc_state_.write_cancel_pending = false;
460 4 : desc_state_.connect_cancel_pending = false;
461 4 : desc_state_.wait_read_cancel_pending = false;
462 4 : desc_state_.wait_write_cancel_pending = false;
463 4 : desc_state_.wait_error_cancel_pending = false;
464 :
465 4 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
466 MIS 0 : desc_state_.impl_ref_ = self;
467 HIT 4 : }
468 :
469 4 : for (int i = 0; i < count; ++i)
470 : {
471 MIS 0 : claimed[i].base->impl_ptr = self;
472 0 : svc_.post(claimed[i].base);
473 0 : svc_.work_finished();
474 : }
475 : }
476 :
477 HIT 4 : native_handle_type released = fd_;
478 :
479 4 : if (fd_ >= 0)
480 : {
481 4 : if (desc_state_.registered_events != 0)
482 4 : svc_.scheduler().deregister_descriptor(fd_);
483 : // Do NOT close -- caller takes ownership
484 4 : fd_ = -1;
485 : }
486 :
487 4 : desc_state_.fd = -1;
488 4 : desc_state_.registered_events = 0;
489 :
490 4 : local_endpoint_ = Endpoint{};
491 :
492 8 : return released;
493 4 : }
494 :
495 : } // namespace boost::corosio::detail
496 :
497 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|