100.00% Lines (128/128) 100.00% Functions (25/25)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/boostorg/capy 8   // Official repository: https://github.com/boostorg/capy
9   // 9   //
10   10  
11   #include <boost/capy/ex/thread_pool.hpp> 11   #include <boost/capy/ex/thread_pool.hpp>
12   #include <boost/capy/continuation.hpp> 12   #include <boost/capy/continuation.hpp>
13   #include <boost/capy/ex/frame_allocator.hpp> 13   #include <boost/capy/ex/frame_allocator.hpp>
14   #include <boost/capy/test/thread_name.hpp> 14   #include <boost/capy/test/thread_name.hpp>
15   #include <algorithm> 15   #include <algorithm>
16   #include <atomic> 16   #include <atomic>
17   #include <condition_variable> 17   #include <condition_variable>
18   #include <cstdio> 18   #include <cstdio>
19   #include <mutex> 19   #include <mutex>
20   #include <thread> 20   #include <thread>
21   #include <vector> 21   #include <vector>
22   22  
23   /* 23   /*
24   Thread pool implementation using a shared work queue. 24   Thread pool implementation using a shared work queue.
25   25  
26   Work items are continuations linked via their intrusive next pointer, 26   Work items are continuations linked via their intrusive next pointer,
27   stored in a single queue protected by a mutex. No per-post heap 27   stored in a single queue protected by a mutex. No per-post heap
28   allocation: the continuation is owned by the caller and linked 28   allocation: the continuation is owned by the caller and linked
29   directly. Worker threads wait on a condition_variable until work 29   directly. Worker threads wait on a condition_variable until work
30   is available or stop is requested. 30   is available or stop is requested.
31   31  
32   Threads are started lazily on first post() via std::call_once to avoid 32   Threads are started lazily on first post() via std::call_once to avoid
33   spawning threads for pools that are constructed but never used. Each 33   spawning threads for pools that are constructed but never used. Each
34   thread is named with a configurable prefix plus index for debugger 34   thread is named with a configurable prefix plus index for debugger
35   visibility. 35   visibility.
36   36  
37   Work tracking: on_work_started/on_work_finished maintain an atomic 37   Work tracking: on_work_started/on_work_finished maintain an atomic
38   outstanding_work_ counter. join() blocks until this counter reaches 38   outstanding_work_ counter. join() blocks until this counter reaches
39   zero, then signals workers to stop and joins threads. 39   zero, then signals workers to stop and joins threads.
40   40  
41   Two shutdown paths: 41   Two shutdown paths:
42   - join(): waits for outstanding work to drain, then stops workers. 42   - join(): waits for outstanding work to drain, then stops workers.
43   - stop(): immediately signals workers to exit; queued work is abandoned. 43   - stop(): immediately signals workers to exit; queued work is abandoned.
44   - Destructor: stop() then join() (abandon + wait for threads). 44   - Destructor: stop() then join() (abandon + wait for threads).
45   */ 45   */
46   46  
47   namespace boost { 47   namespace boost {
48   namespace capy { 48   namespace capy {
49   49  
50   //------------------------------------------------------------------------------ 50   //------------------------------------------------------------------------------
51   51  
52   class thread_pool::impl 52   class thread_pool::impl
53   { 53   {
54   // Intrusive queue of continuations via continuation::next. 54   // Intrusive queue of continuations via continuation::next.
55   // No per-post allocation: the continuation is owned by the caller. 55   // No per-post allocation: the continuation is owned by the caller.
56   continuation* head_ = nullptr; 56   continuation* head_ = nullptr;
57   continuation* tail_ = nullptr; 57   continuation* tail_ = nullptr;
58   58  
HITCBC 59   828 void push(continuation* c) noexcept 59   830 void push(continuation* c) noexcept
60   { 60   {
HITCBC 61   828 c->next = nullptr; 61   830 c->next = nullptr;
HITCBC 62   828 if(tail_) 62   830 if(tail_)
HITCBC 63   599 tail_->next = c; 63   595 tail_->next = c;
64   else 64   else
HITCBC 65   229 head_ = c; 65   235 head_ = c;
HITCBC 66   828 tail_ = c; 66   830 tail_ = c;
HITCBC 67   828 } 67   830 }
68   68  
HITCBC 69   985 continuation* pop() noexcept 69   988 continuation* pop() noexcept
70   { 70   {
HITCBC 71   985 if(!head_) 71   988 if(!head_)
HITCBC 72   157 return nullptr; 72   158 return nullptr;
HITCBC 73   828 continuation* c = head_; 73   830 continuation* c = head_;
HITCBC 74   828 head_ = head_->next; 74   830 head_ = head_->next;
HITCBC 75   828 if(!head_) 75   830 if(!head_)
HITCBC 76   229 tail_ = nullptr; 76   235 tail_ = nullptr;
HITCBC 77   828 return c; 77   830 return c;
78   } 78   }
79   79  
HITCBC 80   1036 bool empty() const noexcept 80   1060 bool empty() const noexcept
81   { 81   {
HITCBC 82   1036 return head_ == nullptr; 82   1060 return head_ == nullptr;
83   } 83   }
84   84  
85   std::mutex mutex_; 85   std::mutex mutex_;
86   std::condition_variable work_cv_; 86   std::condition_variable work_cv_;
87   std::condition_variable done_cv_; 87   std::condition_variable done_cv_;
88   std::vector<std::thread> threads_; 88   std::vector<std::thread> threads_;
89   std::atomic<std::size_t> outstanding_work_{0}; 89   std::atomic<std::size_t> outstanding_work_{0};
90   bool stop_{false}; 90   bool stop_{false};
91   bool joined_{false}; 91   bool joined_{false};
92   std::size_t num_threads_; 92   std::size_t num_threads_;
93   char thread_name_prefix_[13]{}; // 12 chars max + null terminator 93   char thread_name_prefix_[13]{}; // 12 chars max + null terminator
94   std::once_flag start_flag_; 94   std::once_flag start_flag_;
95   95  
96   public: 96   public:
HITCBC 97   157 ~impl() = default; 97   158 ~impl() = default;
98   98  
99   // Destroy abandoned coroutine frames. Must be called 99   // Destroy abandoned coroutine frames. Must be called
100   // before execution_context::shutdown()/destroy() so 100   // before execution_context::shutdown()/destroy() so
101   // that suspended-frame destructors (e.g. delay_awaitable 101   // that suspended-frame destructors (e.g. delay_awaitable
102   // calling timer_service::cancel()) run while services 102   // calling timer_service::cancel()) run while services
103   // are still valid. 103   // are still valid.
104   void 104   void
HITCBC 105   157 drain_abandoned() noexcept 105   158 drain_abandoned() noexcept
106   { 106   {
HITCBC 107   364 while(auto* c = pop()) 107   357 while(auto* c = pop())
108   { 108   {
HITCBC 109   207 auto h = c->h; 109   199 auto h = c->h;
HITCBC 110   207 if(h && h != std::noop_coroutine()) 110   199 if(h && h != std::noop_coroutine())
HITCBC 111   155 h.destroy(); 111   147 h.destroy();
HITCBC 112   207 } 112   199 }
HITCBC 113   157 } 113   158 }
114   114  
HITCBC 115   157 impl(std::size_t num_threads, std::string_view thread_name_prefix) 115   158 impl(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 116   157 : num_threads_(num_threads) 116   158 : num_threads_(num_threads)
117   { 117   {
HITCBC 118   157 if(num_threads_ == 0) 118   158 if(num_threads_ == 0)
HITCBC 119   4 num_threads_ = std::max( 119   4 num_threads_ = std::max(
HITCBC 120   2 std::thread::hardware_concurrency(), 1u); 120   2 std::thread::hardware_concurrency(), 1u);
121   121  
122   // Truncate prefix to 12 chars, leaving room for up to 3-digit index. 122   // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
HITCBC 123   157 auto n = thread_name_prefix.copy(thread_name_prefix_, 12); 123   158 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
HITCBC 124   157 thread_name_prefix_[n] = '\0'; 124   158 thread_name_prefix_[n] = '\0';
HITCBC 125   157 } 125   158 }
126   126  
127   void 127   void
HITCBC 128   828 post(continuation& c) 128   830 post(continuation& c)
129   { 129   {
HITCBC 130   828 ensure_started(); 130   830 ensure_started();
131   { 131   {
HITCBC 132   828 std::lock_guard<std::mutex> lock(mutex_); 132   830 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 133   828 push(&c); 133   830 push(&c);
HITCBC 134   828 } 134   830 }
HITCBC 135   828 work_cv_.notify_one(); 135   830 work_cv_.notify_one();
HITCBC 136   828 } 136   830 }
137   137  
138   void 138   void
HITCBC 139   345 on_work_started() noexcept 139   345 on_work_started() noexcept
140   { 140   {
HITCBC 141   345 outstanding_work_.fetch_add(1, std::memory_order_acq_rel); 141   345 outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
HITCBC 142   345 } 142   345 }
143   143  
144   void 144   void
HITCBC 145   345 on_work_finished() noexcept 145   345 on_work_finished() noexcept
146   { 146   {
HITCBC 147   345 if(outstanding_work_.fetch_sub( 147   345 if(outstanding_work_.fetch_sub(
HITCBC 148   345 1, std::memory_order_acq_rel) == 1) 148   345 1, std::memory_order_acq_rel) == 1)
149   { 149   {
HITCBC 150   85 std::lock_guard<std::mutex> lock(mutex_); 150   85 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 151   85 if(joined_ && !stop_) 151   85 if(joined_ && !stop_)
HITCBC 152   4 stop_ = true; 152   4 stop_ = true;
HITCBC 153   85 done_cv_.notify_all(); 153   85 done_cv_.notify_all();
HITCBC 154   85 work_cv_.notify_all(); 154   85 work_cv_.notify_all();
HITCBC 155   85 } 155   85 }
HITCBC 156   345 } 156   345 }
157   157  
158   void 158   void
HITCBC 159   168 join() noexcept 159   170 join() noexcept
160   { 160   {
161   { 161   {
HITCBC 162   168 std::unique_lock<std::mutex> lock(mutex_); 162   170 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 163   168 if(joined_) 163   170 if(joined_)
HITCBC 164   11 return; 164   12 return;
HITCBC 165   157 joined_ = true; 165   158 joined_ = true;
166   166  
HITCBC 167   157 if(outstanding_work_.load( 167   158 if(outstanding_work_.load(
HITCBC 168   157 std::memory_order_acquire) == 0) 168   158 std::memory_order_acquire) == 0)
169   { 169   {
HITCBC 170   100 stop_ = true; 170   100 stop_ = true;
HITCBC 171   100 work_cv_.notify_all(); 171   100 work_cv_.notify_all();
172   } 172   }
173   else 173   else
174   { 174   {
HITCBC 175   57 done_cv_.wait(lock, [this]{ 175   58 done_cv_.wait(lock, [this]{
HITCBC 176   62 return stop_; 176   63 return stop_;
177   }); 177   });
178   } 178   }
HITCBC 179   168 } 179   170 }
180   180  
HITCBC 181   336 for(auto& t : threads_) 181   339 for(auto& t : threads_)
HITCBC 182   179 if(t.joinable()) 182   181 if(t.joinable())
HITCBC 183   179 t.join(); 183   181 t.join();
184   } 184   }
185   185  
186   void 186   void
HITCBC 187   159 stop() noexcept 187   160 stop() noexcept
188   { 188   {
189   { 189   {
HITCBC 190   159 std::lock_guard<std::mutex> lock(mutex_); 190   160 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 191   159 stop_ = true; 191   160 stop_ = true;
HITCBC 192   159 } 192   160 }
HITCBC 193   159 work_cv_.notify_all(); 193   160 work_cv_.notify_all();
HITCBC 194   159 done_cv_.notify_all(); 194   160 done_cv_.notify_all();
HITCBC 195   159 } 195   160 }
196   196  
197   private: 197   private:
198   void 198   void
HITCBC 199   828 ensure_started() 199   830 ensure_started()
200   { 200   {
HITCBC 201   828 std::call_once(start_flag_, [this]{ 201   830 std::call_once(start_flag_, [this]{
HITCBC 202   101 threads_.reserve(num_threads_); 202   102 threads_.reserve(num_threads_);
HITCBC 203   280 for(std::size_t i = 0; i < num_threads_; ++i) 203   283 for(std::size_t i = 0; i < num_threads_; ++i)
HITCBC 204   358 threads_.emplace_back([this, i]{ run(i); }); 204   362 threads_.emplace_back([this, i]{ run(i); });
HITCBC 205   101 }); 205   102 });
HITCBC 206   828 } 206   830 }
207   207  
208   void 208   void
HITCBC 209   179 run(std::size_t index) 209   181 run(std::size_t index)
210   { 210   {
211   // Build name; set_current_thread_name truncates to platform limits. 211   // Build name; set_current_thread_name truncates to platform limits.
212   char name[16]; 212   char name[16];
HITCBC 213   179 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index); 213   181 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
HITCBC 214   179 set_current_thread_name(name); 214   181 set_current_thread_name(name);
215   215  
216   for(;;) 216   for(;;)
217   { 217   {
HITCBC 218   800 continuation* c = nullptr; 218   812 continuation* c = nullptr;
219   { 219   {
HITCBC 220   800 std::unique_lock<std::mutex> lock(mutex_); 220   812 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 221   800 work_cv_.wait(lock, [this]{ 221   812 work_cv_.wait(lock, [this]{
HITCBC 222   1349 return !empty() || 222   1397 return !empty() ||
HITCBC 223   1349 stop_; 223   1397 stop_;
224   }); 224   });
HITCBC 225   800 if(stop_) 225   812 if(stop_)
HITCBC 226   358 return; 226   362 return;
HITCBC 227   621 c = pop(); 227   631 c = pop();
HITCBC 228   800 } 228   812 }
HITCBC 229   621 if(c) 229   631 if(c)
HITCBC 230   621 safe_resume(c->h); 230   631 safe_resume(c->h);
HITCBC 231   621 } 231   631 }
232   } 232   }
233   }; 233   };
234   234  
235   //------------------------------------------------------------------------------ 235   //------------------------------------------------------------------------------
236   236  
HITCBC 237   157 thread_pool:: 237   158 thread_pool::
238   ~thread_pool() 238   ~thread_pool()
239   { 239   {
HITCBC 240   157 impl_->stop(); 240   158 impl_->stop();
HITCBC 241   157 impl_->join(); 241   158 impl_->join();
HITCBC 242   157 impl_->drain_abandoned(); 242   158 impl_->drain_abandoned();
HITCBC 243   157 shutdown(); 243   158 shutdown();
HITCBC 244   157 destroy(); 244   158 destroy();
HITCBC 245   157 delete impl_; 245   158 delete impl_;
HITCBC 246   157 } 246   158 }
247   247  
HITCBC 248   157 thread_pool:: 248   158 thread_pool::
HITCBC 249   157 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix) 249   158 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
HITCBC 250   157 : impl_(new impl(num_threads, thread_name_prefix)) 250   158 : impl_(new impl(num_threads, thread_name_prefix))
251   { 251   {
HITCBC 252   157 this->set_frame_allocator(std::allocator<void>{}); 252   158 this->set_frame_allocator(std::allocator<void>{});
HITCBC 253   157 } 253   158 }
254   254  
255   void 255   void
HITCBC 256   11 thread_pool:: 256   12 thread_pool::
257   join() noexcept 257   join() noexcept
258   { 258   {
HITCBC 259   11 impl_->join(); 259   12 impl_->join();
HITCBC 260   11 } 260   12 }
261   261  
262   void 262   void
HITCBC 263   2 thread_pool:: 263   2 thread_pool::
264   stop() noexcept 264   stop() noexcept
265   { 265   {
HITCBC 266   2 impl_->stop(); 266   2 impl_->stop();
HITCBC 267   2 } 267   2 }
268   268  
269   //------------------------------------------------------------------------------ 269   //------------------------------------------------------------------------------
270   270  
271   thread_pool::executor_type 271   thread_pool::executor_type
HITCBC 272   163 thread_pool:: 272   164 thread_pool::
273   get_executor() const noexcept 273   get_executor() const noexcept
274   { 274   {
HITCBC 275   163 return executor_type( 275   164 return executor_type(
HITCBC 276   163 const_cast<thread_pool&>(*this)); 276   164 const_cast<thread_pool&>(*this));
277   } 277   }
278   278  
279   void 279   void
HITCBC 280   345 thread_pool::executor_type:: 280   345 thread_pool::executor_type::
281   on_work_started() const noexcept 281   on_work_started() const noexcept
282   { 282   {
HITCBC 283   345 pool_->impl_->on_work_started(); 283   345 pool_->impl_->on_work_started();
HITCBC 284   345 } 284   345 }
285   285  
286   void 286   void
HITCBC 287   345 thread_pool::executor_type:: 287   345 thread_pool::executor_type::
288   on_work_finished() const noexcept 288   on_work_finished() const noexcept
289   { 289   {
HITCBC 290   345 pool_->impl_->on_work_finished(); 290   345 pool_->impl_->on_work_finished();
HITCBC 291   345 } 291   345 }
292   292  
293   void 293   void
HITCBC 294   828 thread_pool::executor_type:: 294   830 thread_pool::executor_type::
295   post(continuation& c) const 295   post(continuation& c) const
296   { 296   {
HITCBC 297   828 pool_->impl_->post(c); 297   830 pool_->impl_->post(c);
HITCBC 298   828 } 298   830 }
299   299  
300   } // capy 300   } // capy
301   } // boost 301   } // boost