98.14% Lines (158/161)
97.73% Functions (43/44)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2026 Steve Gerbino | 2 | // Copyright (c) 2026 Steve Gerbino | |||||
| 3 | // | 3 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 6 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/capy | 7 | // Official repository: https://github.com/cppalliance/capy | |||||
| 8 | // | 8 | // | |||||
| 9 | 9 | |||||||
| 10 | #ifndef BOOST_CAPY_WHEN_ALL_HPP | 10 | #ifndef BOOST_CAPY_WHEN_ALL_HPP | |||||
| 11 | #define BOOST_CAPY_WHEN_ALL_HPP | 11 | #define BOOST_CAPY_WHEN_ALL_HPP | |||||
| 12 | 12 | |||||||
| 13 | #include <boost/capy/detail/config.hpp> | 13 | #include <boost/capy/detail/config.hpp> | |||||
| 14 | #include <boost/capy/detail/io_result_combinators.hpp> | 14 | #include <boost/capy/detail/io_result_combinators.hpp> | |||||
| 15 | #include <boost/capy/continuation.hpp> | 15 | #include <boost/capy/continuation.hpp> | |||||
| 16 | #include <boost/capy/concept/executor.hpp> | 16 | #include <boost/capy/concept/executor.hpp> | |||||
| 17 | #include <boost/capy/concept/io_awaitable.hpp> | 17 | #include <boost/capy/concept/io_awaitable.hpp> | |||||
| 18 | #include <coroutine> | 18 | #include <coroutine> | |||||
| 19 | #include <boost/capy/ex/frame_alloc_mixin.hpp> | 19 | #include <boost/capy/ex/frame_alloc_mixin.hpp> | |||||
| 20 | #include <boost/capy/ex/io_env.hpp> | 20 | #include <boost/capy/ex/io_env.hpp> | |||||
| 21 | #include <boost/capy/ex/frame_allocator.hpp> | 21 | #include <boost/capy/ex/frame_allocator.hpp> | |||||
| 22 | #include <boost/capy/task.hpp> | 22 | #include <boost/capy/task.hpp> | |||||
| 23 | 23 | |||||||
| 24 | #include <array> | 24 | #include <array> | |||||
| 25 | #include <atomic> | 25 | #include <atomic> | |||||
| 26 | #include <exception> | 26 | #include <exception> | |||||
| 27 | #include <memory> | 27 | #include <memory> | |||||
| 28 | #include <optional> | 28 | #include <optional> | |||||
| 29 | #include <ranges> | 29 | #include <ranges> | |||||
| 30 | #include <stdexcept> | 30 | #include <stdexcept> | |||||
| 31 | #include <stop_token> | 31 | #include <stop_token> | |||||
| 32 | #include <tuple> | 32 | #include <tuple> | |||||
| 33 | #include <type_traits> | 33 | #include <type_traits> | |||||
| 34 | #include <utility> | 34 | #include <utility> | |||||
| 35 | #include <vector> | 35 | #include <vector> | |||||
| 36 | 36 | |||||||
| 37 | namespace boost { | 37 | namespace boost { | |||||
| 38 | namespace capy { | 38 | namespace capy { | |||||
| 39 | 39 | |||||||
| 40 | namespace detail { | 40 | namespace detail { | |||||
| 41 | 41 | |||||||
| 42 | /** Holds the result of a single task within when_all. | 42 | /** Holds the result of a single task within when_all. | |||||
| 43 | */ | 43 | */ | |||||
| 44 | template<typename T> | 44 | template<typename T> | |||||
| 45 | struct result_holder | 45 | struct result_holder | |||||
| 46 | { | 46 | { | |||||
| 47 | std::optional<T> value_; | 47 | std::optional<T> value_; | |||||
| 48 | 48 | |||||||
| HITCBC | 49 | 81 | void set(T v) | 49 | 81 | void set(T v) | ||
| 50 | { | 50 | { | |||||
| HITCBC | 51 | 81 | value_ = std::move(v); | 51 | 81 | value_ = std::move(v); | ||
| HITCBC | 52 | 81 | } | 52 | 81 | } | ||
| 53 | 53 | |||||||
| HITCBC | 54 | 69 | T get() && | 54 | 69 | T get() && | ||
| 55 | { | 55 | { | |||||
| HITCBC | 56 | 69 | return std::move(*value_); | 56 | 69 | return std::move(*value_); | ||
| 57 | } | 57 | } | |||||
| 58 | }; | 58 | }; | |||||
| 59 | 59 | |||||||
| 60 | /** Core shared state for when_all operations. | 60 | /** Core shared state for when_all operations. | |||||
| 61 | 61 | |||||||
| 62 | Contains all members and methods common to both heterogeneous (variadic) | 62 | Contains all members and methods common to both heterogeneous (variadic) | |||||
| 63 | and homogeneous (range) when_all implementations. State classes embed | 63 | and homogeneous (range) when_all implementations. State classes embed | |||||
| 64 | this via composition to avoid CRTP destructor ordering issues. | 64 | this via composition to avoid CRTP destructor ordering issues. | |||||
| 65 | 65 | |||||||
| 66 | @par Thread Safety | 66 | @par Thread Safety | |||||
| 67 | Atomic operations protect exception capture and completion count. | 67 | Atomic operations protect exception capture and completion count. | |||||
| 68 | */ | 68 | */ | |||||
| 69 | struct when_all_core | 69 | struct when_all_core | |||||
| 70 | { | 70 | { | |||||
| 71 | std::atomic<std::size_t> remaining_count_; | 71 | std::atomic<std::size_t> remaining_count_; | |||||
| 72 | 72 | |||||||
| 73 | // Exception storage - first error wins, others discarded | 73 | // Exception storage - first error wins, others discarded | |||||
| 74 | std::atomic<bool> has_exception_{false}; | 74 | std::atomic<bool> has_exception_{false}; | |||||
| 75 | std::exception_ptr first_exception_; | 75 | std::exception_ptr first_exception_; | |||||
| 76 | 76 | |||||||
| 77 | std::stop_source stop_source_; | 77 | std::stop_source stop_source_; | |||||
| 78 | 78 | |||||||
| 79 | // Bridges parent's stop token to our stop_source | 79 | // Bridges parent's stop token to our stop_source | |||||
| 80 | struct stop_callback_fn | 80 | struct stop_callback_fn | |||||
| 81 | { | 81 | { | |||||
| 82 | std::stop_source* source_; | 82 | std::stop_source* source_; | |||||
| HITCBC | 83 | 1 | void operator()() const { source_->request_stop(); } | 83 | 1 | void operator()() const { source_->request_stop(); } | ||
| 84 | }; | 84 | }; | |||||
| 85 | using stop_callback_t = std::stop_callback<stop_callback_fn>; | 85 | using stop_callback_t = std::stop_callback<stop_callback_fn>; | |||||
| 86 | std::optional<stop_callback_t> parent_stop_callback_; | 86 | std::optional<stop_callback_t> parent_stop_callback_; | |||||
| 87 | 87 | |||||||
| 88 | continuation continuation_; | 88 | continuation continuation_; | |||||
| 89 | io_env const* caller_env_ = nullptr; | 89 | io_env const* caller_env_ = nullptr; | |||||
| 90 | 90 | |||||||
| HITCBC | 91 | 72 | explicit when_all_core(std::size_t count) noexcept | 91 | 72 | explicit when_all_core(std::size_t count) noexcept | ||
| HITCBC | 92 | 72 | : remaining_count_(count) | 92 | 72 | : remaining_count_(count) | ||
| 93 | { | 93 | { | |||||
| HITCBC | 94 | 72 | } | 94 | 72 | } | ||
| 95 | 95 | |||||||
| 96 | /** Capture an exception (first one wins). */ | 96 | /** Capture an exception (first one wins). */ | |||||
| HITCBC | 97 | 19 | void capture_exception(std::exception_ptr ep) | 97 | 19 | void capture_exception(std::exception_ptr ep) | ||
| 98 | { | 98 | { | |||||
| HITCBC | 99 | 19 | bool expected = false; | 99 | 19 | bool expected = false; | ||
| HITCBC | 100 | 19 | if(has_exception_.compare_exchange_strong( | 100 | 19 | if(has_exception_.compare_exchange_strong( | ||
| 101 | expected, true, std::memory_order_relaxed)) | 101 | expected, true, std::memory_order_relaxed)) | |||||
| HITCBC | 102 | 17 | first_exception_ = ep; | 102 | 17 | first_exception_ = ep; | ||
| HITCBC | 103 | 19 | } | 103 | 19 | } | ||
| 104 | }; | 104 | }; | |||||
| 105 | 105 | |||||||
| 106 | /** Shared state for heterogeneous when_all (variadic overload). | 106 | /** Shared state for heterogeneous when_all (variadic overload). | |||||
| 107 | 107 | |||||||
| 108 | @tparam Ts The result types of the tasks. | 108 | @tparam Ts The result types of the tasks. | |||||
| 109 | */ | 109 | */ | |||||
| 110 | template<typename... Ts> | 110 | template<typename... Ts> | |||||
| 111 | struct when_all_state | 111 | struct when_all_state | |||||
| 112 | { | 112 | { | |||||
| 113 | static constexpr std::size_t task_count = sizeof...(Ts); | 113 | static constexpr std::size_t task_count = sizeof...(Ts); | |||||
| 114 | 114 | |||||||
| 115 | when_all_core core_; | 115 | when_all_core core_; | |||||
| 116 | std::tuple<result_holder<Ts>...> results_; | 116 | std::tuple<result_holder<Ts>...> results_; | |||||
| 117 | std::array<continuation, task_count> runner_handles_{}; | 117 | std::array<continuation, task_count> runner_handles_{}; | |||||
| 118 | 118 | |||||||
| 119 | std::atomic<bool> has_error_{false}; | 119 | std::atomic<bool> has_error_{false}; | |||||
| 120 | std::error_code first_error_; | 120 | std::error_code first_error_; | |||||
| 121 | 121 | |||||||
| HITCBC | 122 | 50 | when_all_state() | 122 | 50 | when_all_state() | ||
| HITCBC | 123 | 50 | : core_(task_count) | 123 | 50 | : core_(task_count) | ||
| 124 | { | 124 | { | |||||
| HITCBC | 125 | 50 | } | 125 | 50 | } | ||
| 126 | 126 | |||||||
| 127 | /** Record the first error (subsequent errors are discarded). */ | 127 | /** Record the first error (subsequent errors are discarded). */ | |||||
| HITCBC | 128 | 43 | void record_error(std::error_code ec) | 128 | 43 | void record_error(std::error_code ec) | ||
| 129 | { | 129 | { | |||||
| HITCBC | 130 | 43 | bool expected = false; | 130 | 43 | bool expected = false; | ||
| HITCBC | 131 | 43 | if(has_error_.compare_exchange_strong( | 131 | 43 | if(has_error_.compare_exchange_strong( | ||
| 132 | expected, true, std::memory_order_relaxed)) | 132 | expected, true, std::memory_order_relaxed)) | |||||
| HITCBC | 133 | 29 | first_error_ = ec; | 133 | 29 | first_error_ = ec; | ||
| HITCBC | 134 | 43 | } | 134 | 43 | } | ||
| 135 | }; | 135 | }; | |||||
| 136 | 136 | |||||||
| 137 | /** Shared state for homogeneous when_all (range overload). | 137 | /** Shared state for homogeneous when_all (range overload). | |||||
| 138 | 138 | |||||||
| 139 | Stores extracted io_result payloads in a vector indexed by task | 139 | Stores extracted io_result payloads in a vector indexed by task | |||||
| 140 | position. Tracks the first error_code for error propagation. | 140 | position. Tracks the first error_code for error propagation. | |||||
| 141 | 141 | |||||||
| 142 | @tparam T The payload type extracted from io_result. | 142 | @tparam T The payload type extracted from io_result. | |||||
| 143 | */ | 143 | */ | |||||
| 144 | template<typename T> | 144 | template<typename T> | |||||
| 145 | struct when_all_homogeneous_state | 145 | struct when_all_homogeneous_state | |||||
| 146 | { | 146 | { | |||||
| 147 | when_all_core core_; | 147 | when_all_core core_; | |||||
| 148 | std::vector<std::optional<T>> results_; | 148 | std::vector<std::optional<T>> results_; | |||||
| 149 | std::unique_ptr<continuation[]> runner_handles_; | 149 | std::unique_ptr<continuation[]> runner_handles_; | |||||
| 150 | 150 | |||||||
| 151 | std::atomic<bool> has_error_{false}; | 151 | std::atomic<bool> has_error_{false}; | |||||
| 152 | std::error_code first_error_; | 152 | std::error_code first_error_; | |||||
| 153 | 153 | |||||||
| HITCBC | 154 | 11 | explicit when_all_homogeneous_state(std::size_t count) | 154 | 11 | explicit when_all_homogeneous_state(std::size_t count) | ||
| HITCBC | 155 | 11 | : core_(count) | 155 | 11 | : core_(count) | ||
| HITCBC | 156 | 22 | , results_(count) | 156 | 22 | , results_(count) | ||
| HITCBC | 157 | 11 | , runner_handles_(std::make_unique<continuation[]>(count)) | 157 | 11 | , runner_handles_(std::make_unique<continuation[]>(count)) | ||
| 158 | { | 158 | { | |||||
| HITCBC | 159 | 11 | } | 159 | 11 | } | ||
| 160 | 160 | |||||||
| HITCBC | 161 | 16 | void set_result(std::size_t index, T value) | 161 | 16 | void set_result(std::size_t index, T value) | ||
| 162 | { | 162 | { | |||||
| HITCBC | 163 | 16 | results_[index].emplace(std::move(value)); | 163 | 16 | results_[index].emplace(std::move(value)); | ||
| HITCBC | 164 | 16 | } | 164 | 16 | } | ||
| 165 | 165 | |||||||
| 166 | /** Record the first error (subsequent errors are discarded). */ | 166 | /** Record the first error (subsequent errors are discarded). */ | |||||
| HITCBC | 167 | 7 | void record_error(std::error_code ec) | 167 | 7 | void record_error(std::error_code ec) | ||
| 168 | { | 168 | { | |||||
| HITCBC | 169 | 7 | bool expected = false; | 169 | 7 | bool expected = false; | ||
| HITCBC | 170 | 7 | if(has_error_.compare_exchange_strong( | 170 | 7 | if(has_error_.compare_exchange_strong( | ||
| 171 | expected, true, std::memory_order_relaxed)) | 171 | expected, true, std::memory_order_relaxed)) | |||||
| HITCBC | 172 | 5 | first_error_ = ec; | 172 | 5 | first_error_ = ec; | ||
| HITCBC | 173 | 7 | } | 173 | 7 | } | ||
| 174 | }; | 174 | }; | |||||
| 175 | 175 | |||||||
| 176 | /** Specialization for void io_result children (no payload storage). */ | 176 | /** Specialization for void io_result children (no payload storage). */ | |||||
| 177 | template<> | 177 | template<> | |||||
| 178 | struct when_all_homogeneous_state<std::tuple<>> | 178 | struct when_all_homogeneous_state<std::tuple<>> | |||||
| 179 | { | 179 | { | |||||
| 180 | when_all_core core_; | 180 | when_all_core core_; | |||||
| 181 | std::unique_ptr<continuation[]> runner_handles_; | 181 | std::unique_ptr<continuation[]> runner_handles_; | |||||
| 182 | 182 | |||||||
| 183 | std::atomic<bool> has_error_{false}; | 183 | std::atomic<bool> has_error_{false}; | |||||
| 184 | std::error_code first_error_; | 184 | std::error_code first_error_; | |||||
| 185 | 185 | |||||||
| HITCBC | 186 | 3 | explicit when_all_homogeneous_state(std::size_t count) | 186 | 3 | explicit when_all_homogeneous_state(std::size_t count) | ||
| HITCBC | 187 | 3 | : core_(count) | 187 | 3 | : core_(count) | ||
| HITCBC | 188 | 3 | , runner_handles_(std::make_unique<continuation[]>(count)) | 188 | 3 | , runner_handles_(std::make_unique<continuation[]>(count)) | ||
| 189 | { | 189 | { | |||||
| HITCBC | 190 | 3 | } | 190 | 3 | } | ||
| 191 | 191 | |||||||
| 192 | /** Record the first error (subsequent errors are discarded). */ | 192 | /** Record the first error (subsequent errors are discarded). */ | |||||
| HITCBC | 193 | 1 | void record_error(std::error_code ec) | 193 | 1 | void record_error(std::error_code ec) | ||
| 194 | { | 194 | { | |||||
| HITCBC | 195 | 1 | bool expected = false; | 195 | 1 | bool expected = false; | ||
| HITCBC | 196 | 1 | if(has_error_.compare_exchange_strong( | 196 | 1 | if(has_error_.compare_exchange_strong( | ||
| 197 | expected, true, std::memory_order_relaxed)) | 197 | expected, true, std::memory_order_relaxed)) | |||||
| HITCBC | 198 | 1 | first_error_ = ec; | 198 | 1 | first_error_ = ec; | ||
| HITCBC | 199 | 1 | } | 199 | 1 | } | ||
| 200 | }; | 200 | }; | |||||
| 201 | 201 | |||||||
| 202 | /** Wrapper coroutine that intercepts task completion for when_all. | 202 | /** Wrapper coroutine that intercepts task completion for when_all. | |||||
| 203 | 203 | |||||||
| 204 | Parameterized on StateType to work with both heterogeneous (variadic) | 204 | Parameterized on StateType to work with both heterogeneous (variadic) | |||||
| 205 | and homogeneous (range) state types. All state types expose their | 205 | and homogeneous (range) state types. All state types expose their | |||||
| 206 | shared members through a `core_` member of type when_all_core. | 206 | shared members through a `core_` member of type when_all_core. | |||||
| 207 | 207 | |||||||
| 208 | @tparam StateType The state type (when_all_state or when_all_homogeneous_state). | 208 | @tparam StateType The state type (when_all_state or when_all_homogeneous_state). | |||||
| 209 | */ | 209 | */ | |||||
| 210 | template<typename StateType> | 210 | template<typename StateType> | |||||
| 211 | struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner | 211 | struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner | |||||
| 212 | { | 212 | { | |||||
| 213 | struct promise_type | 213 | struct promise_type | |||||
| 214 | : frame_alloc_mixin | 214 | : frame_alloc_mixin | |||||
| 215 | { | 215 | { | |||||
| 216 | StateType* state_ = nullptr; | 216 | StateType* state_ = nullptr; | |||||
| 217 | std::size_t index_ = 0; | 217 | std::size_t index_ = 0; | |||||
| 218 | io_env env_; | 218 | io_env env_; | |||||
| 219 | 219 | |||||||
| HITCBC | 220 | 145 | when_all_runner get_return_object() noexcept | 220 | 145 | when_all_runner get_return_object() noexcept | ||
| 221 | { | 221 | { | |||||
| 222 | return when_all_runner( | 222 | return when_all_runner( | |||||
| HITCBC | 223 | 145 | std::coroutine_handle<promise_type>::from_promise(*this)); | 223 | 145 | std::coroutine_handle<promise_type>::from_promise(*this)); | ||
| 224 | } | 224 | } | |||||
| 225 | 225 | |||||||
| HITCBC | 226 | 145 | std::suspend_always initial_suspend() noexcept | 226 | 145 | std::suspend_always initial_suspend() noexcept | ||
| 227 | { | 227 | { | |||||
| HITCBC | 228 | 145 | return {}; | 228 | 145 | return {}; | ||
| 229 | } | 229 | } | |||||
| 230 | 230 | |||||||
| HITCBC | 231 | 145 | auto final_suspend() noexcept | 231 | 145 | auto final_suspend() noexcept | ||
| 232 | { | 232 | { | |||||
| 233 | struct awaiter | 233 | struct awaiter | |||||
| 234 | { | 234 | { | |||||
| 235 | promise_type* p_; | 235 | promise_type* p_; | |||||
| HITCBC | 236 | 145 | bool await_ready() const noexcept { return false; } | 236 | 145 | bool await_ready() const noexcept { return false; } | ||
| HITCBC | 237 | 145 | auto await_suspend(std::coroutine_handle<> h) noexcept | 237 | 145 | auto await_suspend(std::coroutine_handle<> h) noexcept | ||
| 238 | { | 238 | { | |||||
| HITCBC | 239 | 145 | auto& core = p_->state_->core_; | 239 | 145 | auto& core = p_->state_->core_; | ||
| HITCBC | 240 | 145 | auto* counter = &core.remaining_count_; | 240 | 145 | auto* counter = &core.remaining_count_; | ||
| HITCBC | 241 | 145 | auto* caller_env = core.caller_env_; | 241 | 145 | auto* caller_env = core.caller_env_; | ||
| HITCBC | 242 | 145 | auto& cont = core.continuation_; | 242 | 145 | auto& cont = core.continuation_; | ||
| 243 | 243 | |||||||
| HITCBC | 244 | 145 | h.destroy(); | 244 | 145 | h.destroy(); | ||
| 245 | 245 | |||||||
| HITCBC | 246 | 145 | auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); | 246 | 145 | auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); | ||
| HITCBC | 247 | 145 | if(remaining == 1) | 247 | 145 | if(remaining == 1) | ||
| HITCBC | 248 | 72 | return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); | 248 | 72 | return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); | ||
| HITCBC | 249 | 73 | return detail::symmetric_transfer(std::noop_coroutine()); | 249 | 73 | return detail::symmetric_transfer(std::noop_coroutine()); | ||
| 250 | } | 250 | } | |||||
| MISUBC | 251 | ✗ | void await_resume() const noexcept {} | 251 | ✗ | void await_resume() const noexcept {} | ||
| 252 | }; | 252 | }; | |||||
| HITCBC | 253 | 145 | return awaiter{this}; | 253 | 145 | return awaiter{this}; | ||
| 254 | } | 254 | } | |||||
| 255 | 255 | |||||||
| HITCBC | 256 | 126 | void return_void() noexcept {} | 256 | 126 | void return_void() noexcept {} | ||
| 257 | 257 | |||||||
| HITCBC | 258 | 19 | void unhandled_exception() noexcept | 258 | 19 | void unhandled_exception() noexcept | ||
| 259 | { | 259 | { | |||||
| HITCBC | 260 | 19 | state_->core_.capture_exception(std::current_exception()); | 260 | 19 | state_->core_.capture_exception(std::current_exception()); | ||
| HITCBC | 261 | 19 | state_->core_.stop_source_.request_stop(); | 261 | 19 | state_->core_.stop_source_.request_stop(); | ||
| HITCBC | 262 | 19 | } | 262 | 19 | } | ||
| 263 | 263 | |||||||
| 264 | template<class Awaitable> | 264 | template<class Awaitable> | |||||
| 265 | struct transform_awaiter | 265 | struct transform_awaiter | |||||
| 266 | { | 266 | { | |||||
| 267 | std::decay_t<Awaitable> a_; | 267 | std::decay_t<Awaitable> a_; | |||||
| 268 | promise_type* p_; | 268 | promise_type* p_; | |||||
| 269 | 269 | |||||||
| HITCBC | 270 | 145 | bool await_ready() { return a_.await_ready(); } | 270 | 145 | bool await_ready() { return a_.await_ready(); } | ||
| HITCBC | 271 | 145 | decltype(auto) await_resume() { return a_.await_resume(); } | 271 | 145 | decltype(auto) await_resume() { return a_.await_resume(); } | ||
| 272 | 272 | |||||||
| 273 | template<class Promise> | 273 | template<class Promise> | |||||
| HITCBC | 274 | 144 | auto await_suspend(std::coroutine_handle<Promise> h) | 274 | 144 | auto await_suspend(std::coroutine_handle<Promise> h) | ||
| 275 | { | 275 | { | |||||
| 276 | using R = decltype(a_.await_suspend(h, &p_->env_)); | 276 | using R = decltype(a_.await_suspend(h, &p_->env_)); | |||||
| 277 | if constexpr (std::is_same_v<R, std::coroutine_handle<>>) | 277 | if constexpr (std::is_same_v<R, std::coroutine_handle<>>) | |||||
| HITCBC | 278 | 144 | return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_)); | 278 | 144 | return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_)); | ||
| 279 | else | 279 | else | |||||
| 280 | return a_.await_suspend(h, &p_->env_); | 280 | return a_.await_suspend(h, &p_->env_); | |||||
| 281 | } | 281 | } | |||||
| 282 | }; | 282 | }; | |||||
| 283 | 283 | |||||||
| 284 | template<class Awaitable> | 284 | template<class Awaitable> | |||||
| HITCBC | 285 | 145 | auto await_transform(Awaitable&& a) | 285 | 145 | auto await_transform(Awaitable&& a) | ||
| 286 | { | 286 | { | |||||
| 287 | using A = std::decay_t<Awaitable>; | 287 | using A = std::decay_t<Awaitable>; | |||||
| 288 | if constexpr (IoAwaitable<A>) | 288 | if constexpr (IoAwaitable<A>) | |||||
| 289 | { | 289 | { | |||||
| 290 | return transform_awaiter<Awaitable>{ | 290 | return transform_awaiter<Awaitable>{ | |||||
| HITCBC | 291 | 290 | std::forward<Awaitable>(a), this}; | 291 | 290 | std::forward<Awaitable>(a), this}; | ||
| 292 | } | 292 | } | |||||
| 293 | else | 293 | else | |||||
| 294 | { | 294 | { | |||||
| 295 | static_assert(sizeof(A) == 0, "requires IoAwaitable"); | 295 | static_assert(sizeof(A) == 0, "requires IoAwaitable"); | |||||
| 296 | } | 296 | } | |||||
| HITCBC | 297 | 145 | } | 297 | 145 | } | ||
| 298 | }; | 298 | }; | |||||
| 299 | 299 | |||||||
| 300 | std::coroutine_handle<promise_type> h_; | 300 | std::coroutine_handle<promise_type> h_; | |||||
| 301 | 301 | |||||||
| HITCBC | 302 | 145 | explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept | 302 | 145 | explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept | ||
| HITCBC | 303 | 145 | : h_(h) | 303 | 145 | : h_(h) | ||
| 304 | { | 304 | { | |||||
| HITCBC | 305 | 145 | } | 305 | 145 | } | ||
| 306 | 306 | |||||||
| 307 | // Enable move for all clang versions - some versions need it | 307 | // Enable move for all clang versions - some versions need it | |||||
| 308 | when_all_runner(when_all_runner&& other) noexcept | 308 | when_all_runner(when_all_runner&& other) noexcept | |||||
| 309 | : h_(std::exchange(other.h_, nullptr)) | 309 | : h_(std::exchange(other.h_, nullptr)) | |||||
| 310 | { | 310 | { | |||||
| 311 | } | 311 | } | |||||
| 312 | 312 | |||||||
| 313 | when_all_runner(when_all_runner const&) = delete; | 313 | when_all_runner(when_all_runner const&) = delete; | |||||
| 314 | when_all_runner& operator=(when_all_runner const&) = delete; | 314 | when_all_runner& operator=(when_all_runner const&) = delete; | |||||
| 315 | when_all_runner& operator=(when_all_runner&&) = delete; | 315 | when_all_runner& operator=(when_all_runner&&) = delete; | |||||
| 316 | 316 | |||||||
| HITCBC | 317 | 145 | auto release() noexcept | 317 | 145 | auto release() noexcept | ||
| 318 | { | 318 | { | |||||
| HITCBC | 319 | 145 | return std::exchange(h_, nullptr); | 319 | 145 | return std::exchange(h_, nullptr); | ||
| 320 | } | 320 | } | |||||
| 321 | }; | 321 | }; | |||||
| 322 | 322 | |||||||
| 323 | /** Create an io_result-aware runner for a single awaitable (range path). | 323 | /** Create an io_result-aware runner for a single awaitable (range path). | |||||
| 324 | 324 | |||||||
| 325 | Checks the error code, records errors and requests stop on failure, | 325 | Checks the error code, records errors and requests stop on failure, | |||||
| 326 | or extracts the payload on success. | 326 | or extracts the payload on success. | |||||
| 327 | */ | 327 | */ | |||||
| 328 | template<IoAwaitable Awaitable, typename StateType> | 328 | template<IoAwaitable Awaitable, typename StateType> | |||||
| 329 | when_all_runner<StateType> | 329 | when_all_runner<StateType> | |||||
| HITCBC | 330 | 32 | make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) | 330 | 32 | make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) | ||
| 331 | { | 331 | { | |||||
| 332 | auto result = co_await std::move(inner); | 332 | auto result = co_await std::move(inner); | |||||
| 333 | 333 | |||||||
| 334 | if(result.ec) | 334 | if(result.ec) | |||||
| 335 | { | 335 | { | |||||
| 336 | state->record_error(result.ec); | 336 | state->record_error(result.ec); | |||||
| 337 | state->core_.stop_source_.request_stop(); | 337 | state->core_.stop_source_.request_stop(); | |||||
| 338 | } | 338 | } | |||||
| 339 | else | 339 | else | |||||
| 340 | { | 340 | { | |||||
| 341 | using PayloadT = io_result_payload_t< | 341 | using PayloadT = io_result_payload_t< | |||||
| 342 | awaitable_result_t<Awaitable>>; | 342 | awaitable_result_t<Awaitable>>; | |||||
| 343 | if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) | 343 | if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) | |||||
| 344 | { | 344 | { | |||||
| 345 | state->set_result(index, | 345 | state->set_result(index, | |||||
| 346 | extract_io_payload(std::move(result))); | 346 | extract_io_payload(std::move(result))); | |||||
| 347 | } | 347 | } | |||||
| 348 | } | 348 | } | |||||
| HITCBC | 349 | 64 | } | 349 | 64 | } | ||
| 350 | 350 | |||||||
| 351 | /** Create a runner for io_result children that requests stop on ec. */ | 351 | /** Create a runner for io_result children that requests stop on ec. */ | |||||
| 352 | template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> | 352 | template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> | |||||
| 353 | when_all_runner<when_all_state<Ts...>> | 353 | when_all_runner<when_all_state<Ts...>> | |||||
| HITCBC | 354 | 97 | make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state) | 354 | 97 | make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state) | ||
| 355 | { | 355 | { | |||||
| 356 | auto result = co_await std::move(inner); | 356 | auto result = co_await std::move(inner); | |||||
| 357 | auto ec = result.ec; | 357 | auto ec = result.ec; | |||||
| 358 | std::get<Index>(state->results_).set(std::move(result)); | 358 | std::get<Index>(state->results_).set(std::move(result)); | |||||
| 359 | 359 | |||||||
| 360 | if(ec) | 360 | if(ec) | |||||
| 361 | { | 361 | { | |||||
| 362 | state->record_error(ec); | 362 | state->record_error(ec); | |||||
| 363 | state->core_.stop_source_.request_stop(); | 363 | state->core_.stop_source_.request_stop(); | |||||
| 364 | } | 364 | } | |||||
| HITCBC | 365 | 194 | } | 365 | 194 | } | ||
| 366 | 366 | |||||||
| 367 | /** Launcher that uses io_result-aware runners. */ | 367 | /** Launcher that uses io_result-aware runners. */ | |||||
| 368 | template<IoAwaitable... Awaitables> | 368 | template<IoAwaitable... Awaitables> | |||||
| 369 | class when_all_io_launcher | 369 | class when_all_io_launcher | |||||
| 370 | { | 370 | { | |||||
| 371 | using state_type = when_all_state<awaitable_result_t<Awaitables>...>; | 371 | using state_type = when_all_state<awaitable_result_t<Awaitables>...>; | |||||
| 372 | 372 | |||||||
| 373 | std::tuple<Awaitables...>* awaitables_; | 373 | std::tuple<Awaitables...>* awaitables_; | |||||
| 374 | state_type* state_; | 374 | state_type* state_; | |||||
| 375 | 375 | |||||||
| 376 | public: | 376 | public: | |||||
| HITCBC | 377 | 50 | when_all_io_launcher( | 377 | 50 | when_all_io_launcher( | ||
| 378 | std::tuple<Awaitables...>* awaitables, | 378 | std::tuple<Awaitables...>* awaitables, | |||||
| 379 | state_type* state) | 379 | state_type* state) | |||||
| HITCBC | 380 | 50 | : awaitables_(awaitables) | 380 | 50 | : awaitables_(awaitables) | ||
| HITCBC | 381 | 50 | , state_(state) | 381 | 50 | , state_(state) | ||
| 382 | { | 382 | { | |||||
| HITCBC | 383 | 50 | } | 383 | 50 | } | ||
| 384 | 384 | |||||||
| HITCBC | 385 | 50 | bool await_ready() const noexcept | 385 | 50 | bool await_ready() const noexcept | ||
| 386 | { | 386 | { | |||||
| HITCBC | 387 | 50 | return sizeof...(Awaitables) == 0; | 387 | 50 | return sizeof...(Awaitables) == 0; | ||
| 388 | } | 388 | } | |||||
| 389 | 389 | |||||||
| HITCBC | 390 | 50 | std::coroutine_handle<> await_suspend( | 390 | 50 | std::coroutine_handle<> await_suspend( | ||
| 391 | std::coroutine_handle<> continuation, io_env const* caller_env) | 391 | std::coroutine_handle<> continuation, io_env const* caller_env) | |||||
| 392 | { | 392 | { | |||||
| HITCBC | 393 | 50 | state_->core_.continuation_.h = continuation; | 393 | 50 | state_->core_.continuation_.h = continuation; | ||
| HITCBC | 394 | 50 | state_->core_.caller_env_ = caller_env; | 394 | 50 | state_->core_.caller_env_ = caller_env; | ||
| 395 | 395 | |||||||
| HITCBC | 396 | 50 | if(caller_env->stop_token.stop_possible()) | 396 | 50 | if(caller_env->stop_token.stop_possible()) | ||
| 397 | { | 397 | { | |||||
| HITCBC | 398 | 2 | state_->core_.parent_stop_callback_.emplace( | 398 | 2 | state_->core_.parent_stop_callback_.emplace( | ||
| HITCBC | 399 | 1 | caller_env->stop_token, | 399 | 1 | caller_env->stop_token, | ||
| HITCBC | 400 | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); | 400 | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); | ||
| 401 | 401 | |||||||
| HITCBC | 402 | 1 | if(caller_env->stop_token.stop_requested()) | 402 | 1 | if(caller_env->stop_token.stop_requested()) | ||
| MISUBC | 403 | ✗ | state_->core_.stop_source_.request_stop(); | 403 | ✗ | state_->core_.stop_source_.request_stop(); | ||
| 404 | } | 404 | } | |||||
| 405 | 405 | |||||||
| HITCBC | 406 | 50 | auto token = state_->core_.stop_source_.get_token(); | 406 | 50 | auto token = state_->core_.stop_source_.get_token(); | ||
| HITCBC | 407 | 46 | [&]<std::size_t... Is>(std::index_sequence<Is...>) { | 407 | 46 | [&]<std::size_t... Is>(std::index_sequence<Is...>) { | ||
| HITCBC | 408 | 50 | (..., launch_one<Is>(caller_env->executor, token)); | 408 | 50 | (..., launch_one<Is>(caller_env->executor, token)); | ||
| HITCBC | 409 | 50 | }(std::index_sequence_for<Awaitables...>{}); | 409 | 50 | }(std::index_sequence_for<Awaitables...>{}); | ||
| 410 | 410 | |||||||
| HITCBC | 411 | 100 | return std::noop_coroutine(); | 411 | 100 | return std::noop_coroutine(); | ||
| HITCBC | 412 | 50 | } | 412 | 50 | } | ||
| 413 | 413 | |||||||
| HITCBC | 414 | 50 | void await_resume() const noexcept {} | 414 | 50 | void await_resume() const noexcept {} | ||
| 415 | 415 | |||||||
| 416 | private: | 416 | private: | |||||
| 417 | template<std::size_t I> | 417 | template<std::size_t I> | |||||
| HITCBC | 418 | 97 | void launch_one(executor_ref caller_ex, std::stop_token token) | 418 | 97 | void launch_one(executor_ref caller_ex, std::stop_token token) | ||
| 419 | { | 419 | { | |||||
| HITCBC | 420 | 97 | auto runner = make_when_all_io_runner<I>( | 420 | 97 | auto runner = make_when_all_io_runner<I>( | ||
| HITCBC | 421 | 97 | std::move(std::get<I>(*awaitables_)), state_); | 421 | 97 | std::move(std::get<I>(*awaitables_)), state_); | ||
| 422 | 422 | |||||||
| HITCBC | 423 | 97 | auto h = runner.release(); | 423 | 97 | auto h = runner.release(); | ||
| HITCBC | 424 | 97 | h.promise().state_ = state_; | 424 | 97 | h.promise().state_ = state_; | ||
| HITCBC | 425 | 97 | h.promise().env_ = io_env{caller_ex, token, | 425 | 97 | h.promise().env_ = io_env{caller_ex, token, | ||
| HITCBC | 426 | 97 | state_->core_.caller_env_->frame_allocator}; | 426 | 97 | state_->core_.caller_env_->frame_allocator}; | ||
| 427 | 427 | |||||||
| HITCBC | 428 | 97 | state_->runner_handles_[I].h = std::coroutine_handle<>{h}; | 428 | 97 | state_->runner_handles_[I].h = std::coroutine_handle<>{h}; | ||
| HITCBC | 429 | 97 | state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); | 429 | 97 | state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); | ||
| HITCBC | 430 | 194 | } | 430 | 194 | } | ||
| 431 | }; | 431 | }; | |||||
| 432 | 432 | |||||||
| 433 | /** Helper to extract a single result from state. | 433 | /** Helper to extract a single result from state. | |||||
| 434 | This is a separate function to work around a GCC-11 ICE that occurs | 434 | This is a separate function to work around a GCC-11 ICE that occurs | |||||
| 435 | when using nested immediately-invoked lambdas with pack expansion. | 435 | when using nested immediately-invoked lambdas with pack expansion. | |||||
| 436 | */ | 436 | */ | |||||
| 437 | template<std::size_t I, typename... Ts> | 437 | template<std::size_t I, typename... Ts> | |||||
| HITCBC | 438 | 69 | auto extract_single_result(when_all_state<Ts...>& state) | 438 | 69 | auto extract_single_result(when_all_state<Ts...>& state) | ||
| 439 | { | 439 | { | |||||
| HITCBC | 440 | 69 | return std::move(std::get<I>(state.results_)).get(); | 440 | 69 | return std::move(std::get<I>(state.results_)).get(); | ||
| 441 | } | 441 | } | |||||
| 442 | 442 | |||||||
| 443 | /** Extract all results from state as a tuple. | 443 | /** Extract all results from state as a tuple. | |||||
| 444 | */ | 444 | */ | |||||
| 445 | template<typename... Ts> | 445 | template<typename... Ts> | |||||
| HITCBC | 446 | 36 | auto extract_results(when_all_state<Ts...>& state) | 446 | 36 | auto extract_results(when_all_state<Ts...>& state) | ||
| 447 | { | 447 | { | |||||
| HITCBC | 448 | 55 | return [&]<std::size_t... Is>(std::index_sequence<Is...>) { | 448 | 55 | return [&]<std::size_t... Is>(std::index_sequence<Is...>) { | ||
| HITCBC | 449 | 36 | return std::tuple(extract_single_result<Is>(state)...); | 449 | 36 | return std::tuple(extract_single_result<Is>(state)...); | ||
| HITCBC | 450 | 72 | }(std::index_sequence_for<Ts...>{}); | 450 | 72 | }(std::index_sequence_for<Ts...>{}); | ||
| 451 | } | 451 | } | |||||
| 452 | 452 | |||||||
| 453 | /** Launches all homogeneous runners concurrently. | 453 | /** Launches all homogeneous runners concurrently. | |||||
| 454 | 454 | |||||||
| 455 | Two-phase approach: create all runners first, then post all. | 455 | Two-phase approach: create all runners first, then post all. | |||||
| 456 | This avoids lifetime issues if a task completes synchronously. | 456 | This avoids lifetime issues if a task completes synchronously. | |||||
| 457 | */ | 457 | */ | |||||
| 458 | template<typename Range> | 458 | template<typename Range> | |||||
| 459 | class when_all_homogeneous_launcher | 459 | class when_all_homogeneous_launcher | |||||
| 460 | { | 460 | { | |||||
| 461 | using Awaitable = std::ranges::range_value_t<Range>; | 461 | using Awaitable = std::ranges::range_value_t<Range>; | |||||
| 462 | using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>; | 462 | using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>; | |||||
| 463 | 463 | |||||||
| 464 | Range* range_; | 464 | Range* range_; | |||||
| 465 | when_all_homogeneous_state<PayloadT>* state_; | 465 | when_all_homogeneous_state<PayloadT>* state_; | |||||
| 466 | 466 | |||||||
| 467 | public: | 467 | public: | |||||
| HITCBC | 468 | 14 | when_all_homogeneous_launcher( | 468 | 14 | when_all_homogeneous_launcher( | ||
| 469 | Range* range, | 469 | Range* range, | |||||
| 470 | when_all_homogeneous_state<PayloadT>* state) | 470 | when_all_homogeneous_state<PayloadT>* state) | |||||
| HITCBC | 471 | 14 | : range_(range) | 471 | 14 | : range_(range) | ||
| HITCBC | 472 | 14 | , state_(state) | 472 | 14 | , state_(state) | ||
| 473 | { | 473 | { | |||||
| HITCBC | 474 | 14 | } | 474 | 14 | } | ||
| 475 | 475 | |||||||
| HITCBC | 476 | 14 | bool await_ready() const noexcept | 476 | 14 | bool await_ready() const noexcept | ||
| 477 | { | 477 | { | |||||
| HITCBC | 478 | 14 | return std::ranges::empty(*range_); | 478 | 14 | return std::ranges::empty(*range_); | ||
| 479 | } | 479 | } | |||||
| 480 | 480 | |||||||
| HITCBC | 481 | 14 | std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) | 481 | 14 | std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) | ||
| 482 | { | 482 | { | |||||
| HITCBC | 483 | 14 | state_->core_.continuation_.h = continuation; | 483 | 14 | state_->core_.continuation_.h = continuation; | ||
| HITCBC | 484 | 14 | state_->core_.caller_env_ = caller_env; | 484 | 14 | state_->core_.caller_env_ = caller_env; | ||
| 485 | 485 | |||||||
| HITCBC | 486 | 14 | if(caller_env->stop_token.stop_possible()) | 486 | 14 | if(caller_env->stop_token.stop_possible()) | ||
| 487 | { | 487 | { | |||||
| HITCBC | 488 | 2 | state_->core_.parent_stop_callback_.emplace( | 488 | 2 | state_->core_.parent_stop_callback_.emplace( | ||
| HITCBC | 489 | 1 | caller_env->stop_token, | 489 | 1 | caller_env->stop_token, | ||
| HITCBC | 490 | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); | 490 | 1 | when_all_core::stop_callback_fn{&state_->core_.stop_source_}); | ||
| 491 | 491 | |||||||
| HITCBC | 492 | 1 | if(caller_env->stop_token.stop_requested()) | 492 | 1 | if(caller_env->stop_token.stop_requested()) | ||
| MISUBC | 493 | ✗ | state_->core_.stop_source_.request_stop(); | 493 | ✗ | state_->core_.stop_source_.request_stop(); | ||
| 494 | } | 494 | } | |||||
| 495 | 495 | |||||||
| HITCBC | 496 | 14 | auto token = state_->core_.stop_source_.get_token(); | 496 | 14 | auto token = state_->core_.stop_source_.get_token(); | ||
| 497 | 497 | |||||||
| 498 | // Phase 1: Create all runners without dispatching. | 498 | // Phase 1: Create all runners without dispatching. | |||||
| HITCBC | 499 | 14 | std::size_t index = 0; | 499 | 14 | std::size_t index = 0; | ||
| HITCBC | 500 | 46 | for(auto&& a : *range_) | 500 | 46 | for(auto&& a : *range_) | ||
| 501 | { | 501 | { | |||||
| HITCBC | 502 | 32 | auto runner = make_when_all_homogeneous_runner( | 502 | 32 | auto runner = make_when_all_homogeneous_runner( | ||
| HITCBC | 503 | 32 | std::move(a), state_, index); | 503 | 32 | std::move(a), state_, index); | ||
| 504 | 504 | |||||||
| HITCBC | 505 | 32 | auto h = runner.release(); | 505 | 32 | auto h = runner.release(); | ||
| HITCBC | 506 | 32 | h.promise().state_ = state_; | 506 | 32 | h.promise().state_ = state_; | ||
| HITCBC | 507 | 32 | h.promise().index_ = index; | 507 | 32 | h.promise().index_ = index; | ||
| HITCBC | 508 | 32 | h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; | 508 | 32 | h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; | ||
| 509 | 509 | |||||||
| HITCBC | 510 | 32 | state_->runner_handles_[index].h = std::coroutine_handle<>{h}; | 510 | 32 | state_->runner_handles_[index].h = std::coroutine_handle<>{h}; | ||
| HITCBC | 511 | 32 | ++index; | 511 | 32 | ++index; | ||
| 512 | } | 512 | } | |||||
| 513 | 513 | |||||||
| 514 | // Phase 2: Post all runners. Any may complete synchronously. | 514 | // Phase 2: Post all runners. Any may complete synchronously. | |||||
| 515 | // After last post, state_ and this may be destroyed. | 515 | // After last post, state_ and this may be destroyed. | |||||
| HITCBC | 516 | 14 | auto* handles = state_->runner_handles_.get(); | 516 | 14 | auto* handles = state_->runner_handles_.get(); | ||
| HITCBC | 517 | 14 | std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); | 517 | 14 | std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); | ||
| HITCBC | 518 | 46 | for(std::size_t i = 0; i < count; ++i) | 518 | 46 | for(std::size_t i = 0; i < count; ++i) | ||
| HITCBC | 519 | 32 | caller_env->executor.post(handles[i]); | 519 | 32 | caller_env->executor.post(handles[i]); | ||
| 520 | 520 | |||||||
| HITCBC | 521 | 28 | return std::noop_coroutine(); | 521 | 28 | return std::noop_coroutine(); | ||
| HITCBC | 522 | 46 | } | 522 | 46 | } | ||
| 523 | 523 | |||||||
| HITCBC | 524 | 14 | void await_resume() const noexcept | 524 | 14 | void await_resume() const noexcept | ||
| 525 | { | 525 | { | |||||
| HITCBC | 526 | 14 | } | 526 | 14 | } | ||
| 527 | }; | 527 | }; | |||||
| 528 | 528 | |||||||
| 529 | } // namespace detail | 529 | } // namespace detail | |||||
| 530 | 530 | |||||||
| 531 | /** Execute a range of io_result-returning awaitables concurrently. | 531 | /** Execute a range of io_result-returning awaitables concurrently. | |||||
| 532 | 532 | |||||||
| 533 | Launches all awaitables simultaneously and waits for all to complete. | 533 | Launches all awaitables simultaneously and waits for all to complete. | |||||
| 534 | On success, extracted payloads are collected in a vector preserving | 534 | On success, extracted payloads are collected in a vector preserving | |||||
| 535 | input order. The first error_code cancels siblings and is propagated | 535 | input order. The first error_code cancels siblings and is propagated | |||||
| 536 | in the outer io_result. Exceptions always beat error codes. | 536 | in the outer io_result. Exceptions always beat error codes. | |||||
| 537 | 537 | |||||||
| 538 | @li All child awaitables run concurrently on the caller's executor | 538 | @li All child awaitables run concurrently on the caller's executor | |||||
| 539 | @li Payloads are returned as a vector in input order | 539 | @li Payloads are returned as a vector in input order | |||||
| 540 | @li First error_code wins and cancels siblings | 540 | @li First error_code wins and cancels siblings | |||||
| 541 | @li Exception always beats error_code | 541 | @li Exception always beats error_code | |||||
| 542 | @li Completes only after all children have finished | 542 | @li Completes only after all children have finished | |||||
| 543 | 543 | |||||||
| 544 | @par Thread Safety | 544 | @par Thread Safety | |||||
| 545 | The returned task must be awaited from a single execution context. | 545 | The returned task must be awaited from a single execution context. | |||||
| 546 | Child awaitables execute concurrently but complete through the caller's | 546 | Child awaitables execute concurrently but complete through the caller's | |||||
| 547 | executor. | 547 | executor. | |||||
| 548 | 548 | |||||||
| 549 | @param awaitables Range of io_result-returning awaitables to execute | 549 | @param awaitables Range of io_result-returning awaitables to execute | |||||
| 550 | concurrently (must not be empty). | 550 | concurrently (must not be empty). | |||||
| 551 | 551 | |||||||
| 552 | @return A task yielding io_result<vector<PayloadT>> where PayloadT | 552 | @return A task yielding io_result<vector<PayloadT>> where PayloadT | |||||
| 553 | is the payload extracted from each child's io_result. | 553 | is the payload extracted from each child's io_result. | |||||
| 554 | 554 | |||||||
| 555 | @throws std::invalid_argument if range is empty (thrown before | 555 | @throws std::invalid_argument if range is empty (thrown before | |||||
| 556 | coroutine suspends). | 556 | coroutine suspends). | |||||
| 557 | @throws Rethrows the first child exception after all children | 557 | @throws Rethrows the first child exception after all children | |||||
| 558 | complete (exception beats error_code). | 558 | complete (exception beats error_code). | |||||
| 559 | 559 | |||||||
| 560 | @par Example | 560 | @par Example | |||||
| 561 | @code | 561 | @code | |||||
| 562 | task<void> example() | 562 | task<void> example() | |||||
| 563 | { | 563 | { | |||||
| 564 | std::vector<io_task<size_t>> reads; | 564 | std::vector<io_task<size_t>> reads; | |||||
| 565 | for (auto& buf : buffers) | 565 | for (auto& buf : buffers) | |||||
| 566 | reads.push_back(stream.read_some(buf)); | 566 | reads.push_back(stream.read_some(buf)); | |||||
| 567 | 567 | |||||||
| 568 | auto [ec, counts] = co_await when_all(std::move(reads)); | 568 | auto [ec, counts] = co_await when_all(std::move(reads)); | |||||
| 569 | if (ec) { // handle error | 569 | if (ec) { // handle error | |||||
| 570 | } | 570 | } | |||||
| 571 | } | 571 | } | |||||
| 572 | @endcode | 572 | @endcode | |||||
| 573 | 573 | |||||||
| 574 | @see IoAwaitableRange, when_all | 574 | @see IoAwaitableRange, when_all | |||||
| 575 | */ | 575 | */ | |||||
| 576 | template<IoAwaitableRange R> | 576 | template<IoAwaitableRange R> | |||||
| 577 | requires detail::is_io_result_v< | 577 | requires detail::is_io_result_v< | |||||
| 578 | awaitable_result_t<std::ranges::range_value_t<R>>> | 578 | awaitable_result_t<std::ranges::range_value_t<R>>> | |||||
| 579 | && (!std::is_same_v< | 579 | && (!std::is_same_v< | |||||
| 580 | detail::io_result_payload_t< | 580 | detail::io_result_payload_t< | |||||
| 581 | awaitable_result_t<std::ranges::range_value_t<R>>>, | 581 | awaitable_result_t<std::ranges::range_value_t<R>>>, | |||||
| 582 | std::tuple<>>) | 582 | std::tuple<>>) | |||||
| HITCBC | 583 | 12 | [[nodiscard]] auto when_all(R&& awaitables) | 583 | 12 | [[nodiscard]] auto when_all(R&& awaitables) | ||
| 584 | -> task<io_result<std::vector< | 584 | -> task<io_result<std::vector< | |||||
| 585 | detail::io_result_payload_t< | 585 | detail::io_result_payload_t< | |||||
| 586 | awaitable_result_t<std::ranges::range_value_t<R>>>>>> | 586 | awaitable_result_t<std::ranges::range_value_t<R>>>>>> | |||||
| 587 | { | 587 | { | |||||
| 588 | using Awaitable = std::ranges::range_value_t<R>; | 588 | using Awaitable = std::ranges::range_value_t<R>; | |||||
| 589 | using PayloadT = detail::io_result_payload_t< | 589 | using PayloadT = detail::io_result_payload_t< | |||||
| 590 | awaitable_result_t<Awaitable>>; | 590 | awaitable_result_t<Awaitable>>; | |||||
| 591 | using OwnedRange = std::remove_cvref_t<R>; | 591 | using OwnedRange = std::remove_cvref_t<R>; | |||||
| 592 | 592 | |||||||
| 593 | auto count = std::ranges::size(awaitables); | 593 | auto count = std::ranges::size(awaitables); | |||||
| 594 | if(count == 0) | 594 | if(count == 0) | |||||
| 595 | throw std::invalid_argument("when_all requires at least one awaitable"); | 595 | throw std::invalid_argument("when_all requires at least one awaitable"); | |||||
| 596 | 596 | |||||||
| 597 | OwnedRange owned_awaitables = std::forward<R>(awaitables); | 597 | OwnedRange owned_awaitables = std::forward<R>(awaitables); | |||||
| 598 | 598 | |||||||
| 599 | detail::when_all_homogeneous_state<PayloadT> state(count); | 599 | detail::when_all_homogeneous_state<PayloadT> state(count); | |||||
| 600 | 600 | |||||||
| 601 | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | 601 | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | |||||
| 602 | &owned_awaitables, &state); | 602 | &owned_awaitables, &state); | |||||
| 603 | 603 | |||||||
| 604 | if(state.core_.first_exception_) | 604 | if(state.core_.first_exception_) | |||||
| 605 | std::rethrow_exception(state.core_.first_exception_); | 605 | std::rethrow_exception(state.core_.first_exception_); | |||||
| 606 | 606 | |||||||
| 607 | if(state.has_error_.load(std::memory_order_relaxed)) | 607 | if(state.has_error_.load(std::memory_order_relaxed)) | |||||
| 608 | co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; | 608 | co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; | |||||
| 609 | 609 | |||||||
| 610 | std::vector<PayloadT> results; | 610 | std::vector<PayloadT> results; | |||||
| 611 | results.reserve(count); | 611 | results.reserve(count); | |||||
| 612 | for(auto& opt : state.results_) | 612 | for(auto& opt : state.results_) | |||||
| 613 | results.push_back(std::move(*opt)); | 613 | results.push_back(std::move(*opt)); | |||||
| 614 | 614 | |||||||
| 615 | co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; | 615 | co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; | |||||
| HITCBC | 616 | 24 | } | 616 | 24 | } | ||
| 617 | 617 | |||||||
| 618 | /** Execute a range of void io_result-returning awaitables concurrently. | 618 | /** Execute a range of void io_result-returning awaitables concurrently. | |||||
| 619 | 619 | |||||||
| 620 | Launches all awaitables simultaneously and waits for all to complete. | 620 | Launches all awaitables simultaneously and waits for all to complete. | |||||
| 621 | Since all awaitables return io_result<>, no payload values are | 621 | Since all awaitables return io_result<>, no payload values are | |||||
| 622 | collected. The first error_code cancels siblings and is propagated. | 622 | collected. The first error_code cancels siblings and is propagated. | |||||
| 623 | Exceptions always beat error codes. | 623 | Exceptions always beat error codes. | |||||
| 624 | 624 | |||||||
| 625 | @param awaitables Range of io_result<>-returning awaitables to | 625 | @param awaitables Range of io_result<>-returning awaitables to | |||||
| 626 | execute concurrently (must not be empty). | 626 | execute concurrently (must not be empty). | |||||
| 627 | 627 | |||||||
| 628 | @return A task yielding io_result<> whose ec is the first child | 628 | @return A task yielding io_result<> whose ec is the first child | |||||
| 629 | error, or default-constructed on success. | 629 | error, or default-constructed on success. | |||||
| 630 | 630 | |||||||
| 631 | @throws std::invalid_argument if range is empty. | 631 | @throws std::invalid_argument if range is empty. | |||||
| 632 | @throws Rethrows the first child exception after all children | 632 | @throws Rethrows the first child exception after all children | |||||
| 633 | complete (exception beats error_code). | 633 | complete (exception beats error_code). | |||||
| 634 | 634 | |||||||
| 635 | @par Example | 635 | @par Example | |||||
| 636 | @code | 636 | @code | |||||
| 637 | task<void> example() | 637 | task<void> example() | |||||
| 638 | { | 638 | { | |||||
| 639 | std::vector<io_task<>> jobs; | 639 | std::vector<io_task<>> jobs; | |||||
| 640 | for (int i = 0; i < n; ++i) | 640 | for (int i = 0; i < n; ++i) | |||||
| 641 | jobs.push_back(process(i)); | 641 | jobs.push_back(process(i)); | |||||
| 642 | 642 | |||||||
| 643 | auto [ec] = co_await when_all(std::move(jobs)); | 643 | auto [ec] = co_await when_all(std::move(jobs)); | |||||
| 644 | } | 644 | } | |||||
| 645 | @endcode | 645 | @endcode | |||||
| 646 | 646 | |||||||
| 647 | @see IoAwaitableRange, when_all | 647 | @see IoAwaitableRange, when_all | |||||
| 648 | */ | 648 | */ | |||||
| 649 | template<IoAwaitableRange R> | 649 | template<IoAwaitableRange R> | |||||
| 650 | requires detail::is_io_result_v< | 650 | requires detail::is_io_result_v< | |||||
| 651 | awaitable_result_t<std::ranges::range_value_t<R>>> | 651 | awaitable_result_t<std::ranges::range_value_t<R>>> | |||||
| 652 | && std::is_same_v< | 652 | && std::is_same_v< | |||||
| 653 | detail::io_result_payload_t< | 653 | detail::io_result_payload_t< | |||||
| 654 | awaitable_result_t<std::ranges::range_value_t<R>>>, | 654 | awaitable_result_t<std::ranges::range_value_t<R>>>, | |||||
| 655 | std::tuple<>> | 655 | std::tuple<>> | |||||
| HITCBC | 656 | 4 | [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> | 656 | 4 | [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> | ||
| 657 | { | 657 | { | |||||
| 658 | using OwnedRange = std::remove_cvref_t<R>; | 658 | using OwnedRange = std::remove_cvref_t<R>; | |||||
| 659 | 659 | |||||||
| 660 | auto count = std::ranges::size(awaitables); | 660 | auto count = std::ranges::size(awaitables); | |||||
| 661 | if(count == 0) | 661 | if(count == 0) | |||||
| 662 | throw std::invalid_argument("when_all requires at least one awaitable"); | 662 | throw std::invalid_argument("when_all requires at least one awaitable"); | |||||
| 663 | 663 | |||||||
| 664 | OwnedRange owned_awaitables = std::forward<R>(awaitables); | 664 | OwnedRange owned_awaitables = std::forward<R>(awaitables); | |||||
| 665 | 665 | |||||||
| 666 | detail::when_all_homogeneous_state<std::tuple<>> state(count); | 666 | detail::when_all_homogeneous_state<std::tuple<>> state(count); | |||||
| 667 | 667 | |||||||
| 668 | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | 668 | co_await detail::when_all_homogeneous_launcher<OwnedRange>( | |||||
| 669 | &owned_awaitables, &state); | 669 | &owned_awaitables, &state); | |||||
| 670 | 670 | |||||||
| 671 | if(state.core_.first_exception_) | 671 | if(state.core_.first_exception_) | |||||
| 672 | std::rethrow_exception(state.core_.first_exception_); | 672 | std::rethrow_exception(state.core_.first_exception_); | |||||
| 673 | 673 | |||||||
| 674 | if(state.has_error_.load(std::memory_order_relaxed)) | 674 | if(state.has_error_.load(std::memory_order_relaxed)) | |||||
| 675 | co_return io_result<>{state.first_error_}; | 675 | co_return io_result<>{state.first_error_}; | |||||
| 676 | 676 | |||||||
| 677 | co_return io_result<>{}; | 677 | co_return io_result<>{}; | |||||
| HITCBC | 678 | 8 | } | 678 | 8 | } | ||
| 679 | 679 | |||||||
| 680 | /** Execute io_result-returning awaitables concurrently, inspecting error codes. | 680 | /** Execute io_result-returning awaitables concurrently, inspecting error codes. | |||||
| 681 | 681 | |||||||
| 682 | Overload selected when all children return io_result<Ts...>. | 682 | Overload selected when all children return io_result<Ts...>. | |||||
| 683 | The error_code is lifted out of each child into a single outer | 683 | The error_code is lifted out of each child into a single outer | |||||
| 684 | io_result. On success all values are returned; on failure the | 684 | io_result. On success all values are returned; on failure the | |||||
| 685 | first error_code wins. | 685 | first error_code wins. | |||||
| 686 | 686 | |||||||
| 687 | @par Exception Safety | 687 | @par Exception Safety | |||||
| 688 | Exception always beats error_code. If any child throws, the | 688 | Exception always beats error_code. If any child throws, the | |||||
| 689 | exception is rethrown regardless of error_code results. | 689 | exception is rethrown regardless of error_code results. | |||||
| 690 | 690 | |||||||
| 691 | @param awaitables One or more awaitables each returning | 691 | @param awaitables One or more awaitables each returning | |||||
| 692 | io_result<Ts...>. | 692 | io_result<Ts...>. | |||||
| 693 | 693 | |||||||
| 694 | @return A task yielding io_result<R1, R2, ..., Rn> where each Ri | 694 | @return A task yielding io_result<R1, R2, ..., Rn> where each Ri | |||||
| 695 | follows the payload flattening rules. | 695 | follows the payload flattening rules. | |||||
| 696 | */ | 696 | */ | |||||
| 697 | template<IoAwaitable... As> | 697 | template<IoAwaitable... As> | |||||
| 698 | requires (sizeof...(As) > 0) | 698 | requires (sizeof...(As) > 0) | |||||
| 699 | && detail::all_io_result_awaitables<As...> | 699 | && detail::all_io_result_awaitables<As...> | |||||
| HITCBC | 700 | 50 | [[nodiscard]] auto when_all(As... awaitables) | 700 | 50 | [[nodiscard]] auto when_all(As... awaitables) | ||
| 701 | -> task<io_result< | 701 | -> task<io_result< | |||||
| 702 | detail::io_result_payload_t<awaitable_result_t<As>>...>> | 702 | detail::io_result_payload_t<awaitable_result_t<As>>...>> | |||||
| 703 | { | 703 | { | |||||
| 704 | using result_type = io_result< | 704 | using result_type = io_result< | |||||
| 705 | detail::io_result_payload_t<awaitable_result_t<As>>...>; | 705 | detail::io_result_payload_t<awaitable_result_t<As>>...>; | |||||
| 706 | 706 | |||||||
| 707 | detail::when_all_state<awaitable_result_t<As>...> state; | 707 | detail::when_all_state<awaitable_result_t<As>...> state; | |||||
| 708 | std::tuple<As...> awaitable_tuple(std::move(awaitables)...); | 708 | std::tuple<As...> awaitable_tuple(std::move(awaitables)...); | |||||
| 709 | 709 | |||||||
| 710 | co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); | 710 | co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); | |||||
| 711 | 711 | |||||||
| 712 | // Exception always wins over error_code | 712 | // Exception always wins over error_code | |||||
| 713 | if(state.core_.first_exception_) | 713 | if(state.core_.first_exception_) | |||||
| 714 | std::rethrow_exception(state.core_.first_exception_); | 714 | std::rethrow_exception(state.core_.first_exception_); | |||||
| 715 | 715 | |||||||
| 716 | auto r = detail::build_when_all_io_result<result_type>( | 716 | auto r = detail::build_when_all_io_result<result_type>( | |||||
| 717 | detail::extract_results(state)); | 717 | detail::extract_results(state)); | |||||
| 718 | if(state.has_error_.load(std::memory_order_relaxed)) | 718 | if(state.has_error_.load(std::memory_order_relaxed)) | |||||
| 719 | r.ec = state.first_error_; | 719 | r.ec = state.first_error_; | |||||
| 720 | co_return r; | 720 | co_return r; | |||||
| HITCBC | 721 | 100 | } | 721 | 100 | } | ||
| 722 | 722 | |||||||
| 723 | } // namespace capy | 723 | } // namespace capy | |||||
| 724 | } // namespace boost | 724 | } // namespace boost | |||||
| 725 | 725 | |||||||
| 726 | #endif | 726 | #endif | |||||