97.94% Lines (95/97) 95.65% Functions (22/23)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #include "src/ex/detail/strand_queue.hpp" 10   #include "src/ex/detail/strand_queue.hpp"
11   #include <boost/capy/ex/detail/strand_service.hpp> 11   #include <boost/capy/ex/detail/strand_service.hpp>
12   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
13   #include <atomic> 13   #include <atomic>
14   #include <coroutine> 14   #include <coroutine>
15   #include <mutex> 15   #include <mutex>
16   #include <thread> 16   #include <thread>
17   #include <utility> 17   #include <utility>
18   18  
19   namespace boost { 19   namespace boost {
20   namespace capy { 20   namespace capy {
21   namespace detail { 21   namespace detail {
22   22  
23   //---------------------------------------------------------- 23   //----------------------------------------------------------
24   24  
25   /** Implementation state for a strand. 25   /** Implementation state for a strand.
26   26  
27   Each strand_impl provides serialization for coroutines 27   Each strand_impl provides serialization for coroutines
28   dispatched through strands that share it. 28   dispatched through strands that share it.
29   */ 29   */
30   // Sentinel stored in cached_frame_ after shutdown to prevent 30   // Sentinel stored in cached_frame_ after shutdown to prevent
31   // in-flight invokers from repopulating a freed cache slot. 31   // in-flight invokers from repopulating a freed cache slot.
32   inline void* const kCacheClosed = reinterpret_cast<void*>(1); 32   inline void* const kCacheClosed = reinterpret_cast<void*>(1);
33   33  
34   struct strand_impl 34   struct strand_impl
35   { 35   {
36   std::mutex mutex_; 36   std::mutex mutex_;
37   strand_queue pending_; 37   strand_queue pending_;
38   bool locked_ = false; 38   bool locked_ = false;
39   std::atomic<std::thread::id> dispatch_thread_{}; 39   std::atomic<std::thread::id> dispatch_thread_{};
40   std::atomic<void*> cached_frame_{nullptr}; 40   std::atomic<void*> cached_frame_{nullptr};
41   }; 41   };
42   42  
43   //---------------------------------------------------------- 43   //----------------------------------------------------------
44   44  
45   /** Invoker coroutine for strand dispatch. 45   /** Invoker coroutine for strand dispatch.
46   46  
47   Uses custom allocator to recycle frame - one allocation 47   Uses custom allocator to recycle frame - one allocation
48   per strand_impl lifetime, stored in trailer for recovery. 48   per strand_impl lifetime, stored in trailer for recovery.
49   */ 49   */
50   struct strand_invoker 50   struct strand_invoker
51   { 51   {
52   struct promise_type 52   struct promise_type
53   { 53   {
54   // Used to post the invoker through the inner executor. 54   // Used to post the invoker through the inner executor.
55   // Lives in the coroutine frame (heap-allocated), so has 55   // Lives in the coroutine frame (heap-allocated), so has
56   // a stable address for the duration of the queue residency. 56   // a stable address for the duration of the queue residency.
57   continuation self_; 57   continuation self_;
58   58  
HITCBC 59   12 void* operator new(std::size_t n, strand_impl& impl) 59   16 void* operator new(std::size_t n, strand_impl& impl)
60   { 60   {
HITCBC 61   12 constexpr auto A = alignof(strand_impl*); 61   16 constexpr auto A = alignof(strand_impl*);
HITCBC 62   12 std::size_t padded = (n + A - 1) & ~(A - 1); 62   16 std::size_t padded = (n + A - 1) & ~(A - 1);
HITCBC 63   12 std::size_t total = padded + sizeof(strand_impl*); 63   16 std::size_t total = padded + sizeof(strand_impl*);
64   64  
HITCBC 65   12 void* p = impl.cached_frame_.exchange( 65   16 void* p = impl.cached_frame_.exchange(
66   nullptr, std::memory_order_acquire); 66   nullptr, std::memory_order_acquire);
HITCBC 67   12 if(!p || p == kCacheClosed) 67   16 if(!p || p == kCacheClosed)
HITCBC 68   11 p = ::operator new(total); 68   13 p = ::operator new(total);
69   69  
70   // Trailer lets delete recover impl 70   // Trailer lets delete recover impl
HITCBC 71   12 *reinterpret_cast<strand_impl**>( 71   16 *reinterpret_cast<strand_impl**>(
HITCBC 72   12 static_cast<char*>(p) + padded) = &impl; 72   16 static_cast<char*>(p) + padded) = &impl;
HITCBC 73   12 return p; 73   16 return p;
74   } 74   }
75   75  
HITCBC 76   12 void operator delete(void* p, std::size_t n) noexcept 76   16 void operator delete(void* p, std::size_t n) noexcept
77   { 77   {
HITCBC 78   12 constexpr auto A = alignof(strand_impl*); 78   16 constexpr auto A = alignof(strand_impl*);
HITCBC 79   12 std::size_t padded = (n + A - 1) & ~(A - 1); 79   16 std::size_t padded = (n + A - 1) & ~(A - 1);
80   80  
HITCBC 81   12 auto* impl = *reinterpret_cast<strand_impl**>( 81   16 auto* impl = *reinterpret_cast<strand_impl**>(
82   static_cast<char*>(p) + padded); 82   static_cast<char*>(p) + padded);
83   83  
HITCBC 84   12 void* expected = nullptr; 84   16 void* expected = nullptr;
HITCBC 85   12 if(!impl->cached_frame_.compare_exchange_strong( 85   16 if(!impl->cached_frame_.compare_exchange_strong(
86   expected, p, std::memory_order_release)) 86   expected, p, std::memory_order_release))
MISUBC 87   ::operator delete(p); 87   ::operator delete(p);
HITCBC 88   12 } 88   16 }
89   89  
HITCBC 90   12 strand_invoker get_return_object() noexcept 90   16 strand_invoker get_return_object() noexcept
HITCBC 91   12 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; } 91   16 { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
92   92  
HITCBC 93   12 std::suspend_always initial_suspend() noexcept { return {}; } 93   16 std::suspend_always initial_suspend() noexcept { return {}; }
HITCBC 94   12 std::suspend_never final_suspend() noexcept { return {}; } 94   16 std::suspend_never final_suspend() noexcept { return {}; }
HITCBC 95   12 void return_void() noexcept {} 95   16 void return_void() noexcept {}
MISUBC 96   void unhandled_exception() { std::terminate(); } 96   void unhandled_exception() { std::terminate(); }
97   }; 97   };
98   98  
99   std::coroutine_handle<promise_type> h_; 99   std::coroutine_handle<promise_type> h_;
100   }; 100   };
101   101  
102   //---------------------------------------------------------- 102   //----------------------------------------------------------
103   103  
104   /** Concrete implementation of strand_service. 104   /** Concrete implementation of strand_service.
105   105  
106   Holds the fixed pool of strand_impl objects. 106   Holds the fixed pool of strand_impl objects.
107   */ 107   */
108   class strand_service_impl : public strand_service 108   class strand_service_impl : public strand_service
109   { 109   {
110   static constexpr std::size_t num_impls = 211; 110   static constexpr std::size_t num_impls = 211;
111   111  
112   strand_impl impls_[num_impls]; 112   strand_impl impls_[num_impls];
113   std::size_t salt_ = 0; 113   std::size_t salt_ = 0;
114   std::mutex mutex_; 114   std::mutex mutex_;
115   115  
116   public: 116   public:
117   explicit 117   explicit
HITCBC 118   23 strand_service_impl(execution_context&) 118   25 strand_service_impl(execution_context&)
HITCBC 119   4876 { 119   5300 {
HITCBC 120   23 } 120   25 }
121   121  
122   strand_impl* 122   strand_impl*
HITCBC 123   27 get_implementation() override 123   29 get_implementation() override
124   { 124   {
HITCBC 125   27 std::lock_guard<std::mutex> lock(mutex_); 125   29 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 126   27 std::size_t index = salt_++; 126   29 std::size_t index = salt_++;
HITCBC 127   27 index = index % num_impls; 127   29 index = index % num_impls;
HITCBC 128   27 return &impls_[index]; 128   29 return &impls_[index];
HITCBC 129   27 } 129   29 }
130   130  
131   protected: 131   protected:
132   void 132   void
HITCBC 133   23 shutdown() override 133   25 shutdown() override
134   { 134   {
HITCBC 135   4876 for(std::size_t i = 0; i < num_impls; ++i) 135   5300 for(std::size_t i = 0; i < num_impls; ++i)
136   { 136   {
HITCBC 137   4853 std::lock_guard<std::mutex> lock(impls_[i].mutex_); 137   5275 std::lock_guard<std::mutex> lock(impls_[i].mutex_);
HITCBC 138   4853 impls_[i].locked_ = true; 138   5275 impls_[i].locked_ = true;
139   139  
HITCBC 140   4853 void* p = impls_[i].cached_frame_.exchange( 140   5275 void* p = impls_[i].cached_frame_.exchange(
141   kCacheClosed, std::memory_order_acquire); 141   kCacheClosed, std::memory_order_acquire);
HITCBC 142   4853 if(p) 142   5275 if(p)
HITCBC 143   11 ::operator delete(p); 143   13 ::operator delete(p);
HITCBC 144   4853 } 144   5275 }
HITCBC 145   23 } 145   25 }
146   146  
147   private: 147   private:
148   static bool 148   static bool
HITCBC 149   332 enqueue(strand_impl& impl, std::coroutine_handle<> h) 149   335 enqueue(strand_impl& impl, std::coroutine_handle<> h)
150   { 150   {
HITCBC 151   332 std::lock_guard<std::mutex> lock(impl.mutex_); 151   335 std::lock_guard<std::mutex> lock(impl.mutex_);
HITCBC 152   332 impl.pending_.push(h); 152   335 impl.pending_.push(h);
HITCBC 153   332 if(!impl.locked_) 153   335 if(!impl.locked_)
154   { 154   {
HITCBC 155   12 impl.locked_ = true; 155   16 impl.locked_ = true;
HITCBC 156   12 return true; 156   16 return true;
157   } 157   }
HITCBC 158   320 return false; 158   319 return false;
HITCBC 159   332 } 159   335 }
160   160  
161   static void 161   static void
HITCBC 162   15 dispatch_pending(strand_impl& impl) 162   22 dispatch_pending(strand_impl& impl)
163   { 163   {
HITCBC 164   15 strand_queue::taken_batch batch; 164   22 strand_queue::taken_batch batch;
165   { 165   {
HITCBC 166   15 std::lock_guard<std::mutex> lock(impl.mutex_); 166   22 std::lock_guard<std::mutex> lock(impl.mutex_);
HITCBC 167   15 batch = impl.pending_.take_all(); 167   22 batch = impl.pending_.take_all();
HITCBC 168   15 } 168   22 }
HITCBC 169   15 impl.pending_.dispatch_batch(batch); 169   22 impl.pending_.dispatch_batch(batch);
HITCBC 170   15 } 170   22 }
171   171  
172   static bool 172   static bool
HITCBC 173   15 try_unlock(strand_impl& impl) 173   22 try_unlock(strand_impl& impl)
174   { 174   {
HITCBC 175   15 std::lock_guard<std::mutex> lock(impl.mutex_); 175   22 std::lock_guard<std::mutex> lock(impl.mutex_);
HITCBC 176   15 if(impl.pending_.empty()) 176   22 if(impl.pending_.empty())
177   { 177   {
HITCBC 178   12 impl.locked_ = false; 178   16 impl.locked_ = false;
HITCBC 179   12 return true; 179   16 return true;
180   } 180   }
HITCBC 181   3 return false; 181   6 return false;
HITCBC 182   15 } 182   22 }
183   183  
184   static void 184   static void
HITCBC 185   15 set_dispatch_thread(strand_impl& impl) noexcept 185   22 set_dispatch_thread(strand_impl& impl) noexcept
186   { 186   {
HITCBC 187   15 impl.dispatch_thread_.store(std::this_thread::get_id()); 187   22 impl.dispatch_thread_.store(std::this_thread::get_id());
HITCBC 188   15 } 188   22 }
189   189  
190   static void 190   static void
HITCBC 191   12 clear_dispatch_thread(strand_impl& impl) noexcept 191   16 clear_dispatch_thread(strand_impl& impl) noexcept
192   { 192   {
HITCBC 193   12 impl.dispatch_thread_.store(std::thread::id{}); 193   16 impl.dispatch_thread_.store(std::thread::id{});
HITCBC 194   12 } 194   16 }
195   195  
196   // Loops until queue empty (aggressive). Alternative: per-batch fairness 196   // Loops until queue empty (aggressive). Alternative: per-batch fairness
197   // (repost after each batch to let other work run) - explore if starvation observed. 197   // (repost after each batch to let other work run) - explore if starvation observed.
198   static strand_invoker 198   static strand_invoker
HITCBC 199   12 make_invoker(strand_impl& impl) 199   16 make_invoker(strand_impl& impl)
200   { 200   {
201   strand_impl* p = &impl; 201   strand_impl* p = &impl;
202   for(;;) 202   for(;;)
203   { 203   {
204   set_dispatch_thread(*p); 204   set_dispatch_thread(*p);
205   dispatch_pending(*p); 205   dispatch_pending(*p);
206   if(try_unlock(*p)) 206   if(try_unlock(*p))
207   { 207   {
208   clear_dispatch_thread(*p); 208   clear_dispatch_thread(*p);
209   co_return; 209   co_return;
210   } 210   }
211   } 211   }
HITCBC 212   24 } 212   32 }
213   213  
214   static void 214   static void
HITCBC 215   12 post_invoker(strand_impl& impl, executor_ref ex) 215   16 post_invoker(strand_impl& impl, executor_ref ex)
216   { 216   {
HITCBC 217   12 auto invoker = make_invoker(impl); 217   16 auto invoker = make_invoker(impl);
HITCBC 218   12 auto& self = invoker.h_.promise().self_; 218   16 auto& self = invoker.h_.promise().self_;
HITCBC 219   12 self.h = invoker.h_; 219   16 self.h = invoker.h_;
HITCBC 220   12 ex.post(self); 220   16 ex.post(self);
HITCBC 221   12 } 221   16 }
222   222  
223   friend class strand_service; 223   friend class strand_service;
224   }; 224   };
225   225  
226   //---------------------------------------------------------- 226   //----------------------------------------------------------
227   227  
HITCBC 228   23 strand_service:: 228   25 strand_service::
HITCBC 229   23 strand_service() 229   25 strand_service()
HITCBC 230   23 : service() 230   25 : service()
231   { 231   {
HITCBC 232   23 } 232   25 }
233   233  
HITCBC 234   23 strand_service:: 234   25 strand_service::
235   ~strand_service() = default; 235   ~strand_service() = default;
236   236  
237   bool 237   bool
HITCBC 238   10 strand_service:: 238   12 strand_service::
239   running_in_this_thread(strand_impl& impl) noexcept 239   running_in_this_thread(strand_impl& impl) noexcept
240   { 240   {
HITCBC 241   10 return impl.dispatch_thread_.load() == std::this_thread::get_id(); 241   12 return impl.dispatch_thread_.load() == std::this_thread::get_id();
242   } 242   }
243   243  
244   std::coroutine_handle<> 244   std::coroutine_handle<>
HITCBC 245   8 strand_service:: 245   8 strand_service::
246   dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 246   dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
247   { 247   {
HITCBC 248   8 if(running_in_this_thread(impl)) 248   8 if(running_in_this_thread(impl))
HITCBC 249   3 return h; 249   3 return h;
250   250  
HITCBC 251   5 if(strand_service_impl::enqueue(impl, h)) 251   5 if(strand_service_impl::enqueue(impl, h))
HITCBC 252   5 strand_service_impl::post_invoker(impl, ex); 252   5 strand_service_impl::post_invoker(impl, ex);
HITCBC 253   5 return std::noop_coroutine(); 253   5 return std::noop_coroutine();
254   } 254   }
255   255  
256   void 256   void
HITCBC 257   327 strand_service:: 257   330 strand_service::
258   post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) 258   post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
259   { 259   {
HITCBC 260   327 if(strand_service_impl::enqueue(impl, h)) 260   330 if(strand_service_impl::enqueue(impl, h))
HITCBC 261   7 strand_service_impl::post_invoker(impl, ex); 261   11 strand_service_impl::post_invoker(impl, ex);
HITCBC 262   327 } 262   330 }
263   263  
264   strand_service& 264   strand_service&
HITCBC 265   27 get_strand_service(execution_context& ctx) 265   29 get_strand_service(execution_context& ctx)
266   { 266   {
HITCBC 267   27 return ctx.use_service<strand_service_impl>(); 267   29 return ctx.use_service<strand_service_impl>();
268   } 268   }
269   269  
270   } // namespace detail 270   } // namespace detail
271   } // namespace capy 271   } // namespace capy
272   } // namespace boost 272   } // namespace boost