76.56% Lines (49/64) 92.31% Functions (12/13)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 10   #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP 11   #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/ex/frame_allocator.hpp> 14   #include <boost/capy/ex/frame_allocator.hpp>
15   15  
16   #include <coroutine> 16   #include <coroutine>
17   #include <cstddef> 17   #include <cstddef>
18   #include <exception> 18   #include <exception>
19   19  
20   namespace boost { 20   namespace boost {
21   namespace capy { 21   namespace capy {
22   namespace detail { 22   namespace detail {
23   23  
class strand_queue;

//----------------------------------------------------------

/** Bookkeeping header stored immediately before a wrapper coroutine frame.

    The frame allocator obtains one block large enough for this
    header plus the coroutine frame and hands `prefix + 1` to the
    coroutine machinery. The header is therefore over-aligned to
    the default `operator new` alignment so that its size is a
    multiple of that alignment; otherwise the frame that follows
    would start at a less strictly aligned address than the one
    `::operator new` returned.
*/
struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix
{
    // Next block when this block sits on a queue's free list
    frame_prefix* next;

    // Queue whose free list receives the block when the frame is deleted
    strand_queue* queue;

    // Total bytes obtained for the block (prefix + coroutine frame)
    std::size_t alloc_size;
};

static_assert(
    sizeof(frame_prefix) % __STDCPP_DEFAULT_NEW_ALIGNMENT__ == 0,
    "frame_prefix must preserve the alignment of the frame that follows it");
35   35  
36   //---------------------------------------------------------- 36   //----------------------------------------------------------
37   37  
38   /** Wrapper coroutine for strand queue dispatch operations. 38   /** Wrapper coroutine for strand queue dispatch operations.
39   39  
40   This coroutine wraps a target coroutine handle and resumes 40   This coroutine wraps a target coroutine handle and resumes
41   it when dispatched. The wrapper ensures control returns to 41   it when dispatched. The wrapper ensures control returns to
42   the dispatch loop after the target suspends or completes. 42   the dispatch loop after the target suspends or completes.
43   43  
44   The promise contains an intrusive list node for queue 44   The promise contains an intrusive list node for queue
45   storage and supports a custom allocator that recycles 45   storage and supports a custom allocator that recycles
46   coroutine frames via a free list. 46   coroutine frames via a free list.
47   */ 47   */
48   struct strand_op 48   struct strand_op
49   { 49   {
50   struct promise_type 50   struct promise_type
51   { 51   {
52   promise_type* next = nullptr; 52   promise_type* next = nullptr;
53   53  
54   void* 54   void*
55   operator new( 55   operator new(
56   std::size_t size, 56   std::size_t size,
57   strand_queue& q, 57   strand_queue& q,
58   std::coroutine_handle<void>); 58   std::coroutine_handle<void>);
59   59  
60   void 60   void
61   operator delete(void* p, std::size_t); 61   operator delete(void* p, std::size_t);
62   62  
63   strand_op 63   strand_op
HITCBC 64   332 get_return_object() noexcept 64   335 get_return_object() noexcept
65   { 65   {
HITCBC 66   332 return {std::coroutine_handle<promise_type>::from_promise(*this)}; 66   335 return {std::coroutine_handle<promise_type>::from_promise(*this)};
67   } 67   }
68   68  
69   std::suspend_always 69   std::suspend_always
HITCBC 70   332 initial_suspend() noexcept 70   335 initial_suspend() noexcept
71   { 71   {
HITCBC 72   332 return {}; 72   335 return {};
73   } 73   }
74   74  
75   std::suspend_always 75   std::suspend_always
HITCBC 76   332 final_suspend() noexcept 76   335 final_suspend() noexcept
77   { 77   {
HITCBC 78   332 return {}; 78   335 return {};
79   } 79   }
80   80  
81   void 81   void
HITCBC 82   332 return_void() noexcept 82   335 return_void() noexcept
83   { 83   {
HITCBC 84   332 } 84   335 }
85   85  
86   void 86   void
87   unhandled_exception() 87   unhandled_exception()
88   { 88   {
89   std::terminate(); 89   std::terminate();
90   } 90   }
91   }; 91   };
92   92  
93   std::coroutine_handle<promise_type> h_; 93   std::coroutine_handle<promise_type> h_;
94   }; 94   };
95   95  
96   //---------------------------------------------------------- 96   //----------------------------------------------------------
97   97  
98   /** Single-threaded dispatch queue for coroutine handles. 98   /** Single-threaded dispatch queue for coroutine handles.
99   99  
100   This queue stores coroutine handles and resumes them 100   This queue stores coroutine handles and resumes them
101   sequentially when dispatch() is called. Each pushed 101   sequentially when dispatch() is called. Each pushed
102   handle is wrapped in a strand_op coroutine that ensures 102   handle is wrapped in a strand_op coroutine that ensures
103   control returns to the dispatch loop after the target 103   control returns to the dispatch loop after the target
104   suspends or completes. 104   suspends or completes.
105   105  
106   The queue uses an intrusive singly-linked list through 106   The queue uses an intrusive singly-linked list through
107   the promise type to avoid separate node allocations. 107   the promise type to avoid separate node allocations.
108   A free list recycles wrapper coroutine frames to reduce 108   A free list recycles wrapper coroutine frames to reduce
109   allocation overhead during repeated push/dispatch cycles. 109   allocation overhead during repeated push/dispatch cycles.
110   110  
111   @par Thread Safety 111   @par Thread Safety
112   This class is not thread-safe. All operations must be 112   This class is not thread-safe. All operations must be
113   called from a single thread. 113   called from a single thread.
114   */ 114   */
115   class strand_queue 115   class strand_queue
116   { 116   {
117   using promise_type = strand_op::promise_type; 117   using promise_type = strand_op::promise_type;
118   118  
119   promise_type* head_ = nullptr; 119   promise_type* head_ = nullptr;
120   promise_type* tail_ = nullptr; 120   promise_type* tail_ = nullptr;
121   frame_prefix* free_list_ = nullptr; 121   frame_prefix* free_list_ = nullptr;
122   122  
123   friend struct strand_op::promise_type; 123   friend struct strand_op::promise_type;
124   124  
125   static 125   static
126   strand_op 126   strand_op
HITCBC 127   332 make_strand_op( 127   335 make_strand_op(
128   strand_queue& q, 128   strand_queue& q,
129   std::coroutine_handle<void> target) 129   std::coroutine_handle<void> target)
130   { 130   {
131   (void)q; 131   (void)q;
132   safe_resume(target); 132   safe_resume(target);
133   co_return; 133   co_return;
HITCBC 134   664 } 134   670 }
135   135  
136   public: 136   public:
HITCBC 137   4853 strand_queue() = default; 137   5275 strand_queue() = default;
138   138  
139   strand_queue(strand_queue const&) = delete; 139   strand_queue(strand_queue const&) = delete;
140   strand_queue& operator=(strand_queue const&) = delete; 140   strand_queue& operator=(strand_queue const&) = delete;
141   141  
142   /** Destructor. 142   /** Destructor.
143   143  
144   Destroys any pending wrappers without resuming them, 144   Destroys any pending wrappers without resuming them,
145   then frees all memory in the free list. 145   then frees all memory in the free list.
146   */ 146   */
HITCBC 147   4853 ~strand_queue() 147   5275 ~strand_queue()
148   { 148   {
149   // Destroy pending wrappers 149   // Destroy pending wrappers
HITCBC 150   4853 while(head_) 150   5275 while(head_)
151   { 151   {
MISUBC 152   promise_type* p = head_; 152   promise_type* p = head_;
MISUBC 153   head_ = p->next; 153   head_ = p->next;
154   154  
MISUBC 155   auto h = std::coroutine_handle<promise_type>::from_promise(*p); 155   auto h = std::coroutine_handle<promise_type>::from_promise(*p);
MISUBC 156   h.destroy(); 156   h.destroy();
157   } 157   }
158   158  
159   // Free the free list memory 159   // Free the free list memory
HITCBC 160   4853 while(free_list_) 160   5275 while(free_list_)
161   { 161   {
MISUBC 162   frame_prefix* prefix = free_list_; 162   frame_prefix* prefix = free_list_;
MISUBC 163   free_list_ = prefix->next; 163   free_list_ = prefix->next;
MISUBC 164   ::operator delete(prefix); 164   ::operator delete(prefix);
165   } 165   }
HITCBC 166   4853 } 166   5275 }
167   167  
168   /** Returns true if there are no pending operations. 168   /** Returns true if there are no pending operations.
169   */ 169   */
170   bool 170   bool
HITCBC 171   15 empty() const noexcept 171   22 empty() const noexcept
172   { 172   {
HITCBC 173   15 return head_ == nullptr; 173   22 return head_ == nullptr;
174   } 174   }
175   175  
176   /** Push a coroutine handle to the queue. 176   /** Push a coroutine handle to the queue.
177   177  
178   Creates a wrapper coroutine and appends it to the 178   Creates a wrapper coroutine and appends it to the
179   queue. The wrapper will resume the target handle 179   queue. The wrapper will resume the target handle
180   when dispatch() processes it. 180   when dispatch() processes it.
181   181  
182   @param h The coroutine handle to dispatch. 182   @param h The coroutine handle to dispatch.
183   */ 183   */
184   void 184   void
HITCBC 185   332 push(std::coroutine_handle<void> h) 185   335 push(std::coroutine_handle<void> h)
186   { 186   {
HITCBC 187   332 strand_op op = make_strand_op(*this, h); 187   335 strand_op op = make_strand_op(*this, h);
188   188  
HITCBC 189   332 promise_type* p = &op.h_.promise(); 189   335 promise_type* p = &op.h_.promise();
HITCBC 190   332 p->next = nullptr; 190   335 p->next = nullptr;
191   191  
HITCBC 192   332 if(tail_) 192   335 if(tail_)
HITCBC 193   317 tail_->next = p; 193   313 tail_->next = p;
194   else 194   else
HITCBC 195   15 head_ = p; 195   22 head_ = p;
HITCBC 196   332 tail_ = p; 196   335 tail_ = p;
HITCBC 197   332 } 197   335 }
198   198  
199   /** Resume all queued coroutines in sequence. 199   /** Resume all queued coroutines in sequence.
200   200  
201   Processes each wrapper in FIFO order, resuming its 201   Processes each wrapper in FIFO order, resuming its
202   target coroutine. After each target suspends or 202   target coroutine. After each target suspends or
203   completes, the wrapper is destroyed and its frame 203   completes, the wrapper is destroyed and its frame
204   is added to the free list for reuse. 204   is added to the free list for reuse.
205   205  
206   Coroutines resumed during dispatch may push new 206   Coroutines resumed during dispatch may push new
207   handles, which will also be processed in the same 207   handles, which will also be processed in the same
208   dispatch call. 208   dispatch call.
209   209  
210   @warning Not thread-safe. Do not call while another 210   @warning Not thread-safe. Do not call while another
211   thread may be calling push(). 211   thread may be calling push().
212   */ 212   */
213   void 213   void
214   dispatch() 214   dispatch()
215   { 215   {
216   while(head_) 216   while(head_)
217   { 217   {
218   promise_type* p = head_; 218   promise_type* p = head_;
219   head_ = p->next; 219   head_ = p->next;
220   if(!head_) 220   if(!head_)
221   tail_ = nullptr; 221   tail_ = nullptr;
222   222  
223   auto h = std::coroutine_handle<promise_type>::from_promise(*p); 223   auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224   safe_resume(h); 224   safe_resume(h);
225   h.destroy(); 225   h.destroy();
226   } 226   }
227   } 227   }
228   228  
229   /** Batch of taken items for thread-safe dispatch. */ 229   /** Batch of taken items for thread-safe dispatch. */
230   struct taken_batch 230   struct taken_batch
231   { 231   {
232   promise_type* head = nullptr; 232   promise_type* head = nullptr;
233   promise_type* tail = nullptr; 233   promise_type* tail = nullptr;
234   }; 234   };
235   235  
236   /** Take all pending items atomically. 236   /** Take all pending items atomically.
237   237  
238   Removes all items from the queue and returns them 238   Removes all items from the queue and returns them
239   as a batch. The queue is left empty. 239   as a batch. The queue is left empty.
240   240  
241   @return The batch of taken items. 241   @return The batch of taken items.
242   */ 242   */
243   taken_batch 243   taken_batch
HITCBC 244   15 take_all() noexcept 244   22 take_all() noexcept
245   { 245   {
HITCBC 246   15 taken_batch batch{head_, tail_}; 246   22 taken_batch batch{head_, tail_};
HITCBC 247   15 head_ = tail_ = nullptr; 247   22 head_ = tail_ = nullptr;
HITCBC 248   15 return batch; 248   22 return batch;
249   } 249   }
250   250  
251   /** Dispatch a batch of taken items. 251   /** Dispatch a batch of taken items.
252   252  
253   @param batch The batch to dispatch. 253   @param batch The batch to dispatch.
254   254  
255   @note This is thread-safe w.r.t. push() because it doesn't 255   @note This is thread-safe w.r.t. push() because it doesn't
256   access the queue's free_list_. Frames are deleted directly 256   access the queue's free_list_. Frames are deleted directly
257   rather than recycled. 257   rather than recycled.
258   */ 258   */
259   static 259   static
260   void 260   void
HITCBC 261   15 dispatch_batch(taken_batch& batch) 261   22 dispatch_batch(taken_batch& batch)
262   { 262   {
HITCBC 263   347 while(batch.head) 263   357 while(batch.head)
264   { 264   {
HITCBC 265   332 promise_type* p = batch.head; 265   335 promise_type* p = batch.head;
HITCBC 266   332 batch.head = p->next; 266   335 batch.head = p->next;
267   267  
HITCBC 268   332 auto h = std::coroutine_handle<promise_type>::from_promise(*p); 268   335 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
HITCBC 269   332 safe_resume(h); 269   335 safe_resume(h);
270   // Don't use h.destroy() - it would call operator delete which 270   // Don't use h.destroy() - it would call operator delete which
271   // accesses the queue's free_list_ (race with push). 271   // accesses the queue's free_list_ (race with push).
272   // Instead, manually free the frame without recycling. 272   // Instead, manually free the frame without recycling.
273   // h.address() returns the frame base (what operator new returned). 273   // h.address() returns the frame base (what operator new returned).
HITCBC 274   332 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; 274   335 frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
HITCBC 275   332 ::operator delete(prefix); 275   335 ::operator delete(prefix);
276   } 276   }
HITCBC 277   15 batch.tail = nullptr; 277   22 batch.tail = nullptr;
HITCBC 278   15 } 278   22 }
279   }; 279   };
280   280  
281   //---------------------------------------------------------- 281   //----------------------------------------------------------
282   282  
283   inline 283   inline
284   void* 284   void*
HITCBC 285   332 strand_op::promise_type::operator new( 285   335 strand_op::promise_type::operator new(
286   std::size_t size, 286   std::size_t size,
287   strand_queue& q, 287   strand_queue& q,
288   std::coroutine_handle<void>) 288   std::coroutine_handle<void>)
289   { 289   {
290   // Total size includes prefix 290   // Total size includes prefix
HITCBC 291   332 std::size_t alloc_size = size + sizeof(frame_prefix); 291   335 std::size_t alloc_size = size + sizeof(frame_prefix);
292   void* raw; 292   void* raw;
293   293  
294   // Try to reuse from free list 294   // Try to reuse from free list
HITCBC 295   332 if(q.free_list_) 295   335 if(q.free_list_)
296   { 296   {
MISUBC 297   frame_prefix* prefix = q.free_list_; 297   frame_prefix* prefix = q.free_list_;
MISUBC 298   q.free_list_ = prefix->next; 298   q.free_list_ = prefix->next;
MISUBC 299   raw = prefix; 299   raw = prefix;
300   } 300   }
301   else 301   else
302   { 302   {
HITCBC 303   332 raw = ::operator new(alloc_size); 303   335 raw = ::operator new(alloc_size);
304   } 304   }
305   305  
306   // Initialize prefix 306   // Initialize prefix
HITCBC 307   332 frame_prefix* prefix = static_cast<frame_prefix*>(raw); 307   335 frame_prefix* prefix = static_cast<frame_prefix*>(raw);
HITCBC 308   332 prefix->next = nullptr; 308   335 prefix->next = nullptr;
HITCBC 309   332 prefix->queue = &q; 309   335 prefix->queue = &q;
HITCBC 310   332 prefix->alloc_size = alloc_size; 310   335 prefix->alloc_size = alloc_size;
311   311  
312   // Return pointer AFTER the prefix (this is where coroutine frame goes) 312   // Return pointer AFTER the prefix (this is where coroutine frame goes)
HITCBC 313   332 return prefix + 1; 313   335 return prefix + 1;
314   } 314   }
315   315  
316   inline 316   inline
317   void 317   void
MISUBC 318   strand_op::promise_type::operator delete(void* p, std::size_t) 318   strand_op::promise_type::operator delete(void* p, std::size_t)
319   { 319   {
320   // Calculate back to get the prefix 320   // Calculate back to get the prefix
MISUBC 321   frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; 321   frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322   322  
323   // Add to free list 323   // Add to free list
MISUBC 324   prefix->next = prefix->queue->free_list_; 324   prefix->next = prefix->queue->free_list_;
MISUBC 325   prefix->queue->free_list_ = prefix; 325   prefix->queue->free_list_ = prefix;
MISUBC 326   } 326   }
327   327  
328   } // namespace detail 328   } // namespace detail
329   } // namespace capy 329   } // namespace capy
330   } // namespace boost 330   } // namespace boost
331   331  
332   #endif 332   #endif