100.00% Lines (66/66) 100.00% Functions (12/12)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_ASYNC_EVENT_HPP 10   #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11   #define BOOST_CAPY_ASYNC_EVENT_HPP 11   #define BOOST_CAPY_ASYNC_EVENT_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/intrusive.hpp> 14   #include <boost/capy/detail/intrusive.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <boost/capy/concept/executor.hpp> 16   #include <boost/capy/concept/executor.hpp>
17   #include <boost/capy/error.hpp> 17   #include <boost/capy/error.hpp>
18   #include <boost/capy/ex/io_env.hpp> 18   #include <boost/capy/ex/io_env.hpp>
19   #include <boost/capy/io_result.hpp> 19   #include <boost/capy/io_result.hpp>
20   20  
21   #include <stop_token> 21   #include <stop_token>
22   22  
23   #include <atomic> 23   #include <atomic>
24   #include <coroutine> 24   #include <coroutine>
25   #include <new> 25   #include <new>
26   #include <utility> 26   #include <utility>
27   27  
28   /* async_event implementation notes 28   /* async_event implementation notes
29   ================================= 29   =================================
30   30  
31   Same cancellation pattern as async_mutex (see that file for the 31   Same cancellation pattern as async_mutex (see that file for the
32   full discussion on claimed_, stop_cb lifetime, member ordering, 32   full discussion on claimed_, stop_cb lifetime, member ordering,
33   and threading assumptions). 33   and threading assumptions).
34   34  
35   Key difference: set() wakes ALL waiters (broadcast), not one. 35   Key difference: set() wakes ALL waiters (broadcast), not one.
36   It pops every waiter from the list and posts the ones it 36   It pops every waiter from the list and posts the ones it
37   claims. Waiters already claimed by a stop callback are skipped. 37   claims. Waiters already claimed by a stop callback are skipped.
38   38  
39   Because set() pops all waiters, a canceled waiter may have been 39   Because set() pops all waiters, a canceled waiter may have been
40   removed from the list by set() before its await_resume runs. 40   removed from the list by set() before its await_resume runs.
41   This requires a separate in_list_ flag (unlike async_mutex where 41   This requires a separate in_list_ flag (unlike async_mutex where
42   active_ served double duty). await_resume only calls remove() 42   active_ served double duty). await_resume only calls remove()
43   when in_list_ is true. 43   when in_list_ is true.
44   */ 44   */
45   45  
46   namespace boost { 46   namespace boost {
47   namespace capy { 47   namespace capy {
48   48  
49   /** An asynchronous event for coroutines. 49   /** An asynchronous event for coroutines.
50   50  
51   This event provides a way to notify multiple coroutines that some 51   This event provides a way to notify multiple coroutines that some
52   condition has occurred. When a coroutine awaits an unset event, it 52   condition has occurred. When a coroutine awaits an unset event, it
53   suspends and is added to a wait queue. When the event is set, all 53   suspends and is added to a wait queue. When the event is set, all
54   waiting coroutines are resumed. 54   waiting coroutines are resumed.
55   55  
56   @par Cancellation 56   @par Cancellation
57   57  
58   When a coroutine is suspended waiting for the event and its stop 58   When a coroutine is suspended waiting for the event and its stop
59   token is triggered, the waiter completes with `error::canceled` 59   token is triggered, the waiter completes with `error::canceled`
60   instead of waiting for `set()`. 60   instead of waiting for `set()`.
61   61  
62   Cancellation only applies while the coroutine is suspended in the 62   Cancellation only applies while the coroutine is suspended in the
63   wait queue. If the event is already set when `wait()` is called, 63   wait queue. If the event is already set when `wait()` is called,
64   the wait completes immediately even if the stop token is already 64   the wait completes immediately even if the stop token is already
65   signaled. 65   signaled.
66   66  
67   @par Zero Allocation 67   @par Zero Allocation
68   68  
69   No heap allocation occurs for wait operations. 69   No heap allocation occurs for wait operations.
70   70  
71   @par Thread Safety 71   @par Thread Safety
72   72  
73   Distinct objects: Safe.@n 73   Distinct objects: Safe.@n
74   Shared objects: Unsafe. 74   Shared objects: Unsafe.
75   75  
76   The event operations are designed for single-threaded use on one 76   The event operations are designed for single-threaded use on one
77   executor. The stop callback may fire from any thread. 77   executor. The stop callback may fire from any thread.
78   78  
79   This type is non-copyable and non-movable because suspended 79   This type is non-copyable and non-movable because suspended
80   waiters hold intrusive pointers into the event's internal list. 80   waiters hold intrusive pointers into the event's internal list.
81   81  
82   @par Example 82   @par Example
83   @code 83   @code
84   async_event event; 84   async_event event;
85   85  
86   task<> waiter() { 86   task<> waiter() {
87   auto [ec] = co_await event.wait(); 87   auto [ec] = co_await event.wait();
88   if(ec) 88   if(ec)
89   co_return; 89   co_return;
90   // ... event was set ... 90   // ... event was set ...
91   } 91   }
92   92  
93   task<> notifier() { 93   task<> notifier() {
94   // ... do some work ... 94   // ... do some work ...
95   event.set(); // Wake all waiters 95   event.set(); // Wake all waiters
96   } 96   }
97   @endcode 97   @endcode
98   */ 98   */
99   class async_event 99   class async_event
100   { 100   {
101   public: 101   public:
102   class wait_awaiter; 102   class wait_awaiter;
103   103  
104   private: 104   private:
105   bool set_ = false; 105   bool set_ = false;
106   detail::intrusive_list<wait_awaiter> waiters_; 106   detail::intrusive_list<wait_awaiter> waiters_;
107   107  
108   public: 108   public:
109   /** Awaiter returned by wait(). 109   /** Awaiter returned by wait().
110   */ 110   */
111   class wait_awaiter 111   class wait_awaiter
112   : public detail::intrusive_list<wait_awaiter>::node 112   : public detail::intrusive_list<wait_awaiter>::node
113   { 113   {
114   friend class async_event; 114   friend class async_event;
115   115  
116   async_event* e_; 116   async_event* e_;
117   continuation cont_; 117   continuation cont_;
118   executor_ref ex_; 118   executor_ref ex_;
119   119  
120   // Declared before stop_cb_buf_: the callback 120   // Declared before stop_cb_buf_: the callback
121   // accesses these members, so they must still be 121   // accesses these members, so they must still be
122   // alive if the stop_cb_ destructor blocks. 122   // alive if the stop_cb_ destructor blocks.
123   std::atomic<bool> claimed_{false}; 123   std::atomic<bool> claimed_{false};
124   bool canceled_ = false; 124   bool canceled_ = false;
125   bool active_ = false; 125   bool active_ = false;
126   bool in_list_ = false; 126   bool in_list_ = false;
127   127  
128   struct cancel_fn 128   struct cancel_fn
129   { 129   {
130   wait_awaiter* self_; 130   wait_awaiter* self_;
131   131  
HITCBC 132   4 void operator()() const noexcept 132   4 void operator()() const noexcept
133   { 133   {
HITCBC 134   4 if(!self_->claimed_.exchange( 134   4 if(!self_->claimed_.exchange(
135   true, std::memory_order_acq_rel)) 135   true, std::memory_order_acq_rel))
136   { 136   {
HITCBC 137   3 self_->canceled_ = true; 137   3 self_->canceled_ = true;
HITCBC 138   3 self_->ex_.post(self_->cont_); 138   3 self_->ex_.post(self_->cont_);
139   } 139   }
HITCBC 140   4 } 140   4 }
141   }; 141   };
142   142  
143   using stop_cb_t = 143   using stop_cb_t =
144   std::stop_callback<cancel_fn>; 144   std::stop_callback<cancel_fn>;
145   145  
146   // Aligned storage for stop_cb_t. Declared last: 146   // Aligned storage for stop_cb_t. Declared last:
147   // its destructor may block while the callback 147   // its destructor may block while the callback
148   // accesses the members above. 148   // accesses the members above.
149   BOOST_CAPY_MSVC_WARNING_PUSH 149   BOOST_CAPY_MSVC_WARNING_PUSH
150   BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas 150   BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
151   alignas(stop_cb_t) 151   alignas(stop_cb_t)
152   unsigned char stop_cb_buf_[sizeof(stop_cb_t)]; 152   unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
153   BOOST_CAPY_MSVC_WARNING_POP 153   BOOST_CAPY_MSVC_WARNING_POP
154   154  
HITCBC 155   20 stop_cb_t& stop_cb_() noexcept 155   20 stop_cb_t& stop_cb_() noexcept
156   { 156   {
157   return *reinterpret_cast<stop_cb_t*>( 157   return *reinterpret_cast<stop_cb_t*>(
HITCBC 158   20 stop_cb_buf_); 158   20 stop_cb_buf_);
159   } 159   }
160   160  
161   public: 161   public:
HITCBC 162   52 ~wait_awaiter() 162   52 ~wait_awaiter()
163   { 163   {
HITCBC 164   52 if(active_) 164   52 if(active_)
HITCBC 165   1 stop_cb_().~stop_cb_t(); 165   1 stop_cb_().~stop_cb_t();
HITCBC 166   52 if(in_list_) 166   52 if(in_list_)
HITCBC 167   1 e_->waiters_.remove(this); 167   1 e_->waiters_.remove(this);
HITCBC 168   52 } 168   52 }
169   169  
HITCBC 170   25 explicit wait_awaiter(async_event* e) noexcept 170   25 explicit wait_awaiter(async_event* e) noexcept
HITCBC 171   25 : e_(e) 171   25 : e_(e)
172   { 172   {
HITCBC 173   25 } 173   25 }
174   174  
HITCBC 175   27 wait_awaiter(wait_awaiter&& o) noexcept 175   27 wait_awaiter(wait_awaiter&& o) noexcept
HITCBC 176   27 : e_(o.e_) 176   27 : e_(o.e_)
HITCBC 177   27 , cont_(o.cont_) 177   27 , cont_(o.cont_)
HITCBC 178   27 , ex_(o.ex_) 178   27 , ex_(o.ex_)
HITCBC 179   27 , claimed_(o.claimed_.load( 179   27 , claimed_(o.claimed_.load(
180   std::memory_order_relaxed)) 180   std::memory_order_relaxed))
HITCBC 181   27 , canceled_(o.canceled_) 181   27 , canceled_(o.canceled_)
HITCBC 182   27 , active_(std::exchange(o.active_, false)) 182   27 , active_(std::exchange(o.active_, false))
HITCBC 183   27 , in_list_(std::exchange(o.in_list_, false)) 183   27 , in_list_(std::exchange(o.in_list_, false))
184   { 184   {
HITCBC 185   27 } 185   27 }
186   186  
187   wait_awaiter(wait_awaiter const&) = delete; 187   wait_awaiter(wait_awaiter const&) = delete;
188   wait_awaiter& operator=(wait_awaiter const&) = delete; 188   wait_awaiter& operator=(wait_awaiter const&) = delete;
189   wait_awaiter& operator=(wait_awaiter&&) = delete; 189   wait_awaiter& operator=(wait_awaiter&&) = delete;
190   190  
HITCBC 191   25 bool await_ready() const noexcept 191   25 bool await_ready() const noexcept
192   { 192   {
HITCBC 193   25 return e_->set_; 193   25 return e_->set_;
194   } 194   }
195   195  
196   /** IoAwaitable protocol overload. */ 196   /** IoAwaitable protocol overload. */
197   std::coroutine_handle<> 197   std::coroutine_handle<>
HITCBC 198   21 await_suspend( 198   21 await_suspend(
199   std::coroutine_handle<> h, 199   std::coroutine_handle<> h,
200   io_env const* env) noexcept 200   io_env const* env) noexcept
201   { 201   {
HITCBC 202   21 if(env->stop_token.stop_requested()) 202   21 if(env->stop_token.stop_requested())
203   { 203   {
HITCBC 204   1 canceled_ = true; 204   1 canceled_ = true;
HITCBC 205   1 return h; 205   1 return h;
206   } 206   }
HITCBC 207   20 cont_.h = h; 207   20 cont_.h = h;
HITCBC 208   20 ex_ = env->executor; 208   20 ex_ = env->executor;
HITCBC 209   20 e_->waiters_.push_back(this); 209   20 e_->waiters_.push_back(this);
HITCBC 210   20 in_list_ = true; 210   20 in_list_ = true;
HITCBC 211   60 ::new(stop_cb_buf_) stop_cb_t( 211   60 ::new(stop_cb_buf_) stop_cb_t(
HITCBC 212   20 env->stop_token, cancel_fn{this}); 212   20 env->stop_token, cancel_fn{this});
HITCBC 213   20 active_ = true; 213   20 active_ = true;
HITCBC 214   20 return std::noop_coroutine(); 214   20 return std::noop_coroutine();
215   } 215   }
216   216  
HITCBC 217   22 io_result<> await_resume() noexcept 217   22 io_result<> await_resume() noexcept
218   { 218   {
HITCBC 219   22 if(active_) 219   22 if(active_)
220   { 220   {
HITCBC 221   19 stop_cb_().~stop_cb_t(); 221   19 stop_cb_().~stop_cb_t();
HITCBC 222   19 active_ = false; 222   19 active_ = false;
223   } 223   }
HITCBC 224   22 if(canceled_) 224   22 if(canceled_)
225   { 225   {
HITCBC 226   4 if(in_list_) 226   4 if(in_list_)
227   { 227   {
HITCBC 228   3 e_->waiters_.remove(this); 228   3 e_->waiters_.remove(this);
HITCBC 229   3 in_list_ = false; 229   3 in_list_ = false;
230   } 230   }
231   return {make_error_code( 231   return {make_error_code(
HITCBC 232   4 error::canceled)}; 232   4 error::canceled)};
233   } 233   }
HITCBC 234   18 return {{}}; 234   18 return {{}};
235   } 235   }
236   }; 236   };
237   237  
238   /// Construct an unset event. 238   /// Construct an unset event.
239   async_event() = default; 239   async_event() = default;
240   240  
241   /// Copy constructor (deleted). 241   /// Copy constructor (deleted).
242   async_event(async_event const&) = delete; 242   async_event(async_event const&) = delete;
243   243  
244   /// Copy assignment (deleted). 244   /// Copy assignment (deleted).
245   async_event& operator=(async_event const&) = delete; 245   async_event& operator=(async_event const&) = delete;
246   246  
247   /// Move constructor (deleted). 247   /// Move constructor (deleted).
248   async_event(async_event&&) = delete; 248   async_event(async_event&&) = delete;
249   249  
250   /// Move assignment (deleted). 250   /// Move assignment (deleted).
251   async_event& operator=(async_event&&) = delete; 251   async_event& operator=(async_event&&) = delete;
252   252  
253   /** Returns an awaiter that waits until the event is set. 253   /** Returns an awaiter that waits until the event is set.
254   254  
255   If the event is already set, completes immediately. 255   If the event is already set, completes immediately.
256   256  
257   @return An awaitable that await-returns `(error_code)`. 257   @return An awaitable that await-returns `(error_code)`.
258   */ 258   */
HITCBC 259   25 wait_awaiter wait() noexcept 259   25 wait_awaiter wait() noexcept
260   { 260   {
HITCBC 261   25 return wait_awaiter{this}; 261   25 return wait_awaiter{this};
262   } 262   }
263   263  
264   /** Sets the event. 264   /** Sets the event.
265   265  
266   All waiting coroutines are resumed. Canceled waiters 266   All waiting coroutines are resumed. Canceled waiters
267   are skipped. Subsequent calls to wait() complete 267   are skipped. Subsequent calls to wait() complete
268   immediately until clear() is called. 268   immediately until clear() is called.
269   */ 269   */
HITCBC 270   17 void set() 270   17 void set()
271   { 271   {
HITCBC 272   17 set_ = true; 272   17 set_ = true;
273   for(;;) 273   for(;;)
274   { 274   {
HITCBC 275   33 auto* w = waiters_.pop_front(); 275   33 auto* w = waiters_.pop_front();
HITCBC 276   33 if(!w) 276   33 if(!w)
HITCBC 277   17 break; 277   17 break;
HITCBC 278   16 w->in_list_ = false; 278   16 w->in_list_ = false;
HITCBC 279   16 if(!w->claimed_.exchange( 279   16 if(!w->claimed_.exchange(
280   true, std::memory_order_acq_rel)) 280   true, std::memory_order_acq_rel))
281   { 281   {
HITCBC 282   16 w->ex_.post(w->cont_); 282   16 w->ex_.post(w->cont_);
283   } 283   }
HITCBC 284   16 } 284   16 }
HITCBC 285   17 } 285   17 }
286   286  
287   /** Clears the event. 287   /** Clears the event.
288   288  
289   Subsequent calls to wait() will suspend until 289   Subsequent calls to wait() will suspend until
290   set() is called again. 290   set() is called again.
291   */ 291   */
HITCBC 292   2 void clear() noexcept 292   2 void clear() noexcept
293   { 293   {
HITCBC 294   2 set_ = false; 294   2 set_ = false;
HITCBC 295   2 } 295   2 }
296   296  
297   /** Returns true if the event is currently set. 297   /** Returns true if the event is currently set.
298   */ 298   */
HITCBC 299   9 bool is_set() const noexcept 299   9 bool is_set() const noexcept
300   { 300   {
HITCBC 301   9 return set_; 301   9 return set_;
302   } 302   }
303   }; 303   };
304   304  
305   } // namespace capy 305   } // namespace capy
306   } // namespace boost 306   } // namespace boost
307   307  
308   #endif 308   #endif