include/boost/corosio/tcp_server.hpp

93.5% Lines (130/139) 97.1% List of functions (33/34)
tcp_server.hpp
f(x) Functions (34)
Function Calls Lines Blocks
boost::corosio::tcp_server::idle_push(boost::corosio::tcp_server::worker_base*) :182 77x 100.0% 100.0% boost::corosio::tcp_server::idle_pop() :188 18x 100.0% 100.0% boost::corosio::tcp_server::idle_empty() const :196 21x 100.0% 100.0% boost::corosio::tcp_server::active_push(boost::corosio::tcp_server::worker_base*) :202 9x 100.0% 100.0% boost::corosio::tcp_server::active_remove(boost::corosio::tcp_server::worker_base*) :213 21x 100.0% 91.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::promise_type<boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&>(boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&) :252 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::get_return_object() :260 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::initial_suspend() :265 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::final_suspend() :269 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::return_void() :273 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::unhandled_exception() :274 0 0.0% 0.0% auto boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::capy::task<void> >(boost::capy::task<void>&&) :281 9x 100.0% 100.0% auto boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::corosio::tcp_server::push_awaitable>(boost::corosio::tcp_server::push_awaitable&&) :281 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::launch_wrapper(std::__n4861::coroutine_handle<boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type>) :309 9x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::~launch_wrapper() :314 9x 75.0% 75.0% boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>::operator()(boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*, boost::capy::task<void>, boost::corosio::tcp_server::worker_base*) :334 9x 100.0% 46.0% boost::corosio::tcp_server::push_awaitable::push_awaitable(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :354 19x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_ready() const :360 19x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :366 19x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_resume() :373 19x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::pop_awaitable(boost::corosio::tcp_server&) :399 21x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_ready() const :401 21x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :407 3x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_resume() :417 21x 100.0% 100.0% boost::corosio::tcp_server::push(boost::corosio::tcp_server::worker_base&) :426 19x 100.0% 100.0% boost::corosio::tcp_server::push_sync(boost::corosio::tcp_server::worker_base&) :433 2x 50.0% 80.0% boost::corosio::tcp_server::pop() :450 21x 100.0% 100.0% boost::corosio::tcp_server::launcher::launcher(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :515 11x 100.0% 100.0% boost::corosio::tcp_server::launcher::~launcher() :521 11x 100.0% 100.0% void boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>) :548 10x 100.0% 58.0% boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>)::guard_t::~guard_t() :563 9x 91.7% 67.0% boost::corosio::tcp_server::tcp_server<boost::corosio::io_context, boost::corosio::io_context::executor_type>(boost::corosio::io_context&, boost::corosio::io_context::executor_type) :600 17x 100.0% 100.0% void boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&) :664 17x 100.0% 100.0% boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&)::{lambda(void*)#1}::operator()(void*) const :676 17x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 detail::continuation_op cont_op;
163 worker_base* w;
164 };
165
166 struct impl;
167
168 static impl* make_impl(capy::execution_context& ctx);
169
170 impl* impl_;
171 capy::any_executor ex_;
172 waiter* waiters_ = nullptr;
173 worker_base* idle_head_ = nullptr; // Forward list: available workers
174 worker_base* active_head_ =
175 nullptr; // Doubly linked: workers handling connections
176 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
177 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
178 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
179 bool running_ = false;
180
181 // Idle list (forward/singly linked) - push front, pop front
182 77x void idle_push(worker_base* w) noexcept
183 {
184 77x w->next_ = idle_head_;
185 77x idle_head_ = w;
186 77x }
187
188 18x worker_base* idle_pop() noexcept
189 {
190 18x auto* w = idle_head_;
191 18x if (w)
192 18x idle_head_ = w->next_;
193 18x return w;
194 }
195
196 21x bool idle_empty() const noexcept
197 {
198 21x return idle_head_ == nullptr;
199 }
200
201 // Active list (doubly linked) - push back, remove anywhere
202 9x void active_push(worker_base* w) noexcept
203 {
204 9x w->next_ = nullptr;
205 9x w->prev_ = active_tail_;
206 9x if (active_tail_)
207 2x active_tail_->next_ = w;
208 else
209 7x active_head_ = w;
210 9x active_tail_ = w;
211 9x }
212
213 21x void active_remove(worker_base* w) noexcept
214 {
215 // Skip if not in active list (e.g., after failed accept)
216 21x if (w != active_head_ && w->prev_ == nullptr)
217 12x return;
218 9x if (w->prev_)
219 2x w->prev_->next_ = w->next_;
220 else
221 7x active_head_ = w->next_;
222 9x if (w->next_)
223 1x w->next_->prev_ = w->prev_;
224 else
225 8x active_tail_ = w->prev_;
226 9x w->prev_ = nullptr; // Mark as not in active list
227 }
228
229 template<capy::Executor Ex>
230 struct launch_wrapper
231 {
232 struct promise_type
233 {
234 Ex ex; // Executor stored directly in frame (outlives child tasks)
235 capy::io_env env_;
236
237 // For regular coroutines: first arg is executor, second is stop token
238 template<class E, class S, class... Args>
239 requires capy::Executor<std::decay_t<E>>
240 promise_type(E e, S s, Args&&...)
241 : ex(std::move(e))
242 , env_{
243 capy::executor_ref(ex), std::move(s),
244 capy::get_current_frame_allocator()}
245 {
246 }
247
248 // For lambda coroutines: first arg is closure, second is executor, third is stop token
249 template<class Closure, class E, class S, class... Args>
250 requires(!capy::Executor<std::decay_t<Closure>> &&
251 capy::Executor<std::decay_t<E>>)
252 9x promise_type(Closure&&, E e, S s, Args&&...)
253 9x : ex(std::move(e))
254 9x , env_{
255 9x capy::executor_ref(ex), std::move(s),
256 9x capy::get_current_frame_allocator()}
257 {
258 9x }
259
260 9x launch_wrapper get_return_object() noexcept
261 {
262 return {
263 9x std::coroutine_handle<promise_type>::from_promise(*this)};
264 }
265 9x std::suspend_always initial_suspend() noexcept
266 {
267 9x return {};
268 }
269 9x std::suspend_never final_suspend() noexcept
270 {
271 9x return {};
272 }
273 9x void return_void() noexcept {}
274 void unhandled_exception()
275 {
276 std::terminate();
277 }
278
279 // Inject io_env for IoAwaitable
280 template<capy::IoAwaitable Awaitable>
281 18x auto await_transform(Awaitable&& a)
282 {
283 using AwaitableT = std::decay_t<Awaitable>;
284 struct adapter
285 {
286 AwaitableT aw;
287 capy::io_env const* env;
288
289 bool await_ready()
290 {
291 return aw.await_ready();
292 }
293 decltype(auto) await_resume()
294 {
295 return aw.await_resume();
296 }
297
298 auto await_suspend(std::coroutine_handle<promise_type> h)
299 {
300 return aw.await_suspend(h, env);
301 }
302 };
303 36x return adapter{std::forward<Awaitable>(a), &env_};
304 18x }
305 };
306
307 std::coroutine_handle<promise_type> h;
308
309 9x launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
310 9x : h(handle)
311 {
312 9x }
313
314 9x ~launch_wrapper()
315 {
316 9x if (h)
317 h.destroy();
318 9x }
319
320 launch_wrapper(launch_wrapper&& o) noexcept
321 : h(std::exchange(o.h, nullptr))
322 {
323 }
324
325 launch_wrapper(launch_wrapper const&) = delete;
326 launch_wrapper& operator=(launch_wrapper const&) = delete;
327 launch_wrapper& operator=(launch_wrapper&&) = delete;
328 };
329
330 // Named functor to avoid incomplete lambda type in coroutine promise
331 template<class Executor>
332 struct launch_coro
333 {
334 9x launch_wrapper<Executor> operator()(
335 Executor,
336 std::stop_token,
337 tcp_server* self,
338 capy::task<void> t,
339 worker_base* wp)
340 {
341 // Executor and stop token stored in promise via constructor
342 co_await std::move(t);
343 co_await self->push(*wp); // worker goes back to idle list
344 18x }
345 };
346
347 class push_awaitable
348 {
349 tcp_server& self_;
350 worker_base& w_;
351 detail::continuation_op cont_op_;
352
353 public:
354 19x push_awaitable(tcp_server& self, worker_base& w) noexcept
355 19x : self_(self)
356 19x , w_(w)
357 {
358 19x }
359
360 19x bool await_ready() const noexcept
361 {
362 19x return false;
363 }
364
365 std::coroutine_handle<>
366 19x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
367 {
368 // Symmetric transfer to server's executor
369 19x cont_op_.cont.h = h;
370 19x return self_.ex_.dispatch(cont_op_.cont);
371 }
372
373 19x void await_resume() noexcept
374 {
375 // Running on server executor - safe to modify lists
376 // Remove from active (if present), then wake waiter or add to idle
377 19x self_.active_remove(&w_);
378 19x if (self_.waiters_)
379 {
380 3x auto* wait = self_.waiters_;
381 3x self_.waiters_ = wait->next;
382 3x wait->w = &w_;
383 3x wait->cont_op.cont.h = wait->h;
384 3x self_.ex_.post(wait->cont_op.cont);
385 }
386 else
387 {
388 16x self_.idle_push(&w_);
389 }
390 19x }
391 };
392
393 class pop_awaitable
394 {
395 tcp_server& self_;
396 waiter wait_;
397
398 public:
399 21x pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
400
401 21x bool await_ready() const noexcept
402 {
403 21x return !self_.idle_empty();
404 }
405
406 bool
407 3x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
408 {
409 // Running on server executor (do_accept runs there)
410 3x wait_.h = h;
411 3x wait_.w = nullptr;
412 3x wait_.next = self_.waiters_;
413 3x self_.waiters_ = &wait_;
414 3x return true;
415 }
416
417 21x worker_base& await_resume() noexcept
418 {
419 // Running on server executor
420 21x if (wait_.w)
421 3x return *wait_.w; // Woken by push_awaitable
422 18x return *self_.idle_pop();
423 }
424 };
425
426 19x push_awaitable push(worker_base& w)
427 {
428 19x return push_awaitable{*this, w};
429 }
430
431 // Synchronous version for destructor/guard paths
432 // Must be called from server executor context
433 2x void push_sync(worker_base& w) noexcept
434 {
435 2x active_remove(&w);
436 2x if (waiters_)
437 {
438 auto* wait = waiters_;
439 waiters_ = wait->next;
440 wait->w = &w;
441 wait->cont_op.cont.h = wait->h;
442 ex_.post(wait->cont_op.cont);
443 }
444 else
445 {
446 2x idle_push(&w);
447 }
448 2x }
449
450 21x pop_awaitable pop()
451 {
452 21x return pop_awaitable{*this};
453 }
454
455 capy::task<void> do_accept(tcp_acceptor& acc);
456
457 public:
458 /** Abstract base class for connection handlers.
459
460 Derive from this class to implement custom connection handling.
461 Each worker owns a socket and is reused across multiple
462 connections to avoid per-connection allocation.
463
464 @see tcp_server, launcher
465 */
466 class BOOST_COROSIO_DECL worker_base
467 {
468 // Ordered largest to smallest for optimal packing
469 std::stop_source stop_; // ~16 bytes
470 worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
471 worker_base* prev_ = nullptr; // 8 bytes - used only by active list
472
473 friend class tcp_server;
474
475 public:
476 /// Construct a worker.
477 worker_base();
478
479 /// Destroy the worker.
480 virtual ~worker_base();
481
482 /** Handle an accepted connection.
483
484 Called when this worker is dispatched to handle a new
485 connection. The implementation must invoke the launcher
486 exactly once to start the handling coroutine.
487
488 @param launch Handle to launch the connection coroutine.
489 */
490 virtual void run(launcher launch) = 0;
491
492 /// Return the socket used for connections.
493 virtual corosio::tcp_socket& socket() = 0;
494 };
495
496 /** Move-only handle to launch a worker coroutine.
497
498 Passed to @ref worker_base::run to start the connection-handling
499 coroutine. The launcher ensures the worker returns to the idle
500 pool when the coroutine completes or if launching fails.
501
502 The launcher must be invoked exactly once via `operator()`.
503 If destroyed without invoking, the worker is returned to the
504 idle pool automatically.
505
506 @see worker_base::run
507 */
508 class BOOST_COROSIO_DECL launcher
509 {
510 tcp_server* srv_;
511 worker_base* w_;
512
513 friend class tcp_server;
514
515 11x launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
516 {
517 11x }
518
519 public:
520 /// Return the worker to the pool if not launched.
521 11x ~launcher()
522 {
523 11x if (w_)
524 2x srv_->push_sync(*w_);
525 11x }
526
527 launcher(launcher&& o) noexcept
528 : srv_(o.srv_)
529 , w_(std::exchange(o.w_, nullptr))
530 {
531 }
532 launcher(launcher const&) = delete;
533 launcher& operator=(launcher const&) = delete;
534 launcher& operator=(launcher&&) = delete;
535
536 /** Launch the connection-handling coroutine.
537
538 Starts the given coroutine on the specified executor. When
539 the coroutine completes, the worker is automatically returned
540 to the idle pool.
541
542 @param ex The executor to run the coroutine on.
543 @param task The coroutine to execute.
544
545 @throws std::logic_error If this launcher was already invoked.
546 */
547 template<class Executor>
548 10x void operator()(Executor const& ex, capy::task<void> task)
549 {
550 10x if (!w_)
551 1x detail::throw_logic_error(); // launcher already invoked
552
553 9x auto* w = std::exchange(w_, nullptr);
554
555 // Worker is being dispatched - add to active list
556 9x srv_->active_push(w);
557
558 // Return worker to pool if coroutine setup throws
559 struct guard_t
560 {
561 tcp_server* srv;
562 worker_base* w;
563 9x ~guard_t()
564 {
565 9x if (w)
566 srv->push_sync(*w);
567 9x }
568 9x } guard{srv_, w};
569
570 // Reset worker's stop source for this connection
571 9x w->stop_ = {};
572 9x auto st = w->stop_.get_token();
573
574 9x auto wrapper =
575 9x launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
576
577 // Executor and stop token stored in promise via constructor
578 9x ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
579 9x guard.w = nullptr; // Success - dismiss guard
580 9x }
581 };
582
583 /** Construct a TCP server.
584
585 @tparam Ctx Execution context type satisfying ExecutionContext.
586 @tparam Ex Executor type satisfying Executor.
587
588 @param ctx The execution context for socket operations.
589 @param ex The executor for dispatching coroutines.
590
591 @par Example
592 @code
593 tcp_server srv(ctx, ctx.get_executor());
594 srv.set_workers(make_workers(ctx, 100));
595 srv.bind(endpoint{...});
596 srv.start();
597 @endcode
598 */
599 template<capy::ExecutionContext Ctx, capy::Executor Ex>
600 17x tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
601 17x , ex_(std::move(ex))
602 {
603 17x }
604
605 public:
606 /// Destroy the server, stopping all accept loops.
607 ~tcp_server();
608
609 tcp_server(tcp_server const&) = delete;
610 tcp_server& operator=(tcp_server const&) = delete;
611
612 /** Move construct from another server.
613
614 @param o The source server. After the move, @p o is
615 in a valid but unspecified state.
616 */
617 tcp_server(tcp_server&& o) noexcept;
618
619 /** Move assign from another server.
620
621 @param o The source server. After the move, @p o is
622 in a valid but unspecified state.
623
624 @return `*this`.
625 */
626 tcp_server& operator=(tcp_server&& o) noexcept;
627
628 /** Bind to a local endpoint.
629
630 Creates an acceptor listening on the specified endpoint.
631 Multiple endpoints can be bound by calling this method
632 multiple times before @ref start.
633
634 @param ep The local endpoint to bind to.
635
636 @return The error code if binding fails.
637 */
638 std::error_code bind(endpoint ep);
639
640 /** Set the worker pool.
641
642 Replaces any existing workers with the given range. Any
643 previous workers are released and the idle/active lists
644 are cleared before populating with new workers.
645
646 @tparam Range Forward range of pointer-like objects to worker_base.
647
648 @param workers Range of workers to manage. Each element must
649 support `std::to_address()` yielding `worker_base*`.
650
651 @par Example
652 @code
653 std::vector<std::unique_ptr<my_worker>> workers;
654 for(int i = 0; i < 100; ++i)
655 workers.push_back(std::make_unique<my_worker>(ctx));
656 srv.set_workers(std::move(workers));
657 @endcode
658 */
659 template<std::ranges::forward_range Range>
660 requires std::convertible_to<
661 decltype(std::to_address(
662 std::declval<std::ranges::range_value_t<Range>&>())),
663 worker_base*>
664 17x void set_workers(Range&& workers)
665 {
666 // Clear existing state
667 17x storage_.reset();
668 17x idle_head_ = nullptr;
669 17x active_head_ = nullptr;
670 17x active_tail_ = nullptr;
671
672 // Take ownership and populate idle list
673 using StorageType = std::decay_t<Range>;
674 17x auto* p = new StorageType(std::forward<Range>(workers));
675 17x storage_ = std::shared_ptr<void>(
676 17x p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
677 76x for (auto&& elem : *static_cast<StorageType*>(p))
678 59x idle_push(std::to_address(elem));
679 17x }
680
681 /** Start accepting connections.
682
683 Launches accept loops for all bound endpoints. Incoming
684 connections are dispatched to idle workers from the pool.
685
686 Calling `start()` on an already-running server has no effect.
687
688 @par Preconditions
689 - At least one endpoint bound via @ref bind.
690 - Workers provided to the constructor.
691 - If restarting, @ref join must have completed first.
692
693 @par Effects
694 Creates one accept coroutine per bound endpoint. Each coroutine
695 runs on the server's executor, waiting for connections and
696 dispatching them to idle workers.
697
698 @par Restart Sequence
699 To restart after stopping, complete the full shutdown cycle:
700 @code
701 srv.start();
702 ioc.run_for( 1s );
703 srv.stop(); // 1. Signal shutdown
704 ioc.run(); // 2. Drain remaining completions
705 srv.join(); // 3. Wait for accept loops
706
707 // Now safe to restart
708 srv.start();
709 ioc.run();
710 @endcode
711
712 @par Thread Safety
713 Not thread safe.
714
715 @throws std::logic_error If a previous session has not been
716 joined (accept loops still active).
717 */
718 void start();
719
720 /** Return the local endpoint for the i-th bound port.
721
722 @param index Zero-based index into the list of bound ports.
723
724 @return The local endpoint, or a default-constructed endpoint
725 if @p index is out of range or the acceptor is not open.
726 */
727 endpoint local_endpoint(std::size_t index = 0) const noexcept;
728
729 /** Stop accepting connections.
730
731 Signals all listening ports to stop accepting new connections
732 and requests cancellation of active workers via their stop tokens.
733
734 This function returns immediately; it does not wait for workers
735 to finish. Pending I/O operations complete asynchronously.
736
737 Calling `stop()` on a non-running server has no effect.
738
739 @par Effects
740 - Closes all acceptors (pending accepts complete with error).
741 - Requests stop on each active worker's stop token.
742 - Workers observing their stop token should exit promptly.
743
744 @par Postconditions
745 No new connections will be accepted. Active workers continue
746 until they observe their stop token or complete naturally.
747
748 @par What Happens Next
749 After calling `stop()`:
750 1. Let `ioc.run()` return (drains pending completions).
751 2. Call @ref join to wait for accept loops to finish.
752 3. Only then is it safe to restart or destroy the server.
753
754 @par Thread Safety
755 Not thread safe.
756
757 @see join, start
758 */
759 void stop();
760
761 /** Block until all accept loops complete.
762
763 Blocks the calling thread until all accept coroutines launched
764 by @ref start have finished executing. This synchronizes the
765 shutdown sequence, ensuring the server is fully stopped before
766 restarting or destroying it.
767
768 @par Preconditions
769 @ref stop has been called and `ioc.run()` has returned.
770
771 @par Postconditions
772 All accept loops have completed. The server is in the stopped
773 state and may be restarted via @ref start.
774
775 @par Example (Correct Usage)
776 @code
777 // main thread
778 srv.start();
779 ioc.run(); // Blocks until work completes
780 srv.join(); // Safe: called after ioc.run() returns
781 @endcode
782
783 @par WARNING: Deadlock Scenarios
784 Calling `join()` from the wrong context causes deadlock:
785
786 @code
787 // WRONG: calling join() from inside a worker coroutine
788 void run( launcher launch ) override
789 {
790 launch( ex, [this]() -> capy::task<>
791 {
792 srv_.join(); // DEADLOCK: blocks the executor
793 co_return;
794 }());
795 }
796
797 // WRONG: calling join() while ioc.run() is still active
798 std::thread t( [&]{ ioc.run(); } );
799 srv.stop();
800 srv.join(); // DEADLOCK: ioc.run() still running in thread t
801 @endcode
802
803 @par Thread Safety
804 May be called from any thread, but will deadlock if called
805 from within the io_context event loop or from a worker coroutine.
806
807 @see stop, start
808 */
809 void join();
810
811 private:
812 capy::task<> do_stop();
813 };
814
815 #ifdef _MSC_VER
816 #pragma warning(pop)
817 #endif
818
819 } // namespace boost::corosio
820
821 #endif
822