94.44% Lines (68/72) 100.00% Functions (23/23)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 10   #ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP
11   #define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 11   #define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP
12   12  
13   #include <boost/corosio/local_stream_socket.hpp> 13   #include <boost/corosio/local_stream_socket.hpp>
14   #include <boost/corosio/backend.hpp> 14   #include <boost/corosio/backend.hpp>
15   15  
16   #ifndef BOOST_COROSIO_MRDOCS 16   #ifndef BOOST_COROSIO_MRDOCS
17   #if BOOST_COROSIO_HAS_EPOLL 17   #if BOOST_COROSIO_HAS_EPOLL
18   #include <boost/corosio/native/detail/epoll/epoll_types.hpp> 18   #include <boost/corosio/native/detail/epoll/epoll_types.hpp>
19   #endif 19   #endif
20   20  
21   #if BOOST_COROSIO_HAS_SELECT 21   #if BOOST_COROSIO_HAS_SELECT
22   #include <boost/corosio/native/detail/select/select_types.hpp> 22   #include <boost/corosio/native/detail/select/select_types.hpp>
23   #endif 23   #endif
24   24  
25   #if BOOST_COROSIO_HAS_KQUEUE 25   #if BOOST_COROSIO_HAS_KQUEUE
26   #include <boost/corosio/native/detail/kqueue/kqueue_types.hpp> 26   #include <boost/corosio/native/detail/kqueue/kqueue_types.hpp>
27   #endif 27   #endif
28   28  
29   #if BOOST_COROSIO_HAS_IOCP 29   #if BOOST_COROSIO_HAS_IOCP
30   #include <boost/corosio/native/detail/iocp/win_local_stream_service.hpp> 30   #include <boost/corosio/native/detail/iocp/win_local_stream_service.hpp>
31   #endif 31   #endif
32   #endif // !BOOST_COROSIO_MRDOCS 32   #endif // !BOOST_COROSIO_MRDOCS
33   33  
34   namespace boost::corosio { 34   namespace boost::corosio {
35   35  
36   /** An asynchronous Unix stream socket with devirtualized I/O operations. 36   /** An asynchronous Unix stream socket with devirtualized I/O operations.
37   37  
38   This class template inherits from @ref local_stream_socket and 38   This class template inherits from @ref local_stream_socket and
39   shadows the async operations (`read_some`, `write_some`, 39   shadows the async operations (`read_some`, `write_some`,
40   `connect`) with versions that call the backend implementation 40   `connect`) with versions that call the backend implementation
41   directly, allowing the compiler to inline through the entire 41   directly, allowing the compiler to inline through the entire
42   call chain. 42   call chain.
43   43  
44   Non-async operations (`open`, `close`, `cancel`, socket options) 44   Non-async operations (`open`, `close`, `cancel`, socket options)
45   remain unchanged and dispatch through the compiled library. 45   remain unchanged and dispatch through the compiled library.
46   46  
47   A `native_local_stream_socket` IS-A `local_stream_socket` and 47   A `native_local_stream_socket` IS-A `local_stream_socket` and
48   can be passed to any function expecting `local_stream_socket&` 48   can be passed to any function expecting `local_stream_socket&`
49   or `io_stream&`, in which case virtual dispatch is used 49   or `io_stream&`, in which case virtual dispatch is used
50   transparently. 50   transparently.
51   51  
52   @tparam Backend A backend tag value (e.g., `epoll`) whose type 52   @tparam Backend A backend tag value (e.g., `epoll`) whose type
53   provides the concrete implementation types. 53   provides the concrete implementation types.
54   54  
55   @par Thread Safety 55   @par Thread Safety
56   Same as @ref local_stream_socket. 56   Same as @ref local_stream_socket.
57   57  
58   @par Example 58   @par Example
59   @code 59   @code
60   #include <boost/corosio/native/native_local_stream_socket.hpp> 60   #include <boost/corosio/native/native_local_stream_socket.hpp>
61   61  
62   native_io_context<epoll> ctx; 62   native_io_context<epoll> ctx;
63   native_local_stream_socket<epoll> s(ctx); 63   native_local_stream_socket<epoll> s(ctx);
64   s.open(); 64   s.open();
65   auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock")); 65   auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
66   @endcode 66   @endcode
67   67  
68   @see local_stream_socket, epoll_t, iocp_t 68   @see local_stream_socket, epoll_t, iocp_t
69   */ 69   */
70   template<auto Backend> 70   template<auto Backend>
71   class native_local_stream_socket : public local_stream_socket 71   class native_local_stream_socket : public local_stream_socket
72   { 72   {
73   using backend_type = decltype(Backend); 73   using backend_type = decltype(Backend);
74   using impl_type = typename backend_type::local_stream_socket_type; 74   using impl_type = typename backend_type::local_stream_socket_type;
75   using service_type = typename backend_type::local_stream_service_type; 75   using service_type = typename backend_type::local_stream_service_type;
76   76  
HITCBC 77   20 impl_type& get_impl() noexcept 77   20 impl_type& get_impl() noexcept
78   { 78   {
HITCBC 79   20 return *static_cast<impl_type*>(h_.get()); 79   20 return *static_cast<impl_type*>(h_.get());
80   } 80   }
81   81  
82   template<class MutableBufferSequence> 82   template<class MutableBufferSequence>
83   struct native_read_awaitable 83   struct native_read_awaitable
84   { 84   {
85   native_local_stream_socket& self_; 85   native_local_stream_socket& self_;
86   MutableBufferSequence buffers_; 86   MutableBufferSequence buffers_;
87   std::stop_token token_; 87   std::stop_token token_;
88   mutable std::error_code ec_; 88   mutable std::error_code ec_;
89   mutable std::size_t bytes_transferred_ = 0; 89   mutable std::size_t bytes_transferred_ = 0;
90   90  
HITCBC 91   4 native_read_awaitable( 91   4 native_read_awaitable(
92   native_local_stream_socket& self, 92   native_local_stream_socket& self,
93   MutableBufferSequence buffers) noexcept 93   MutableBufferSequence buffers) noexcept
HITCBC 94   4 : self_(self) 94   4 : self_(self)
HITCBC 95   4 , buffers_(std::move(buffers)) 95   4 , buffers_(std::move(buffers))
96   { 96   {
HITCBC 97   4 } 97   4 }
98   98  
HITCBC 99   4 bool await_ready() const noexcept 99   4 bool await_ready() const noexcept
100   { 100   {
HITCBC 101   4 return token_.stop_requested(); 101   4 return token_.stop_requested();
102   } 102   }
103   103  
HITCBC 104   4 capy::io_result<std::size_t> await_resume() const noexcept 104   4 capy::io_result<std::size_t> await_resume() const noexcept
105   { 105   {
HITCBC 106   4 if (token_.stop_requested()) 106   4 if (token_.stop_requested())
MISUBC 107   return {make_error_code(std::errc::operation_canceled), 0}; 107   return {make_error_code(std::errc::operation_canceled), 0};
HITCBC 108   4 return {ec_, bytes_transferred_}; 108   4 return {ec_, bytes_transferred_};
109   } 109   }
110   110  
HITCBC 111   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 111   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
112   -> std::coroutine_handle<> 112   -> std::coroutine_handle<>
113   { 113   {
HITCBC 114   4 token_ = env->stop_token; 114   4 token_ = env->stop_token;
HITCBC 115   12 return self_.get_impl().read_some( 115   12 return self_.get_impl().read_some(
HITCBC 116   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); 116   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
117   } 117   }
118   }; 118   };
119   119  
120   template<class ConstBufferSequence> 120   template<class ConstBufferSequence>
121   struct native_write_awaitable 121   struct native_write_awaitable
122   { 122   {
123   native_local_stream_socket& self_; 123   native_local_stream_socket& self_;
124   ConstBufferSequence buffers_; 124   ConstBufferSequence buffers_;
125   std::stop_token token_; 125   std::stop_token token_;
126   mutable std::error_code ec_; 126   mutable std::error_code ec_;
127   mutable std::size_t bytes_transferred_ = 0; 127   mutable std::size_t bytes_transferred_ = 0;
128   128  
HITCBC 129   4 native_write_awaitable( 129   4 native_write_awaitable(
130   native_local_stream_socket& self, 130   native_local_stream_socket& self,
131   ConstBufferSequence buffers) noexcept 131   ConstBufferSequence buffers) noexcept
HITCBC 132   4 : self_(self) 132   4 : self_(self)
HITCBC 133   4 , buffers_(std::move(buffers)) 133   4 , buffers_(std::move(buffers))
134   { 134   {
HITCBC 135   4 } 135   4 }
136   136  
HITCBC 137   4 bool await_ready() const noexcept 137   4 bool await_ready() const noexcept
138   { 138   {
HITCBC 139   4 return token_.stop_requested(); 139   4 return token_.stop_requested();
140   } 140   }
141   141  
HITCBC 142   4 capy::io_result<std::size_t> await_resume() const noexcept 142   4 capy::io_result<std::size_t> await_resume() const noexcept
143   { 143   {
HITCBC 144   4 if (token_.stop_requested()) 144   4 if (token_.stop_requested())
MISUBC 145   return {make_error_code(std::errc::operation_canceled), 0}; 145   return {make_error_code(std::errc::operation_canceled), 0};
HITCBC 146   4 return {ec_, bytes_transferred_}; 146   4 return {ec_, bytes_transferred_};
147   } 147   }
148   148  
HITCBC 149   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 149   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
150   -> std::coroutine_handle<> 150   -> std::coroutine_handle<>
151   { 151   {
HITCBC 152   4 token_ = env->stop_token; 152   4 token_ = env->stop_token;
HITCBC 153   12 return self_.get_impl().write_some( 153   12 return self_.get_impl().write_some(
HITCBC 154   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); 154   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
155   } 155   }
156   }; 156   };
157   157  
158   struct native_wait_awaitable 158   struct native_wait_awaitable
159   { 159   {
160   native_local_stream_socket& self_; 160   native_local_stream_socket& self_;
161   wait_type w_; 161   wait_type w_;
162   std::stop_token token_; 162   std::stop_token token_;
163   mutable std::error_code ec_; 163   mutable std::error_code ec_;
164   164  
HITCBC 165   2 native_wait_awaitable( 165   2 native_wait_awaitable(
166   native_local_stream_socket& self, wait_type w) noexcept 166   native_local_stream_socket& self, wait_type w) noexcept
HITCBC 167   2 : self_(self) 167   2 : self_(self)
HITCBC 168   2 , w_(w) 168   2 , w_(w)
169   { 169   {
HITCBC 170   2 } 170   2 }
171   171  
HITCBC 172   2 bool await_ready() const noexcept 172   2 bool await_ready() const noexcept
173   { 173   {
HITCBC 174   2 return token_.stop_requested(); 174   2 return token_.stop_requested();
175   } 175   }
176   176  
HITCBC 177   2 capy::io_result<> await_resume() const noexcept 177   2 capy::io_result<> await_resume() const noexcept
178   { 178   {
HITCBC 179   2 if (token_.stop_requested()) 179   2 if (token_.stop_requested())
MISUBC 180   return {make_error_code(std::errc::operation_canceled)}; 180   return {make_error_code(std::errc::operation_canceled)};
HITCBC 181   2 return {ec_}; 181   2 return {ec_};
182   } 182   }
183   183  
HITCBC 184   2 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 184   2 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
185   -> std::coroutine_handle<> 185   -> std::coroutine_handle<>
186   { 186   {
HITCBC 187   2 token_ = env->stop_token; 187   2 token_ = env->stop_token;
HITCBC 188   6 return self_.get_impl().wait( 188   6 return self_.get_impl().wait(
HITCBC 189   6 h, env->executor, w_, token_, &ec_); 189   6 h, env->executor, w_, token_, &ec_);
190   } 190   }
191   }; 191   };
192   192  
193   struct native_connect_awaitable 193   struct native_connect_awaitable
194   { 194   {
195   native_local_stream_socket& self_; 195   native_local_stream_socket& self_;
196   corosio::local_endpoint endpoint_; 196   corosio::local_endpoint endpoint_;
197   std::stop_token token_; 197   std::stop_token token_;
198   mutable std::error_code ec_; 198   mutable std::error_code ec_;
199   199  
HITCBC 200   10 native_connect_awaitable( 200   10 native_connect_awaitable(
201   native_local_stream_socket& self, 201   native_local_stream_socket& self,
202   corosio::local_endpoint ep) noexcept 202   corosio::local_endpoint ep) noexcept
HITCBC 203   10 : self_(self) 203   10 : self_(self)
HITCBC 204   10 , endpoint_(ep) 204   10 , endpoint_(ep)
205   { 205   {
HITCBC 206   10 } 206   10 }
207   207  
HITCBC 208   10 bool await_ready() const noexcept 208   10 bool await_ready() const noexcept
209   { 209   {
HITCBC 210   10 return token_.stop_requested(); 210   10 return token_.stop_requested();
211   } 211   }
212   212  
HITCBC 213   10 capy::io_result<> await_resume() const noexcept 213   10 capy::io_result<> await_resume() const noexcept
214   { 214   {
HITCBC 215   10 if (token_.stop_requested()) 215   10 if (token_.stop_requested())
MISUBC 216   return {make_error_code(std::errc::operation_canceled)}; 216   return {make_error_code(std::errc::operation_canceled)};
HITCBC 217   10 return {ec_}; 217   10 return {ec_};
218   } 218   }
219   219  
HITCBC 220   10 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 220   10 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
221   -> std::coroutine_handle<> 221   -> std::coroutine_handle<>
222   { 222   {
HITCBC 223   10 token_ = env->stop_token; 223   10 token_ = env->stop_token;
HITCBC 224   30 return self_.get_impl().connect( 224   30 return self_.get_impl().connect(
HITCBC 225   30 h, env->executor, endpoint_, token_, &ec_); 225   30 h, env->executor, endpoint_, token_, &ec_);
226   } 226   }
227   }; 227   };
228   228  
229   public: 229   public:
230   /** Construct a native socket from an execution context. 230   /** Construct a native socket from an execution context.
231   231  
232   @param ctx The execution context that will own this socket. 232   @param ctx The execution context that will own this socket.
233   */ 233   */
HITCBC 234   28 explicit native_local_stream_socket(capy::execution_context& ctx) 234   28 explicit native_local_stream_socket(capy::execution_context& ctx)
HITCBC 235   28 : io_object(create_handle<service_type>(ctx)) 235   28 : io_object(create_handle<service_type>(ctx))
236   { 236   {
HITCBC 237   28 } 237   28 }
238   238  
239   /** Construct a native socket from an executor. 239   /** Construct a native socket from an executor.
240   240  
241   @param ex The executor whose context will own the socket. 241   @param ex The executor whose context will own the socket.
242   */ 242   */
243   template<class Ex> 243   template<class Ex>
244   requires(!std::same_as< 244   requires(!std::same_as<
245   std::remove_cvref_t<Ex>, 245   std::remove_cvref_t<Ex>,
246   native_local_stream_socket>) && 246   native_local_stream_socket>) &&
247   capy::Executor<Ex> 247   capy::Executor<Ex>
248   explicit native_local_stream_socket(Ex const& ex) 248   explicit native_local_stream_socket(Ex const& ex)
249   : native_local_stream_socket(ex.context()) 249   : native_local_stream_socket(ex.context())
250   { 250   {
251   } 251   }
252   252  
253   /// Move construct. 253   /// Move construct.
HITCBC 254   4 native_local_stream_socket(native_local_stream_socket&&) noexcept = default; 254   4 native_local_stream_socket(native_local_stream_socket&&) noexcept = default;
255   255  
256   /// Move assign. 256   /// Move assign.
257   native_local_stream_socket& 257   native_local_stream_socket&
258   operator=(native_local_stream_socket&&) noexcept = default; 258   operator=(native_local_stream_socket&&) noexcept = default;
259   259  
260   native_local_stream_socket(native_local_stream_socket const&) = delete; 260   native_local_stream_socket(native_local_stream_socket const&) = delete;
261   native_local_stream_socket& 261   native_local_stream_socket&
262   operator=(native_local_stream_socket const&) = delete; 262   operator=(native_local_stream_socket const&) = delete;
263   263  
264   /** Asynchronously read data from the socket. 264   /** Asynchronously read data from the socket.
265   265  
266   Calls the backend implementation directly, bypassing virtual 266   Calls the backend implementation directly, bypassing virtual
267   dispatch. Otherwise identical to @ref io_stream::read_some. 267   dispatch. Otherwise identical to @ref io_stream::read_some.
268   268  
269   @param buffers The buffer sequence to read into. 269   @param buffers The buffer sequence to read into.
270   270  
271   @return An awaitable yielding `(error_code, std::size_t)`. 271   @return An awaitable yielding `(error_code, std::size_t)`.
272   */ 272   */
273   template<capy::MutableBufferSequence MB> 273   template<capy::MutableBufferSequence MB>
HITCBC 274   4 auto read_some(MB const& buffers) 274   4 auto read_some(MB const& buffers)
275   { 275   {
HITCBC 276   4 return native_read_awaitable<MB>(*this, buffers); 276   4 return native_read_awaitable<MB>(*this, buffers);
277   } 277   }
278   278  
279   /** Asynchronously write data to the socket. 279   /** Asynchronously write data to the socket.
280   280  
281   Calls the backend implementation directly, bypassing virtual 281   Calls the backend implementation directly, bypassing virtual
282   dispatch. Otherwise identical to @ref io_stream::write_some. 282   dispatch. Otherwise identical to @ref io_stream::write_some.
283   283  
284   @param buffers The buffer sequence to write from. 284   @param buffers The buffer sequence to write from.
285   285  
286   @return An awaitable yielding `(error_code, std::size_t)`. 286   @return An awaitable yielding `(error_code, std::size_t)`.
287   */ 287   */
288   template<capy::ConstBufferSequence CB> 288   template<capy::ConstBufferSequence CB>
HITCBC 289   4 auto write_some(CB const& buffers) 289   4 auto write_some(CB const& buffers)
290   { 290   {
HITCBC 291   4 return native_write_awaitable<CB>(*this, buffers); 291   4 return native_write_awaitable<CB>(*this, buffers);
292   } 292   }
293   293  
294   /** Asynchronously connect to a remote endpoint. 294   /** Asynchronously connect to a remote endpoint.
295   295  
296   Calls the backend implementation directly, bypassing virtual 296   Calls the backend implementation directly, bypassing virtual
297   dispatch. Otherwise identical to @ref local_stream_socket::connect. 297   dispatch. Otherwise identical to @ref local_stream_socket::connect.
298   298  
299   If the socket is not already open, it is opened automatically. 299   If the socket is not already open, it is opened automatically.
300   300  
301   @param ep The local endpoint (path) to connect to. 301   @param ep The local endpoint (path) to connect to.
302   302  
303   @return An awaitable yielding `io_result<>`. 303   @return An awaitable yielding `io_result<>`.
304   304  
305   @throws std::system_error if the socket needs to be opened 305   @throws std::system_error if the socket needs to be opened
306   and the open fails. 306   and the open fails.
307   */ 307   */
HITCBC 308   10 auto connect(corosio::local_endpoint ep) 308   10 auto connect(corosio::local_endpoint ep)
309   { 309   {
HITCBC 310   10 if (!is_open()) 310   10 if (!is_open())
HITCBC 311   10 open(); 311   10 open();
HITCBC 312   10 return native_connect_awaitable(*this, ep); 312   10 return native_connect_awaitable(*this, ep);
313   } 313   }
314   314  
315   /** Asynchronously wait for the socket to be ready. 315   /** Asynchronously wait for the socket to be ready.
316   316  
317   Calls the backend implementation directly, bypassing virtual 317   Calls the backend implementation directly, bypassing virtual
318   dispatch. Otherwise identical to @ref local_stream_socket::wait. 318   dispatch. Otherwise identical to @ref local_stream_socket::wait.
319   319  
320   @param w The wait direction (read, write, or error). 320   @param w The wait direction (read, write, or error).
321   321  
322   @return An awaitable yielding `io_result<>`. 322   @return An awaitable yielding `io_result<>`.
323   */ 323   */
HITCBC 324   2 [[nodiscard]] auto wait(wait_type w) 324   2 [[nodiscard]] auto wait(wait_type w)
325   { 325   {
HITCBC 326   2 return native_wait_awaitable(*this, w); 326   2 return native_wait_awaitable(*this, w);
327   } 327   }
328   }; 328   };
329   329  
330   } // namespace boost::corosio 330   } // namespace boost::corosio
331   331  
332   #endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 332   #endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP