91.11% Lines (123/135) 96.77% Functions (30/31)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/await_suspend_helper.hpp> 14   #include <boost/capy/detail/await_suspend_helper.hpp>
15   #include <boost/capy/buffers.hpp> 15   #include <boost/capy/buffers.hpp>
16   #include <boost/capy/buffers/buffer_array.hpp> 16   #include <boost/capy/buffers/buffer_array.hpp>
17   #include <boost/capy/buffers/buffer_param.hpp> 17   #include <boost/capy/buffers/buffer_param.hpp>
18   #include <boost/capy/concept/io_awaitable.hpp> 18   #include <boost/capy/concept/io_awaitable.hpp>
19   #include <boost/capy/concept/read_source.hpp> 19   #include <boost/capy/concept/read_source.hpp>
20   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/io_result.hpp> 21   #include <boost/capy/io_result.hpp>
22   #include <boost/capy/io_task.hpp> 22   #include <boost/capy/io_task.hpp>
23   23  
24   #include <concepts> 24   #include <concepts>
25   #include <coroutine> 25   #include <coroutine>
26   #include <cstddef> 26   #include <cstddef>
27   #include <exception> 27   #include <exception>
28   #include <new> 28   #include <new>
29   #include <span> 29   #include <span>
30   #include <stop_token> 30   #include <stop_token>
31   #include <system_error> 31   #include <system_error>
32   #include <utility> 32   #include <utility>
33   33  
34   namespace boost { 34   namespace boost {
35   namespace capy { 35   namespace capy {
36   36  
37   /** Type-erased wrapper for any ReadSource. 37   /** Type-erased wrapper for any ReadSource.
38   38  
39   This class provides type erasure for any type satisfying the 39   This class provides type erasure for any type satisfying the
40   @ref ReadSource concept, enabling runtime polymorphism for 40   @ref ReadSource concept, enabling runtime polymorphism for
41   source read operations. It uses cached awaitable storage to achieve 41   source read operations. It uses cached awaitable storage to achieve
42   zero steady-state allocation after construction. 42   zero steady-state allocation after construction.
43   43  
44   The wrapper supports two construction modes: 44   The wrapper supports two construction modes:
45   - **Owning**: Pass by value to transfer ownership. The wrapper 45   - **Owning**: Pass by value to transfer ownership. The wrapper
46   allocates storage and owns the source. 46   allocates storage and owns the source.
47   - **Reference**: Pass a pointer to wrap without ownership. The 47   - **Reference**: Pass a pointer to wrap without ownership. The
48   pointed-to source must outlive this wrapper. 48   pointed-to source must outlive this wrapper.
49   49  
50   @par Awaitable Preallocation 50   @par Awaitable Preallocation
51   The constructor preallocates storage for the type-erased awaitable. 51   The constructor preallocates storage for the type-erased awaitable.
52   This reserves all virtual address space at server startup 52   This reserves all virtual address space at server startup
53   so memory usage can be measured up front, rather than 53   so memory usage can be measured up front, rather than
54   allocating piecemeal as traffic arrives. 54   allocating piecemeal as traffic arrives.
55   55  
56   @par Immediate Completion 56   @par Immediate Completion
57   Operations complete immediately without suspending when the 57   Operations complete immediately without suspending when the
58   buffer sequence is empty, or when the underlying source's 58   buffer sequence is empty, or when the underlying source's
59   awaitable reports readiness via `await_ready`. 59   awaitable reports readiness via `await_ready`.
60   60  
61   @par Thread Safety 61   @par Thread Safety
62   Not thread-safe. Concurrent operations on the same wrapper 62   Not thread-safe. Concurrent operations on the same wrapper
63   are undefined behavior. 63   are undefined behavior.
64   64  
65   @par Example 65   @par Example
66   @code 66   @code
67   // Owning - takes ownership of the source 67   // Owning - takes ownership of the source
68   any_read_source rs(some_source{args...}); 68   any_read_source rs(some_source{args...});
69   69  
70   // Reference - wraps without ownership 70   // Reference - wraps without ownership
71   some_source source; 71   some_source source;
72   any_read_source rs(&source); 72   any_read_source rs(&source);
73   73  
74   mutable_buffer buf(data, size); 74   mutable_buffer buf(data, size);
75   auto [ec, n] = co_await rs.read(std::span(&buf, 1)); 75   auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76   @endcode 76   @endcode
77   77  
78   @see any_read_stream, ReadSource 78   @see any_read_stream, ReadSource
79   */ 79   */
80   class any_read_source 80   class any_read_source
81   { 81   {
82   struct vtable; 82   struct vtable;
83   struct awaitable_ops; 83   struct awaitable_ops;
84   84  
85   template<ReadSource S> 85   template<ReadSource S>
86   struct vtable_for_impl; 86   struct vtable_for_impl;
87   87  
88   void* source_ = nullptr; 88   void* source_ = nullptr;
89   vtable const* vt_ = nullptr; 89   vtable const* vt_ = nullptr;
90   void* cached_awaitable_ = nullptr; 90   void* cached_awaitable_ = nullptr;
91   void* storage_ = nullptr; 91   void* storage_ = nullptr;
92   awaitable_ops const* active_ops_ = nullptr; 92   awaitable_ops const* active_ops_ = nullptr;
93   93  
94   public: 94   public:
95   /** Destructor. 95   /** Destructor.
96   96  
97   Destroys the owned source (if any) and releases the cached 97   Destroys the owned source (if any) and releases the cached
98   awaitable storage. 98   awaitable storage.
99   */ 99   */
100   ~any_read_source(); 100   ~any_read_source();
101   101  
102   /** Construct a default instance. 102   /** Construct a default instance.
103   103  
104   Constructs an empty wrapper. Operations on a default-constructed 104   Constructs an empty wrapper. Operations on a default-constructed
105   wrapper result in undefined behavior. 105   wrapper result in undefined behavior.
106   */ 106   */
107   any_read_source() = default; 107   any_read_source() = default;
108   108  
109   /** Non-copyable. 109   /** Non-copyable.
110   110  
111   The awaitable cache is per-instance and cannot be shared. 111   The awaitable cache is per-instance and cannot be shared.
112   */ 112   */
113   any_read_source(any_read_source const&) = delete; 113   any_read_source(any_read_source const&) = delete;
114   any_read_source& operator=(any_read_source const&) = delete; 114   any_read_source& operator=(any_read_source const&) = delete;
115   115  
116   /** Construct by moving. 116   /** Construct by moving.
117   117  
118   Transfers ownership of the wrapped source (if owned) and 118   Transfers ownership of the wrapped source (if owned) and
119   cached awaitable storage from `other`. After the move, `other` is 119   cached awaitable storage from `other`. After the move, `other` is
120   in a default-constructed state. 120   in a default-constructed state.
121   121  
122   @param other The wrapper to move from. 122   @param other The wrapper to move from.
123   */ 123   */
HITCBC 124   1 any_read_source(any_read_source&& other) noexcept 124   1 any_read_source(any_read_source&& other) noexcept
HITCBC 125   1 : source_(std::exchange(other.source_, nullptr)) 125   1 : source_(std::exchange(other.source_, nullptr))
HITCBC 126   1 , vt_(std::exchange(other.vt_, nullptr)) 126   1 , vt_(std::exchange(other.vt_, nullptr))
HITCBC 127   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr)) 127   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
HITCBC 128   1 , storage_(std::exchange(other.storage_, nullptr)) 128   1 , storage_(std::exchange(other.storage_, nullptr))
HITCBC 129   1 , active_ops_(std::exchange(other.active_ops_, nullptr)) 129   1 , active_ops_(std::exchange(other.active_ops_, nullptr))
130   { 130   {
HITCBC 131   1 } 131   1 }
132   132  
133   /** Assign by moving. 133   /** Assign by moving.
134   134  
135   Destroys any owned source and releases existing resources, 135   Destroys any owned source and releases existing resources,
136   then transfers ownership from `other`. 136   then transfers ownership from `other`.
137   137  
138   @param other The wrapper to move from. 138   @param other The wrapper to move from.
139   @return Reference to this wrapper. 139   @return Reference to this wrapper.
140   */ 140   */
141   any_read_source& 141   any_read_source&
142   operator=(any_read_source&& other) noexcept; 142   operator=(any_read_source&& other) noexcept;
143   143  
144   /** Construct by taking ownership of a ReadSource. 144   /** Construct by taking ownership of a ReadSource.
145   145  
146   Allocates storage and moves the source into this wrapper. 146   Allocates storage and moves the source into this wrapper.
147   The wrapper owns the source and will destroy it. 147   The wrapper owns the source and will destroy it.
148   148  
149   @param s The source to take ownership of. 149   @param s The source to take ownership of.
150   */ 150   */
151   template<ReadSource S> 151   template<ReadSource S>
152   requires (!std::same_as<std::decay_t<S>, any_read_source>) 152   requires (!std::same_as<std::decay_t<S>, any_read_source>)
153   any_read_source(S s); 153   any_read_source(S s);
154   154  
155   /** Construct by wrapping a ReadSource without ownership. 155   /** Construct by wrapping a ReadSource without ownership.
156   156  
157   Wraps the given source by pointer. The source must remain 157   Wraps the given source by pointer. The source must remain
158   valid for the lifetime of this wrapper. 158   valid for the lifetime of this wrapper.
159   159  
160   @param s Pointer to the source to wrap. 160   @param s Pointer to the source to wrap.
161   */ 161   */
162   template<ReadSource S> 162   template<ReadSource S>
163   any_read_source(S* s); 163   any_read_source(S* s);
164   164  
165   /** Check if the wrapper contains a valid source. 165   /** Check if the wrapper contains a valid source.
166   166  
167   @return `true` if wrapping a source, `false` if default-constructed 167   @return `true` if wrapping a source, `false` if default-constructed
168   or moved-from. 168   or moved-from.
169   */ 169   */
170   bool 170   bool
HITCBC 171   27 has_value() const noexcept 171   27 has_value() const noexcept
172   { 172   {
HITCBC 173   27 return source_ != nullptr; 173   27 return source_ != nullptr;
174   } 174   }
175   175  
176   /** Check if the wrapper contains a valid source. 176   /** Check if the wrapper contains a valid source.
177   177  
178   @return `true` if wrapping a source, `false` if default-constructed 178   @return `true` if wrapping a source, `false` if default-constructed
179   or moved-from. 179   or moved-from.
180   */ 180   */
181   explicit 181   explicit
HITCBC 182   8 operator bool() const noexcept 182   8 operator bool() const noexcept
183   { 183   {
HITCBC 184   8 return has_value(); 184   8 return has_value();
185   } 185   }
186   186  
187   /** Initiate a partial read operation. 187   /** Initiate a partial read operation.
188   188  
189   Attempt to read up to `buffer_size( buffers )` bytes into 189   Attempt to read up to `buffer_size( buffers )` bytes into
190   the provided buffer sequence. May fill less than the 190   the provided buffer sequence. May fill less than the
191   full sequence. 191   full sequence.
192   192  
193   @param buffers The buffer sequence to read into. 193   @param buffers The buffer sequence to read into.
194   194  
195   @return An awaitable that await-returns `(error_code,std::size_t)`. 195   @return An awaitable that await-returns `(error_code,std::size_t)`.
196   196  
197   @par Immediate Completion 197   @par Immediate Completion
198   The operation completes immediately without suspending 198   The operation completes immediately without suspending
199   the calling coroutine when: 199   the calling coroutine when:
200   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 200   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201   @li The underlying source's awaitable reports immediate 201   @li The underlying source's awaitable reports immediate
202   readiness via `await_ready`. 202   readiness via `await_ready`.
203   203  
204   @note This is a partial operation and may not process the 204   @note This is a partial operation and may not process the
205   entire buffer sequence. Use @ref read for guaranteed 205   entire buffer sequence. Use @ref read for guaranteed
206   complete transfer. 206   complete transfer.
207   207  
208   @par Preconditions 208   @par Preconditions
209   The wrapper must contain a valid source (`has_value() == true`). 209   The wrapper must contain a valid source (`has_value() == true`).
210   The caller must not call this function again after a prior 210   The caller must not call this function again after a prior
211   call returned an error (including EOF). 211   call returned an error (including EOF).
212   */ 212   */
213   template<MutableBufferSequence MB> 213   template<MutableBufferSequence MB>
214   auto 214   auto
215   read_some(MB buffers); 215   read_some(MB buffers);
216   216  
217   /** Initiate a complete read operation. 217   /** Initiate a complete read operation.
218   218  
219   Reads data into the provided buffer sequence by forwarding 219   Reads data into the provided buffer sequence by forwarding
220   to the underlying source's `read` operation. Large buffer 220   to the underlying source's `read` operation. Large buffer
221   sequences are processed in windows, with each window 221   sequences are processed in windows, with each window
222   forwarded as a separate `read` call to the underlying source. 222   forwarded as a separate `read` call to the underlying source.
223   The operation completes when the entire buffer sequence is 223   The operation completes when the entire buffer sequence is
224   filled, end-of-file is reached, or an error occurs. 224   filled, end-of-file is reached, or an error occurs.
225   225  
226   @param buffers The buffer sequence to read into. 226   @param buffers The buffer sequence to read into.
227   227  
228   @return An awaitable that await-returns `(error_code,std::size_t)`. 228   @return An awaitable that await-returns `(error_code,std::size_t)`.
229   229  
230   @par Immediate Completion 230   @par Immediate Completion
231   The operation completes immediately without suspending 231   The operation completes immediately without suspending
232   the calling coroutine when: 232   the calling coroutine when:
233   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 233   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234   @li The underlying source's `read` awaitable reports 234   @li The underlying source's `read` awaitable reports
235   immediate readiness via `await_ready`. 235   immediate readiness via `await_ready`.
236   236  
237   @par Postconditions 237   @par Postconditions
238   Exactly one of the following is true on return: 238   Exactly one of the following is true on return:
239   @li **Success**: `!ec` and `n == buffer_size(buffers)`. 239   @li **Success**: `!ec` and `n == buffer_size(buffers)`.
240   The entire buffer was filled. 240   The entire buffer was filled.
241   @li **End-of-stream or Error**: `ec` and `n` indicates 241   @li **End-of-stream or Error**: `ec` and `n` indicates
242   the number of bytes transferred before the failure. 242   the number of bytes transferred before the failure.
243   243  
244   @par Preconditions 244   @par Preconditions
245   The wrapper must contain a valid source (`has_value() == true`). 245   The wrapper must contain a valid source (`has_value() == true`).
246   The caller must not call this function again after a prior 246   The caller must not call this function again after a prior
247   call returned an error (including EOF). 247   call returned an error (including EOF).
248   */ 248   */
249   template<MutableBufferSequence MB> 249   template<MutableBufferSequence MB>
250   io_task<std::size_t> 250   io_task<std::size_t>
251   read(MB buffers); 251   read(MB buffers);
252   252  
253   protected: 253   protected:
254   /** Rebind to a new source after move. 254   /** Rebind to a new source after move.
255   255  
256   Updates the internal pointer to reference a new source object. 256   Updates the internal pointer to reference a new source object.
257   Used by owning wrappers after move assignment when the owned 257   Used by owning wrappers after move assignment when the owned
258   object has moved to a new location. 258   object has moved to a new location.
259   259  
260   @param new_source The new source to bind to. Must be the same 260   @param new_source The new source to bind to. Must be the same
261   type as the original source. 261   type as the original source.
262   262  
263   @note Terminates if called with a source of different type 263   @note Terminates if called with a source of different type
264   than the original. 264   than the original.
265   */ 265   */
266   template<ReadSource S> 266   template<ReadSource S>
267   void 267   void
268   rebind(S& new_source) noexcept 268   rebind(S& new_source) noexcept
269   { 269   {
270   if(vt_ != &vtable_for_impl<S>::value) 270   if(vt_ != &vtable_for_impl<S>::value)
271   std::terminate(); 271   std::terminate();
272   source_ = &new_source; 272   source_ = &new_source;
273   } 273   }
274   274  
275   private: 275   private:
276   auto 276   auto
277   read_(std::span<mutable_buffer const> buffers); 277   read_(std::span<mutable_buffer const> buffers);
278   }; 278   };
279   279  
280   // ordered by call sequence for cache line coherence 280   // ordered by call sequence for cache line coherence
281   struct any_read_source::awaitable_ops 281   struct any_read_source::awaitable_ops
282   { 282   {
283   bool (*await_ready)(void*); 283   bool (*await_ready)(void*);
284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); 284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285   io_result<std::size_t> (*await_resume)(void*); 285   io_result<std::size_t> (*await_resume)(void*);
286   void (*destroy)(void*) noexcept; 286   void (*destroy)(void*) noexcept;
287   }; 287   };
288   288  
289   // ordered by call frequency for cache line coherence 289   // ordered by call frequency for cache line coherence
290   struct any_read_source::vtable 290   struct any_read_source::vtable
291   { 291   {
292   awaitable_ops const* (*construct_read_some_awaitable)( 292   awaitable_ops const* (*construct_read_some_awaitable)(
293   void* source, 293   void* source,
294   void* storage, 294   void* storage,
295   std::span<mutable_buffer const> buffers); 295   std::span<mutable_buffer const> buffers);
296   awaitable_ops const* (*construct_read_awaitable)( 296   awaitable_ops const* (*construct_read_awaitable)(
297   void* source, 297   void* source,
298   void* storage, 298   void* storage,
299   std::span<mutable_buffer const> buffers); 299   std::span<mutable_buffer const> buffers);
300   std::size_t awaitable_size; 300   std::size_t awaitable_size;
301   std::size_t awaitable_align; 301   std::size_t awaitable_align;
302   void (*destroy)(void*) noexcept; 302   void (*destroy)(void*) noexcept;
303   }; 303   };
304   304  
305   template<ReadSource S> 305   template<ReadSource S>
306   struct any_read_source::vtable_for_impl 306   struct any_read_source::vtable_for_impl
307   { 307   {
308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some( 308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309   std::span<mutable_buffer const>{})); 309   std::span<mutable_buffer const>{}));
310   using ReadAwaitable = decltype(std::declval<S&>().read( 310   using ReadAwaitable = decltype(std::declval<S&>().read(
311   std::span<mutable_buffer const>{})); 311   std::span<mutable_buffer const>{}));
312   312  
313   static void 313   static void
HITCBC 314   6 do_destroy_impl(void* source) noexcept 314   6 do_destroy_impl(void* source) noexcept
315   { 315   {
HITCBC 316   6 static_cast<S*>(source)->~S(); 316   6 static_cast<S*>(source)->~S();
HITCBC 317   6 } 317   6 }
318   318  
319   static awaitable_ops const* 319   static awaitable_ops const*
HITCBC 320   52 construct_read_some_awaitable_impl( 320   52 construct_read_some_awaitable_impl(
321   void* source, 321   void* source,
322   void* storage, 322   void* storage,
323   std::span<mutable_buffer const> buffers) 323   std::span<mutable_buffer const> buffers)
324   { 324   {
HITCBC 325   52 auto& s = *static_cast<S*>(source); 325   52 auto& s = *static_cast<S*>(source);
HITCBC 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers)); 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327   327  
328   static constexpr awaitable_ops ops = { 328   static constexpr awaitable_ops ops = {
HITCBC 329   52 +[](void* p) { 329   52 +[](void* p) {
HITCBC 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready(); 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331   }, 331   },
HITCBC 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) { 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
HITCBC 333   2 return detail::call_await_suspend( 333   2 return detail::call_await_suspend(
HITCBC 334   2 static_cast<ReadSomeAwaitable*>(p), h, env); 334   2 static_cast<ReadSomeAwaitable*>(p), h, env);
335   }, 335   },
HITCBC 336   50 +[](void* p) { 336   50 +[](void* p) {
HITCBC 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume(); 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338   }, 338   },
HITCBC 339   54 +[](void* p) noexcept { 339   54 +[](void* p) noexcept {
HITCBC 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable(); 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341   } 341   }
342   }; 342   };
HITCBC 343   52 return &ops; 343   52 return &ops;
344   } 344   }
345   345  
346   static awaitable_ops const* 346   static awaitable_ops const*
HITCBC 347   116 construct_read_awaitable_impl( 347   116 construct_read_awaitable_impl(
348   void* source, 348   void* source,
349   void* storage, 349   void* storage,
350   std::span<mutable_buffer const> buffers) 350   std::span<mutable_buffer const> buffers)
351   { 351   {
HITCBC 352   116 auto& s = *static_cast<S*>(source); 352   116 auto& s = *static_cast<S*>(source);
HITCBC 353   116 ::new(storage) ReadAwaitable(s.read(buffers)); 353   116 ::new(storage) ReadAwaitable(s.read(buffers));
354   354  
355   static constexpr awaitable_ops ops = { 355   static constexpr awaitable_ops ops = {
HITCBC 356   116 +[](void* p) { 356   116 +[](void* p) {
HITCBC 357   116 return static_cast<ReadAwaitable*>(p)->await_ready(); 357   116 return static_cast<ReadAwaitable*>(p)->await_ready();
358   }, 358   },
MISUBC 359   +[](void* p, std::coroutine_handle<> h, io_env const* env) { 359   +[](void* p, std::coroutine_handle<> h, io_env const* env) {
MISUBC 360   return detail::call_await_suspend( 360   return detail::call_await_suspend(
MISUBC 361   static_cast<ReadAwaitable*>(p), h, env); 361   static_cast<ReadAwaitable*>(p), h, env);
362   }, 362   },
HITCBC 363   116 +[](void* p) { 363   116 +[](void* p) {
HITCBC 364   116 return static_cast<ReadAwaitable*>(p)->await_resume(); 364   116 return static_cast<ReadAwaitable*>(p)->await_resume();
365   }, 365   },
HITCBC 366   116 +[](void* p) noexcept { 366   116 +[](void* p) noexcept {
MISUBC 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable(); 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368   } 368   }
369   }; 369   };
HITCBC 370   116 return &ops; 370   116 return &ops;
371   } 371   }
372   372  
373   static constexpr std::size_t max_awaitable_size = 373   static constexpr std::size_t max_awaitable_size =
374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable) 374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375   ? sizeof(ReadSomeAwaitable) 375   ? sizeof(ReadSomeAwaitable)
376   : sizeof(ReadAwaitable); 376   : sizeof(ReadAwaitable);
377   static constexpr std::size_t max_awaitable_align = 377   static constexpr std::size_t max_awaitable_align =
378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable) 378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379   ? alignof(ReadSomeAwaitable) 379   ? alignof(ReadSomeAwaitable)
380   : alignof(ReadAwaitable); 380   : alignof(ReadAwaitable);
381   381  
382   static constexpr vtable value = { 382   static constexpr vtable value = {
383   &construct_read_some_awaitable_impl, 383   &construct_read_some_awaitable_impl,
384   &construct_read_awaitable_impl, 384   &construct_read_awaitable_impl,
385   max_awaitable_size, 385   max_awaitable_size,
386   max_awaitable_align, 386   max_awaitable_align,
387   &do_destroy_impl 387   &do_destroy_impl
388   }; 388   };
389   }; 389   };
390   390  
391   inline 391   inline
HITCBC 392   145 any_read_source::~any_read_source() 392   145 any_read_source::~any_read_source()
393   { 393   {
HITCBC 394   145 if(storage_) 394   145 if(storage_)
395   { 395   {
HITCBC 396   6 vt_->destroy(source_); 396   6 vt_->destroy(source_);
HITCBC 397   6 ::operator delete(storage_); 397   6 ::operator delete(storage_);
398   } 398   }
HITCBC 399   145 if(cached_awaitable_) 399   145 if(cached_awaitable_)
400   { 400   {
HITCBC 401   139 if(active_ops_) 401   139 if(active_ops_)
HITCBC 402   1 active_ops_->destroy(cached_awaitable_); 402   1 active_ops_->destroy(cached_awaitable_);
HITCBC 403   139 ::operator delete(cached_awaitable_); 403   139 ::operator delete(cached_awaitable_);
404   } 404   }
HITCBC 405   145 } 405   145 }
406   406  
407   inline any_read_source& 407   inline any_read_source&
HITCBC 408   4 any_read_source::operator=(any_read_source&& other) noexcept 408   4 any_read_source::operator=(any_read_source&& other) noexcept
409   { 409   {
HITCBC 410   4 if(this != &other) 410   4 if(this != &other)
411   { 411   {
HITCBC 412   3 if(storage_) 412   3 if(storage_)
413   { 413   {
MISUBC 414   vt_->destroy(source_); 414   vt_->destroy(source_);
MISUBC 415   ::operator delete(storage_); 415   ::operator delete(storage_);
416   } 416   }
HITCBC 417   3 if(cached_awaitable_) 417   3 if(cached_awaitable_)
418   { 418   {
HITCBC 419   2 if(active_ops_) 419   2 if(active_ops_)
HITCBC 420   1 active_ops_->destroy(cached_awaitable_); 420   1 active_ops_->destroy(cached_awaitable_);
HITCBC 421   2 ::operator delete(cached_awaitable_); 421   2 ::operator delete(cached_awaitable_);
422   } 422   }
HITCBC 423   3 source_ = std::exchange(other.source_, nullptr); 423   3 source_ = std::exchange(other.source_, nullptr);
HITCBC 424   3 vt_ = std::exchange(other.vt_, nullptr); 424   3 vt_ = std::exchange(other.vt_, nullptr);
HITCBC 425   3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr); 425   3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
HITCBC 426   3 storage_ = std::exchange(other.storage_, nullptr); 426   3 storage_ = std::exchange(other.storage_, nullptr);
HITCBC 427   3 active_ops_ = std::exchange(other.active_ops_, nullptr); 427   3 active_ops_ = std::exchange(other.active_ops_, nullptr);
428   } 428   }
HITCBC 429   4 return *this; 429   4 return *this;
430   } 430   }
431   431  
432   template<ReadSource S> 432   template<ReadSource S>
433   requires (!std::same_as<std::decay_t<S>, any_read_source>) 433   requires (!std::same_as<std::decay_t<S>, any_read_source>)
HITCBC 434   6 any_read_source::any_read_source(S s) 434   6 any_read_source::any_read_source(S s)
HITCBC 435   6 : vt_(&vtable_for_impl<S>::value) 435   6 : vt_(&vtable_for_impl<S>::value)
436   { 436   {
437   struct guard { 437   struct guard {
438   any_read_source* self; 438   any_read_source* self;
439   bool committed = false; 439   bool committed = false;
HITCBC 440   6 ~guard() { 440   6 ~guard() {
HITCBC 441   6 if(!committed && self->storage_) { 441   6 if(!committed && self->storage_) {
MISUBC 442   self->vt_->destroy(self->source_); 442   self->vt_->destroy(self->source_);
MISUBC 443   ::operator delete(self->storage_); 443   ::operator delete(self->storage_);
MISUBC 444   self->storage_ = nullptr; 444   self->storage_ = nullptr;
MISUBC 445   self->source_ = nullptr; 445   self->source_ = nullptr;
446   } 446   }
HITCBC 447   6 } 447   6 }
HITCBC 448   6 } g{this}; 448   6 } g{this};
449   449  
HITCBC 450   6 storage_ = ::operator new(sizeof(S)); 450   6 storage_ = ::operator new(sizeof(S));
HITCBC 451   6 source_ = ::new(storage_) S(std::move(s)); 451   6 source_ = ::new(storage_) S(std::move(s));
452   452  
453   // Preallocate the awaitable storage 453   // Preallocate the awaitable storage
HITCBC 454   6 cached_awaitable_ = ::operator new(vt_->awaitable_size); 454   6 cached_awaitable_ = ::operator new(vt_->awaitable_size);
455   455  
HITCBC 456   6 g.committed = true; 456   6 g.committed = true;
HITCBC 457   6 } 457   6 }
458   458  
459   template<ReadSource S> 459   template<ReadSource S>
HITCBC 460   135 any_read_source::any_read_source(S* s) 460   135 any_read_source::any_read_source(S* s)
HITCBC 461   135 : source_(s) 461   135 : source_(s)
HITCBC 462   135 , vt_(&vtable_for_impl<S>::value) 462   135 , vt_(&vtable_for_impl<S>::value)
463   { 463   {
464   // Preallocate the awaitable storage 464   // Preallocate the awaitable storage
HITCBC 465   135 cached_awaitable_ = ::operator new(vt_->awaitable_size); 465   135 cached_awaitable_ = ::operator new(vt_->awaitable_size);
HITCBC 466   135 } 466   135 }
467   467  
468   template<MutableBufferSequence MB> 468   template<MutableBufferSequence MB>
469   auto 469   auto
HITCBC 470   54 any_read_source::read_some(MB buffers) 470   54 any_read_source::read_some(MB buffers)
471   { 471   {
472   struct awaitable 472   struct awaitable
473   { 473   {
474   any_read_source* self_; 474   any_read_source* self_;
475   mutable_buffer_array<detail::max_iovec_> ba_; 475   mutable_buffer_array<detail::max_iovec_> ba_;
476   476  
HITCBC 477   54 awaitable(any_read_source* self, MB const& buffers) 477   54 awaitable(any_read_source* self, MB const& buffers)
HITCBC 478   54 : self_(self) 478   54 : self_(self)
HITCBC 479   54 , ba_(buffers) 479   54 , ba_(buffers)
480   { 480   {
HITCBC 481   54 } 481   54 }
482   482  
483   bool 483   bool
HITCBC 484   54 await_ready() const noexcept 484   54 await_ready() const noexcept
485   { 485   {
HITCBC 486   54 return ba_.to_span().empty(); 486   54 return ba_.to_span().empty();
487   } 487   }
488   488  
489   std::coroutine_handle<> 489   std::coroutine_handle<>
HITCBC 490   52 await_suspend(std::coroutine_handle<> h, io_env const* env) 490   52 await_suspend(std::coroutine_handle<> h, io_env const* env)
491   { 491   {
HITCBC 492   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable( 492   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
HITCBC 493   52 self_->source_, 493   52 self_->source_,
HITCBC 494   52 self_->cached_awaitable_, 494   52 self_->cached_awaitable_,
HITCBC 495   52 ba_.to_span()); 495   52 ba_.to_span());
496   496  
HITCBC 497   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 497   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 498   50 return h; 498   50 return h;
499   499  
HITCBC 500   2 return self_->active_ops_->await_suspend( 500   2 return self_->active_ops_->await_suspend(
HITCBC 501   2 self_->cached_awaitable_, h, env); 501   2 self_->cached_awaitable_, h, env);
502   } 502   }
503   503  
504   io_result<std::size_t> 504   io_result<std::size_t>
HITCBC 505   52 await_resume() 505   52 await_resume()
506   { 506   {
HITCBC 507   52 if(ba_.to_span().empty()) 507   52 if(ba_.to_span().empty())
HITCBC 508   2 return {{}, 0}; 508   2 return {{}, 0};
509   509  
510   struct guard { 510   struct guard {
511   any_read_source* self; 511   any_read_source* self;
HITCBC 512   50 ~guard() { 512   50 ~guard() {
HITCBC 513   50 self->active_ops_->destroy(self->cached_awaitable_); 513   50 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 514   50 self->active_ops_ = nullptr; 514   50 self->active_ops_ = nullptr;
HITCBC 515   50 } 515   50 }
HITCBC 516   50 } g{self_}; 516   50 } g{self_};
HITCBC 517   50 return self_->active_ops_->await_resume( 517   50 return self_->active_ops_->await_resume(
HITCBC 518   50 self_->cached_awaitable_); 518   50 self_->cached_awaitable_);
HITCBC 519   50 } 519   50 }
520   }; 520   };
HITCBC 521   54 return awaitable(this, buffers); 521   54 return awaitable(this, buffers);
522   } 522   }
523   523  
524   inline auto 524   inline auto
HITCBC 525   116 any_read_source::read_(std::span<mutable_buffer const> buffers) 525   116 any_read_source::read_(std::span<mutable_buffer const> buffers)
526   { 526   {
527   struct awaitable 527   struct awaitable
528   { 528   {
529   any_read_source* self_; 529   any_read_source* self_;
530   std::span<mutable_buffer const> buffers_; 530   std::span<mutable_buffer const> buffers_;
531   531  
532   bool 532   bool
HITCBC 533   116 await_ready() const noexcept 533   116 await_ready() const noexcept
534   { 534   {
HITCBC 535   116 return false; 535   116 return false;
536   } 536   }
537   537  
538   std::coroutine_handle<> 538   std::coroutine_handle<>
HITCBC 539   116 await_suspend(std::coroutine_handle<> h, io_env const* env) 539   116 await_suspend(std::coroutine_handle<> h, io_env const* env)
540   { 540   {
HITCBC 541   232 self_->active_ops_ = self_->vt_->construct_read_awaitable( 541   232 self_->active_ops_ = self_->vt_->construct_read_awaitable(
HITCBC 542   116 self_->source_, 542   116 self_->source_,
HITCBC 543   116 self_->cached_awaitable_, 543   116 self_->cached_awaitable_,
544   buffers_); 544   buffers_);
545   545  
HITCBC 546   116 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 546   116 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 547   116 return h; 547   116 return h;
548   548  
MISUBC 549   return self_->active_ops_->await_suspend( 549   return self_->active_ops_->await_suspend(
MISUBC 550   self_->cached_awaitable_, h, env); 550   self_->cached_awaitable_, h, env);
551   } 551   }
552   552  
553   io_result<std::size_t> 553   io_result<std::size_t>
HITCBC 554   116 await_resume() 554   116 await_resume()
555   { 555   {
556   struct guard { 556   struct guard {
557   any_read_source* self; 557   any_read_source* self;
HITCBC 558   116 ~guard() { 558   116 ~guard() {
HITCBC 559   116 self->active_ops_->destroy(self->cached_awaitable_); 559   116 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 560   116 self->active_ops_ = nullptr; 560   116 self->active_ops_ = nullptr;
HITCBC 561   116 } 561   116 }
HITCBC 562   116 } g{self_}; 562   116 } g{self_};
HITCBC 563   116 return self_->active_ops_->await_resume( 563   116 return self_->active_ops_->await_resume(
HITCBC 564   200 self_->cached_awaitable_); 564   200 self_->cached_awaitable_);
HITCBC 565   116 } 565   116 }
566   }; 566   };
HITCBC 567   116 return awaitable{this, buffers}; 567   116 return awaitable{this, buffers};
568   } 568   }
569   569  
570   template<MutableBufferSequence MB> 570   template<MutableBufferSequence MB>
571   io_task<std::size_t> 571   io_task<std::size_t>
HITCBC 572   110 any_read_source::read(MB buffers) 572   110 any_read_source::read(MB buffers)
573   { 573   {
574   buffer_param bp(buffers); 574   buffer_param bp(buffers);
575   std::size_t total = 0; 575   std::size_t total = 0;
576   576  
577   for(;;) 577   for(;;)
578   { 578   {
579   auto bufs = bp.data(); 579   auto bufs = bp.data();
580   if(bufs.empty()) 580   if(bufs.empty())
581   break; 581   break;
582   582  
583   auto [ec, n] = co_await read_(bufs); 583   auto [ec, n] = co_await read_(bufs);
584   total += n; 584   total += n;
585   if(ec) 585   if(ec)
586   co_return {ec, total}; 586   co_return {ec, total};
587   bp.consume(n); 587   bp.consume(n);
588   } 588   }
589   589  
590   co_return {{}, total}; 590   co_return {{}, total};
HITCBC 591   220 } 591   220 }
592   592  
593   } // namespace capy 593   } // namespace capy
594   } // namespace boost 594   } // namespace boost
595   595  
596   #endif 596   #endif