TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_POSIX
16 :
17 : #include <boost/corosio/native/detail/posix/posix_random_access_file.hpp>
18 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
19 : #include <boost/corosio/detail/random_access_file_service.hpp>
20 : #include <boost/corosio/detail/thread_pool.hpp>
21 :
22 : #include <limits>
23 : #include <mutex>
24 : #include <unordered_map>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Random-access file service for POSIX backends. */
29 : class BOOST_COROSIO_DECL posix_random_access_file_service final
30 : : public random_access_file_service
31 : {
32 : public:
33 HIT 1018 : posix_random_access_file_service(
34 : capy::execution_context& ctx, scheduler& sched)
35 2036 : : sched_(&sched)
36 1018 : , pool_(get_or_create_pool(ctx))
37 : {
38 1018 : }
39 :
40 2036 : ~posix_random_access_file_service() override = default;
41 :
42 : posix_random_access_file_service(
43 : posix_random_access_file_service const&) = delete;
44 : posix_random_access_file_service& operator=(
45 : posix_random_access_file_service const&) = delete;
46 :
47 43 : io_object::implementation* construct() override
48 : {
49 43 : auto ptr = std::make_shared<posix_random_access_file>(*this);
50 43 : auto* impl = ptr.get();
51 :
52 : {
53 43 : std::lock_guard<std::mutex> lock(mutex_);
54 43 : file_list_.push_back(impl);
55 43 : file_ptrs_[impl] = std::move(ptr);
56 43 : }
57 :
58 43 : return impl;
59 43 : }
60 :
61 43 : void destroy(io_object::implementation* p) override
62 : {
63 43 : auto& impl = static_cast<posix_random_access_file&>(*p);
64 43 : impl.cancel();
65 43 : impl.close_file();
66 43 : destroy_impl(impl);
67 43 : }
68 :
69 77 : void close(io_object::handle& h) override
70 : {
71 77 : if (h.get())
72 : {
73 77 : auto& impl = static_cast<posix_random_access_file&>(*h.get());
74 77 : impl.cancel();
75 77 : impl.close_file();
76 : }
77 77 : }
78 :
79 36 : std::error_code open_file(
80 : random_access_file::implementation& impl,
81 : std::filesystem::path const& path,
82 : file_base::flags mode) override
83 : {
84 36 : if (static_cast<reactor_scheduler const*>(sched_)->is_single_threaded())
85 MIS 0 : return std::make_error_code(std::errc::operation_not_supported);
86 HIT 36 : return static_cast<posix_random_access_file&>(impl).open_file(
87 36 : path, mode);
88 : }
89 :
90 1018 : void shutdown() override
91 : {
92 1018 : std::lock_guard<std::mutex> lock(mutex_);
93 1018 : for (auto* impl = file_list_.pop_front(); impl != nullptr;
94 MIS 0 : impl = file_list_.pop_front())
95 : {
96 0 : impl->cancel();
97 0 : impl->close_file();
98 : }
99 HIT 1018 : file_ptrs_.clear();
100 1018 : }
101 :
102 43 : void destroy_impl(posix_random_access_file& impl)
103 : {
104 43 : std::lock_guard<std::mutex> lock(mutex_);
105 43 : file_list_.remove(&impl);
106 43 : file_ptrs_.erase(&impl);
107 43 : }
108 :
109 150 : void post(scheduler_op* op)
110 : {
111 150 : sched_->post(op);
112 150 : }
113 :
114 : void work_started() noexcept
115 : {
116 : sched_->work_started();
117 : }
118 :
119 : void work_finished() noexcept
120 : {
121 : sched_->work_finished();
122 : }
123 :
124 150 : thread_pool& pool() noexcept
125 : {
126 150 : return pool_;
127 : }
128 :
129 : private:
130 1018 : static thread_pool& get_or_create_pool(capy::execution_context& ctx)
131 : {
132 1018 : auto* p = ctx.find_service<thread_pool>();
133 1018 : if (p)
134 1018 : return *p;
135 MIS 0 : return ctx.make_service<thread_pool>();
136 : }
137 :
138 : scheduler* sched_;
139 : thread_pool& pool_;
140 : std::mutex mutex_;
141 : intrusive_list<posix_random_access_file> file_list_;
142 : std::unordered_map<
143 : posix_random_access_file*,
144 : std::shared_ptr<posix_random_access_file>>
145 : file_ptrs_;
146 : };
147 :
148 : /** Get or create the random-access file service for the given context. */
149 : inline posix_random_access_file_service&
150 HIT 1018 : get_random_access_file_service(capy::execution_context& ctx, scheduler& sched)
151 : {
152 1018 : return ctx.make_service<posix_random_access_file_service>(sched);
153 : }
154 :
155 : // ---------------------------------------------------------------------------
156 : // posix_random_access_file inline implementations (require complete service)
157 : // ---------------------------------------------------------------------------
158 :
159 : inline std::coroutine_handle<>
160 138 : posix_random_access_file::read_some_at(
161 : std::uint64_t offset,
162 : std::coroutine_handle<> h,
163 : capy::executor_ref ex,
164 : buffer_param param,
165 : std::stop_token token,
166 : std::error_code* ec,
167 : std::size_t* bytes_out)
168 : {
169 138 : capy::mutable_buffer bufs[max_buffers];
170 138 : auto count = param.copy_to(bufs, max_buffers);
171 :
172 138 : if (count == 0)
173 : {
174 1 : *ec = {};
175 1 : *bytes_out = 0;
176 1 : return h;
177 : }
178 :
179 137 : auto* op = new raf_op();
180 137 : op->is_read = true;
181 137 : op->offset = offset;
182 :
183 137 : op->iovec_count = static_cast<int>(count);
184 274 : for (int i = 0; i < op->iovec_count; ++i)
185 : {
186 137 : op->iovecs[i].iov_base = bufs[i].data();
187 137 : op->iovecs[i].iov_len = bufs[i].size();
188 : }
189 :
190 137 : op->h = h;
191 137 : op->ex = ex;
192 137 : op->ec_out = ec;
193 137 : op->bytes_out = bytes_out;
194 137 : op->file_ = this;
195 137 : op->file_ref = this->shared_from_this();
196 137 : op->start(token);
197 :
198 137 : op->ex.on_work_started();
199 :
200 : {
201 137 : std::lock_guard<std::mutex> lock(ops_mutex_);
202 137 : outstanding_ops_.push_back(op);
203 137 : }
204 :
205 137 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
206 137 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
207 : {
208 MIS 0 : op->cancelled.store(true, std::memory_order_release);
209 0 : svc_.post(static_cast<scheduler_op*>(op));
210 : }
211 HIT 137 : return std::noop_coroutine();
212 : }
213 :
214 : inline std::coroutine_handle<>
215 14 : posix_random_access_file::write_some_at(
216 : std::uint64_t offset,
217 : std::coroutine_handle<> h,
218 : capy::executor_ref ex,
219 : buffer_param param,
220 : std::stop_token token,
221 : std::error_code* ec,
222 : std::size_t* bytes_out)
223 : {
224 14 : capy::mutable_buffer bufs[max_buffers];
225 14 : auto count = param.copy_to(bufs, max_buffers);
226 :
227 14 : if (count == 0)
228 : {
229 1 : *ec = {};
230 1 : *bytes_out = 0;
231 1 : return h;
232 : }
233 :
234 13 : auto* op = new raf_op();
235 13 : op->is_read = false;
236 13 : op->offset = offset;
237 :
238 13 : op->iovec_count = static_cast<int>(count);
239 26 : for (int i = 0; i < op->iovec_count; ++i)
240 : {
241 13 : op->iovecs[i].iov_base = bufs[i].data();
242 13 : op->iovecs[i].iov_len = bufs[i].size();
243 : }
244 :
245 13 : op->h = h;
246 13 : op->ex = ex;
247 13 : op->ec_out = ec;
248 13 : op->bytes_out = bytes_out;
249 13 : op->file_ = this;
250 13 : op->file_ref = this->shared_from_this();
251 13 : op->start(token);
252 :
253 13 : op->ex.on_work_started();
254 :
255 : {
256 13 : std::lock_guard<std::mutex> lock(ops_mutex_);
257 13 : outstanding_ops_.push_back(op);
258 13 : }
259 :
260 13 : static_cast<pool_work_item*>(op)->func_ = &raf_op::do_work;
261 13 : if (!svc_.pool().post(static_cast<pool_work_item*>(op)))
262 : {
263 MIS 0 : op->cancelled.store(true, std::memory_order_release);
264 0 : svc_.post(static_cast<scheduler_op*>(op));
265 : }
266 HIT 13 : return std::noop_coroutine();
267 : }
268 :
269 : // -- raf_op thread-pool work function --
270 :
271 : inline void
272 150 : posix_random_access_file::raf_op::do_work(pool_work_item* w) noexcept
273 : {
274 150 : auto* op = static_cast<raf_op*>(w);
275 150 : auto* self = op->file_;
276 :
277 150 : if (op->cancelled.load(std::memory_order_acquire))
278 : {
279 1 : op->errn = ECANCELED;
280 1 : op->bytes_transferred = 0;
281 : }
282 298 : else if (op->offset >
283 149 : static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
284 : {
285 MIS 0 : op->errn = EOVERFLOW;
286 0 : op->bytes_transferred = 0;
287 : }
288 : else
289 : {
290 : ssize_t n;
291 HIT 149 : if (op->is_read)
292 : {
293 : do
294 : {
295 272 : n = ::preadv(self->fd_, op->iovecs, op->iovec_count,
296 136 : static_cast<off_t>(op->offset));
297 : }
298 136 : while (n < 0 && errno == EINTR);
299 : }
300 : else
301 : {
302 : do
303 : {
304 26 : n = ::pwritev(self->fd_, op->iovecs, op->iovec_count,
305 13 : static_cast<off_t>(op->offset));
306 : }
307 13 : while (n < 0 && errno == EINTR);
308 : }
309 :
310 149 : if (n >= 0)
311 : {
312 149 : op->errn = 0;
313 149 : op->bytes_transferred = static_cast<std::size_t>(n);
314 : }
315 : else
316 : {
317 MIS 0 : op->errn = errno;
318 0 : op->bytes_transferred = 0;
319 : }
320 : }
321 :
322 HIT 150 : self->svc_.post(static_cast<scheduler_op*>(op));
323 150 : }
324 :
325 : } // namespace boost::corosio::detail
326 :
327 : #endif // BOOST_COROSIO_POSIX
328 :
329 : #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RANDOM_ACCESS_FILE_SERVICE_HPP
|