100.00% Lines (10/10) 100.00% Functions (4/4)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Michael Vandeberg 3   // Copyright (c) 2026 Michael Vandeberg
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/capy 8   // Official repository: https://github.com/cppalliance/capy
9   // 9   //
10   10  
11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP 11   #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12   #define BOOST_CAPY_EX_THREAD_POOL_HPP 12   #define BOOST_CAPY_EX_THREAD_POOL_HPP
13   13  
14   #include <boost/capy/detail/config.hpp> 14   #include <boost/capy/detail/config.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <coroutine> 16   #include <coroutine>
17   #include <boost/capy/ex/execution_context.hpp> 17   #include <boost/capy/ex/execution_context.hpp>
18   #include <cstddef> 18   #include <cstddef>
19   #include <string_view> 19   #include <string_view>
20   20  
21   namespace boost { 21   namespace boost {
22   namespace capy { 22   namespace capy {
23   23  
24   /** A pool of threads for executing work concurrently. 24   /** A pool of threads for executing work concurrently.
25   25  
26   Use this when you need to run coroutines on multiple threads 26   Use this when you need to run coroutines on multiple threads
27   without the overhead of creating and destroying threads for 27   without the overhead of creating and destroying threads for
28   each task. Work items are distributed across the pool using 28   each task. Work items are distributed across the pool using
29   a shared queue. 29   a shared queue.
30   30  
31   @par Thread Safety 31   @par Thread Safety
32   Distinct objects: Safe. 32   Distinct objects: Safe.
33   Shared objects: Unsafe. 33   Shared objects: Unsafe.
34   34  
35   @par Example 35   @par Example
36   @code 36   @code
37   thread_pool pool(4); // 4 worker threads 37   thread_pool pool(4); // 4 worker threads
38   auto ex = pool.get_executor(); 38   auto ex = pool.get_executor();
39   ex.post(some_coroutine); 39   ex.post(some_coroutine);
40   // pool destructor waits for all work to complete 40   // pool destructor waits for all work to complete
41   @endcode 41   @endcode
42   */ 42   */
43   class BOOST_CAPY_DECL 43   class BOOST_CAPY_DECL
44   thread_pool 44   thread_pool
45   : public execution_context 45   : public execution_context
46   { 46   {
47   class impl; 47   class impl;
48   impl* impl_; 48   impl* impl_;
49   49  
50   public: 50   public:
51   class executor_type; 51   class executor_type;
52   52  
53   /** Destroy the thread pool. 53   /** Destroy the thread pool.
54   54  
55   Signals all worker threads to stop, waits for them to 55   Signals all worker threads to stop, waits for them to
56   finish, and destroys any pending work items. 56   finish, and destroys any pending work items.
57   */ 57   */
58   ~thread_pool(); 58   ~thread_pool();
59   59  
60   /** Construct a thread pool. 60   /** Construct a thread pool.
61   61  
62   Creates a pool with the specified number of worker threads. 62   Creates a pool with the specified number of worker threads.
63   If `num_threads` is zero, the number of threads is set to 63   If `num_threads` is zero, the number of threads is set to
64   the hardware concurrency, or one if that cannot be determined. 64   the hardware concurrency, or one if that cannot be determined.
65   65  
66   @param num_threads The number of worker threads, or zero 66   @param num_threads The number of worker threads, or zero
67   for automatic selection. 67   for automatic selection.
68   68  
69   @param thread_name_prefix The prefix for worker thread names. 69   @param thread_name_prefix The prefix for worker thread names.
70   Thread names appear as "{prefix}0", "{prefix}1", etc. 70   Thread names appear as "{prefix}0", "{prefix}1", etc.
71   The prefix is truncated to 12 characters. Defaults to 71   The prefix is truncated to 12 characters. Defaults to
72   "capy-pool-". 72   "capy-pool-".
73   */ 73   */
74   explicit 74   explicit
75   thread_pool( 75   thread_pool(
76   std::size_t num_threads = 0, 76   std::size_t num_threads = 0,
77   std::string_view thread_name_prefix = "capy-pool-"); 77   std::string_view thread_name_prefix = "capy-pool-");
78   78  
79   thread_pool(thread_pool const&) = delete; 79   thread_pool(thread_pool const&) = delete;
80   thread_pool& operator=(thread_pool const&) = delete; 80   thread_pool& operator=(thread_pool const&) = delete;
81   81  
82   /** Wait for all outstanding work to complete. 82   /** Wait for all outstanding work to complete.
83   83  
84   Releases the internal work guard, then blocks the calling 84   Releases the internal work guard, then blocks the calling
85   thread until all outstanding work tracked by 85   thread until all outstanding work tracked by
86   @ref executor_type::on_work_started and 86   @ref executor_type::on_work_started and
87   @ref executor_type::on_work_finished completes. After all 87   @ref executor_type::on_work_finished completes. After all
88   work finishes, joins the worker threads. 88   work finishes, joins the worker threads.
89   89  
90   If @ref stop is called while `join()` is blocking, the 90   If @ref stop is called while `join()` is blocking, the
91   pool stops without waiting for remaining work to 91   pool stops without waiting for remaining work to
92   complete. Worker threads finish their current item and 92   complete. Worker threads finish their current item and
93   exit; `join()` still waits for all threads to be joined 93   exit; `join()` still waits for all threads to be joined
94   before returning. 94   before returning.
95   95  
96   This function is idempotent. The first call performs the 96   This function is idempotent. The first call performs the
97   join; subsequent calls return immediately. 97   join; subsequent calls return immediately.
98   98  
99   @par Preconditions 99   @par Preconditions
100   Must not be called from a thread in this pool (undefined 100   Must not be called from a thread in this pool (undefined
101   behavior). 101   behavior).
102   102  
103   @par Postconditions 103   @par Postconditions
104   All worker threads have been joined. The pool cannot be 104   All worker threads have been joined. The pool cannot be
105   reused. 105   reused.
106   106  
107   @par Thread Safety 107   @par Thread Safety
108   May be called from any thread not in this pool. 108   May be called from any thread not in this pool.
109   */ 109   */
110   void 110   void
111   join() noexcept; 111   join() noexcept;
112   112  
113   /** Request all worker threads to stop. 113   /** Request all worker threads to stop.
114   114  
115   Signals all threads to exit after finishing their current 115   Signals all threads to exit after finishing their current
116   work item. Queued work that has not started is abandoned. 116   work item. Queued work that has not started is abandoned.
117   Does not wait for threads to exit. 117   Does not wait for threads to exit.
118   118  
119   If @ref join is blocking on another thread, calling 119   If @ref join is blocking on another thread, calling
120   `stop()` causes it to stop waiting for outstanding 120   `stop()` causes it to stop waiting for outstanding
121   work. The `join()` call still waits for worker threads 121   work. The `join()` call still waits for worker threads
122   to finish their current item and exit before returning. 122   to finish their current item and exit before returning.
123   */ 123   */
124   void 124   void
125   stop() noexcept; 125   stop() noexcept;
126   126  
127   /** Return an executor for this thread pool. 127   /** Return an executor for this thread pool.
128   128  
129   @return An executor associated with this thread pool. 129   @return An executor associated with this thread pool.
130   */ 130   */
131   executor_type 131   executor_type
132   get_executor() const noexcept; 132   get_executor() const noexcept;
133   }; 133   };
134   134  
135   /** An executor that submits work to a thread_pool. 135   /** An executor that submits work to a thread_pool.
136   136  
137   Executors are lightweight handles that can be copied and stored. 137   Executors are lightweight handles that can be copied and stored.
138   All copies refer to the same underlying thread pool. 138   All copies refer to the same underlying thread pool.
139   139  
140   @par Thread Safety 140   @par Thread Safety
141   Distinct objects: Safe. 141   Distinct objects: Safe.
142   Shared objects: Safe. 142   Shared objects: Safe.
143   */ 143   */
144   class thread_pool::executor_type 144   class thread_pool::executor_type
145   { 145   {
146   friend class thread_pool; 146   friend class thread_pool;
147   147  
148   thread_pool* pool_ = nullptr; 148   thread_pool* pool_ = nullptr;
149   149  
150   explicit 150   explicit
HITCBC 151   163 executor_type(thread_pool& pool) noexcept 151   164 executor_type(thread_pool& pool) noexcept
HITCBC 152   163 : pool_(&pool) 152   164 : pool_(&pool)
153   { 153   {
HITCBC 154   163 } 154   164 }
155   155  
156   public: 156   public:
157   /** Construct a default null executor. 157   /** Construct a default null executor.
158   158  
159   The resulting executor is not associated with any pool. 159   The resulting executor is not associated with any pool.
160   `context()`, `dispatch()`, and `post()` require the 160   `context()`, `dispatch()`, and `post()` require the
161   executor to be associated with a pool before use. 161   executor to be associated with a pool before use.
162   */ 162   */
163   executor_type() = default; 163   executor_type() = default;
164   164  
165   /// Return the underlying thread pool. 165   /// Return the underlying thread pool.
166   thread_pool& 166   thread_pool&
HITCBC 167   400 context() const noexcept 167   400 context() const noexcept
168   { 168   {
HITCBC 169   400 return *pool_; 169   400 return *pool_;
170   } 170   }
171   171  
172   /** Notify that work has started. 172   /** Notify that work has started.
173   173  
174   Increments the outstanding work count. Must be paired 174   Increments the outstanding work count. Must be paired
175   with a subsequent call to @ref on_work_finished. 175   with a subsequent call to @ref on_work_finished.
176   176  
177   @see on_work_finished, work_guard 177   @see on_work_finished, work_guard
178   */ 178   */
179   BOOST_CAPY_DECL 179   BOOST_CAPY_DECL
180   void 180   void
181   on_work_started() const noexcept; 181   on_work_started() const noexcept;
182   182  
183   /** Notify that work has finished. 183   /** Notify that work has finished.
184   184  
185   Decrements the outstanding work count. When the count 185   Decrements the outstanding work count. When the count
186   reaches zero after @ref thread_pool::join has been called, 186   reaches zero after @ref thread_pool::join has been called,
187   the pool's worker threads are signaled to stop. 187   the pool's worker threads are signaled to stop.
188   188  
189   @pre A preceding call to @ref on_work_started was made. 189   @pre A preceding call to @ref on_work_started was made.
190   190  
191   @see on_work_started, work_guard 191   @see on_work_started, work_guard
192   */ 192   */
193   BOOST_CAPY_DECL 193   BOOST_CAPY_DECL
194   void 194   void
195   on_work_finished() const noexcept; 195   on_work_finished() const noexcept;
196   196  
197   /** Dispatch a continuation for execution. 197   /** Dispatch a continuation for execution.
198   198  
199   Posts the continuation to the thread pool for execution on a 199   Posts the continuation to the thread pool for execution on a
200   worker thread and returns `std::noop_coroutine()`. Thread 200   worker thread and returns `std::noop_coroutine()`. Thread
201   pools never execute inline because no single thread "owns" 201   pools never execute inline because no single thread "owns"
202   the pool. 202   the pool.
203   203  
204   @param c The continuation to execute. Must remain at a 204   @param c The continuation to execute. Must remain at a
205   stable address until dequeued and resumed. 205   stable address until dequeued and resumed.
206   206  
207   @return `std::noop_coroutine()` always. 207   @return `std::noop_coroutine()` always.
208   */ 208   */
209   std::coroutine_handle<> 209   std::coroutine_handle<>
HITCBC 210   352 dispatch(continuation& c) const 210   351 dispatch(continuation& c) const
211   { 211   {
HITCBC 212   352 post(c); 212   351 post(c);
HITCBC 213   352 return std::noop_coroutine(); 213   351 return std::noop_coroutine();
214   } 214   }
215   215  
216   /** Post a continuation to the thread pool. 216   /** Post a continuation to the thread pool.
217   217  
218   The continuation will be resumed on one of the pool's 218   The continuation will be resumed on one of the pool's
219   worker threads. The continuation must remain at a stable 219   worker threads. The continuation must remain at a stable
220   address until it is dequeued and resumed. 220   address until it is dequeued and resumed.
221   221  
222   @param c The continuation to execute. 222   @param c The continuation to execute.
223   */ 223   */
224   BOOST_CAPY_DECL 224   BOOST_CAPY_DECL
225   void 225   void
226   post(continuation& c) const; 226   post(continuation& c) const;
227   227  
228   /// Return true if two executors refer to the same thread pool. 228   /// Return true if two executors refer to the same thread pool.
229   bool 229   bool
HITCBC 230   13 operator==(executor_type const& other) const noexcept 230   13 operator==(executor_type const& other) const noexcept
231   { 231   {
HITCBC 232   13 return pool_ == other.pool_; 232   13 return pool_ == other.pool_;
233   } 233   }
234   }; 234   };
235   235  
236   } // capy 236   } // capy
237   } // boost 237   } // boost
238   238  
239   #endif 239   #endif