TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
12 : #define BOOST_CAPY_WHEN_ANY_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/detail/io_result_combinators.hpp>
16 : #include <boost/capy/continuation.hpp>
17 : #include <boost/capy/concept/executor.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
22 : #include <boost/capy/ex/frame_allocator.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <array>
27 : #include <atomic>
28 : #include <exception>
29 : #include <memory>
30 : #include <mutex>
31 : #include <optional>
32 : #include <ranges>
33 : #include <stdexcept>
34 : #include <stop_token>
35 : #include <tuple>
36 : #include <type_traits>
37 : #include <utility>
38 : #include <variant>
39 : #include <vector>
40 :
41 : /*
42 : when_any - Race multiple io_result tasks, select first success
43 : =============================================================
44 :
45 : OVERVIEW:
46 : ---------
47 : when_any launches N io_result-returning tasks concurrently. A task
48 : wins by returning !ec; errors and exceptions do not win. Once a
49 : winner is found, stop is requested for siblings and the winner's
50 : payload is returned. If no winner exists (all fail), one of the
51 : failures is surfaced (an error_code at variant index 0, or a child's
52 : exception rethrown); which one is unspecified.
53 :
54 : ARCHITECTURE:
55 : -------------
56 : The design mirrors when_all but with inverted completion semantics:
57 :
58 : when_all: complete when remaining_count reaches 0 (all done)
59 : when_any: complete when has_winner becomes true (first done)
60 : BUT still wait for remaining_count to reach 0 for cleanup
61 :
62 : Key components:
63 : - when_any_core: Shared state tracking winner and completion
64 : - when_any_io_runner: Wrapper coroutine for each child task
65 : - when_any_io_launcher/when_any_io_homogeneous_launcher:
66 : Awaitables that start all runners concurrently
67 :
68 : CRITICAL INVARIANTS:
69 : --------------------
70 : 1. Only a task returning !ec can become the winner (via atomic CAS)
71 : 2. All tasks must complete before parent resumes (cleanup safety)
72 : 3. Stop is requested immediately when winner is determined
73 : 4. Exceptions and errors do not claim winner status
74 :
75 : POSITIONAL VARIANT:
76 : -------------------
77 : The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
78 : Index 0 is error_code (failure/no-winner). Index 1..N identifies the
79 : winning child and carries its payload.
80 :
81 : RANGE OVERLOAD:
82 : ---------------
83 : The range overload returns variant<error_code, pair<size_t, T>> for
84 : non-void children or variant<error_code, size_t> for void children.
85 :
86 : MEMORY MODEL:
87 : -------------
88 : Synchronization chain from winner's write to parent's read:
89 :
90 : 1. Winner thread writes result_ (non-atomic)
91 : 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
92 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
93 : -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
94 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
95 : 5. Parent coroutine resumes and reads result_
96 :
97 : Synchronization analysis:
98 : - All fetch_sub operations on remaining_count_ form a release sequence
99 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
100 : in the modification order of remaining_count_
101 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
102 : modification order, establishing happens-before from winner's writes
103 : - Executor dispatch() is expected to provide queue-based synchronization
104 : (release-on-post, acquire-on-execute) completing the chain to parent
105 : - Even inline executors work (same thread = sequenced-before)
106 :
107 : EXCEPTION SEMANTICS:
108 : --------------------
109 : Exceptions do NOT claim winner status. If a child throws, the exception
110 : is recorded but the combinator keeps waiting for a success. Only when
111 : all children complete without a winner is a failure surfaced. There is
112 : no priority between errors and exceptions, and no guarantee about which
113 : child's failure is reported: the result either returns an error_code at
114 : variant index 0 or rethrows a child's exception.
115 : */
116 :
117 : namespace boost {
118 : namespace capy {
119 :
120 : namespace detail {
121 :
122 : /** Core shared state for when_any operations.
123 :
124 : Contains all members and methods common to both heterogeneous (variadic)
125 : and homogeneous (range) when_any implementations. State classes embed
126 : this via composition to avoid CRTP destructor ordering issues.
127 :
128 : @par Thread Safety
129 : Atomic operations protect winner selection and completion count.
130 : */
131 : struct when_any_core
132 : {
133 : std::atomic<std::size_t> remaining_count_;
134 : std::size_t winner_index_{0};
135 : std::exception_ptr winner_exception_;
136 : std::stop_source stop_source_;
137 :
138 : // Bridges parent's stop token to our stop_source
139 : struct stop_callback_fn
140 : {
141 : std::stop_source* source_;
142 HIT 3 : void operator()() const noexcept { source_->request_stop(); }
143 : };
144 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
145 : std::optional<stop_callback_t> parent_stop_callback_;
146 :
147 : continuation continuation_;
148 : io_env const* caller_env_ = nullptr;
149 :
150 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
151 : std::atomic<bool> has_winner_{false};
152 :
153 34 : explicit when_any_core(std::size_t count) noexcept
154 34 : : remaining_count_(count)
155 : {
156 34 : }
157 :
158 : /** Atomically claim winner status; exactly one task succeeds. */
159 53 : bool try_win(std::size_t index) noexcept
160 : {
161 53 : bool expected = false;
162 53 : if(has_winner_.compare_exchange_strong(
163 : expected, true, std::memory_order_acq_rel))
164 : {
165 23 : winner_index_ = index;
166 23 : stop_source_.request_stop();
167 23 : return true;
168 : }
169 30 : return false;
170 : }
171 :
172 : /** @pre try_win() returned true. */
173 1 : void set_winner_exception(std::exception_ptr ep) noexcept
174 : {
175 1 : winner_exception_ = ep;
176 1 : }
177 :
178 : // Runners signal completion directly via final_suspend; no member function needed.
179 : };
180 :
181 : } // namespace detail
182 :
183 : namespace detail {
184 :
185 : // State for io_result-aware when_any: only !ec wins.
186 : template<typename... Ts>
187 : struct when_any_io_state
188 : {
189 : static constexpr std::size_t task_count = sizeof...(Ts);
190 : using variant_type = std::variant<std::error_code, Ts...>;
191 :
192 : when_any_core core_;
193 : std::optional<variant_type> result_;
194 : std::array<continuation, task_count> runner_handles_{};
195 :
196 : // A failure (error or exception) for the all-fail case. record_error
197 : // and record_exception overwrite each other, so which one survives is
198 : // unspecified (no priority between errors and exceptions).
199 : std::mutex failure_mu_;
200 : std::error_code last_error_;
201 : std::exception_ptr last_exception_;
202 :
203 18 : when_any_io_state()
204 18 : : core_(task_count)
205 : {
206 18 : }
207 :
208 14 : void record_error(std::error_code ec)
209 : {
210 14 : std::lock_guard lk(failure_mu_);
211 14 : last_error_ = ec;
212 14 : last_exception_ = nullptr;
213 14 : }
214 :
215 7 : void record_exception(std::exception_ptr ep)
216 : {
217 7 : std::lock_guard lk(failure_mu_);
218 7 : last_exception_ = ep;
219 7 : last_error_ = {};
220 7 : }
221 : };
222 :
223 : // Wrapper coroutine for io_result-aware when_any children.
224 : // unhandled_exception records the exception but does NOT claim winner status.
225 : template<typename StateType>
226 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
227 : {
228 : struct promise_type
229 : : frame_alloc_mixin
230 : {
231 : StateType* state_ = nullptr;
232 : std::size_t index_ = 0;
233 : io_env env_;
234 :
235 87 : when_any_io_runner get_return_object() noexcept
236 : {
237 : return when_any_io_runner(
238 87 : std::coroutine_handle<promise_type>::from_promise(*this));
239 : }
240 :
241 87 : std::suspend_always initial_suspend() noexcept { return {}; }
242 :
243 87 : auto final_suspend() noexcept
244 : {
245 : struct awaiter
246 : {
247 : promise_type* p_;
248 87 : bool await_ready() const noexcept { return false; }
249 87 : auto await_suspend(std::coroutine_handle<> h) noexcept
250 : {
251 87 : auto& core = p_->state_->core_;
252 87 : auto* counter = &core.remaining_count_;
253 87 : auto* caller_env = core.caller_env_;
254 87 : auto& cont = core.continuation_;
255 :
256 87 : h.destroy();
257 :
258 87 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
259 87 : if(remaining == 1)
260 34 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
261 53 : return detail::symmetric_transfer(std::noop_coroutine());
262 : }
263 : void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
264 : };
265 87 : return awaiter{this};
266 : }
267 :
268 74 : void return_void() noexcept {}
269 :
270 : // Exceptions do NOT win in io_result when_any
271 13 : void unhandled_exception() noexcept
272 : {
273 13 : state_->record_exception(std::current_exception());
274 13 : }
275 :
276 : template<class Awaitable>
277 : struct transform_awaiter
278 : {
279 : std::decay_t<Awaitable> a_;
280 : promise_type* p_;
281 :
282 87 : bool await_ready() { return a_.await_ready(); }
283 87 : decltype(auto) await_resume() { return a_.await_resume(); }
284 :
285 : template<class Promise>
286 86 : auto await_suspend(std::coroutine_handle<Promise> h)
287 : {
288 : using R = decltype(a_.await_suspend(h, &p_->env_));
289 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
290 86 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
291 : else
292 : return a_.await_suspend(h, &p_->env_);
293 : }
294 : };
295 :
296 : template<class Awaitable>
297 87 : auto await_transform(Awaitable&& a)
298 : {
299 : using A = std::decay_t<Awaitable>;
300 : if constexpr (IoAwaitable<A>)
301 : {
302 : return transform_awaiter<Awaitable>{
303 172 : std::forward<Awaitable>(a), this};
304 : }
305 : else
306 : {
307 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
308 : }
309 85 : }
310 : };
311 :
312 : std::coroutine_handle<promise_type> h_;
313 :
314 87 : explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
315 87 : : h_(h)
316 : {
317 87 : }
318 :
319 : when_any_io_runner(when_any_io_runner&& other) noexcept
320 : : h_(std::exchange(other.h_, nullptr))
321 : {
322 : }
323 :
324 : when_any_io_runner(when_any_io_runner const&) = delete;
325 : when_any_io_runner& operator=(when_any_io_runner const&) = delete;
326 : when_any_io_runner& operator=(when_any_io_runner&&) = delete;
327 :
328 87 : auto release() noexcept
329 : {
330 87 : return std::exchange(h_, nullptr);
331 : }
332 : };
333 :
334 : // Runner coroutine: only tries to win when the child returns !ec.
335 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
336 : when_any_io_runner<StateType>
337 33 : make_when_any_io_runner(Awaitable inner, StateType* state)
338 : {
339 : auto result = co_await std::move(inner);
340 :
341 : if(!result.ec)
342 : {
343 : // Success: try to claim winner
344 : if(state->core_.try_win(I))
345 : {
346 : try
347 : {
348 : state->result_.emplace(
349 : std::in_place_index<I + 1>,
350 : detail::extract_io_payload(std::move(result)));
351 : }
352 : catch(...)
353 : {
354 : state->core_.set_winner_exception(std::current_exception());
355 : }
356 : }
357 : }
358 : else
359 : {
360 : // Error: record but don't win
361 : state->record_error(result.ec);
362 : }
363 66 : }
364 :
365 : // Launcher for io_result-aware when_any.
366 : template<IoAwaitable... Awaitables>
367 : class when_any_io_launcher
368 : {
369 : using state_type = when_any_io_state<
370 : io_result_payload_t<awaitable_result_t<Awaitables>>...>;
371 :
372 : std::tuple<Awaitables...>* tasks_;
373 : state_type* state_;
374 :
375 : public:
376 18 : when_any_io_launcher(
377 : std::tuple<Awaitables...>* tasks,
378 : state_type* state)
379 18 : : tasks_(tasks)
380 18 : , state_(state)
381 : {
382 18 : }
383 :
384 18 : bool await_ready() const noexcept
385 : {
386 18 : return sizeof...(Awaitables) == 0;
387 : }
388 :
389 18 : std::coroutine_handle<> await_suspend(
390 : std::coroutine_handle<> continuation, io_env const* caller_env)
391 : {
392 18 : state_->core_.continuation_.h = continuation;
393 18 : state_->core_.caller_env_ = caller_env;
394 :
395 18 : if(caller_env->stop_token.stop_possible())
396 : {
397 4 : state_->core_.parent_stop_callback_.emplace(
398 2 : caller_env->stop_token,
399 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
400 :
401 2 : if(caller_env->stop_token.stop_requested())
402 1 : state_->core_.stop_source_.request_stop();
403 : }
404 :
405 18 : auto token = state_->core_.stop_source_.get_token();
406 18 : launch_all(std::index_sequence_for<Awaitables...>{},
407 : caller_env->executor, token);
408 :
409 36 : return std::noop_coroutine();
410 18 : }
411 :
412 18 : void await_resume() const noexcept {}
413 :
414 : private:
415 : template<std::size_t... Is>
416 18 : void launch_all(std::index_sequence<Is...>,
417 : executor_ref ex, std::stop_token token)
418 : {
419 18 : (..., launch_one<Is>(ex, token));
420 18 : }
421 :
422 : template<std::size_t I>
423 33 : void launch_one(executor_ref caller_ex, std::stop_token token)
424 : {
425 33 : auto runner = make_when_any_io_runner<I>(
426 33 : std::move(std::get<I>(*tasks_)), state_);
427 :
428 33 : auto h = runner.release();
429 33 : h.promise().state_ = state_;
430 33 : h.promise().index_ = I;
431 33 : h.promise().env_ = io_env{caller_ex, token,
432 33 : state_->core_.caller_env_->frame_allocator};
433 :
434 33 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
435 33 : caller_ex.post(state_->runner_handles_[I]);
436 66 : }
437 : };
438 :
439 : /** Shared state for homogeneous io_result-aware when_any (range overload).
440 :
441 : @tparam T The payload type extracted from io_result.
442 : */
443 : template<typename T>
444 : struct when_any_io_homogeneous_state
445 : {
446 : when_any_core core_;
447 : std::optional<T> result_;
448 : std::unique_ptr<continuation[]> runner_handles_;
449 :
450 : std::mutex failure_mu_;
451 : std::error_code last_error_;
452 : std::exception_ptr last_exception_;
453 :
454 13 : explicit when_any_io_homogeneous_state(std::size_t count)
455 13 : : core_(count)
456 13 : , runner_handles_(std::make_unique<continuation[]>(count))
457 : {
458 13 : }
459 :
460 6 : void record_error(std::error_code ec)
461 : {
462 6 : std::lock_guard lk(failure_mu_);
463 6 : last_error_ = ec;
464 6 : last_exception_ = nullptr;
465 6 : }
466 :
467 4 : void record_exception(std::exception_ptr ep)
468 : {
469 4 : std::lock_guard lk(failure_mu_);
470 4 : last_exception_ = ep;
471 4 : last_error_ = {};
472 4 : }
473 : };
474 :
475 : /** Specialization for void io_result children (no payload storage). */
476 : template<>
477 : struct when_any_io_homogeneous_state<std::tuple<>>
478 : {
479 : when_any_core core_;
480 : std::unique_ptr<continuation[]> runner_handles_;
481 :
482 : std::mutex failure_mu_;
483 : std::error_code last_error_;
484 : std::exception_ptr last_exception_;
485 :
486 3 : explicit when_any_io_homogeneous_state(std::size_t count)
487 3 : : core_(count)
488 3 : , runner_handles_(std::make_unique<continuation[]>(count))
489 : {
490 3 : }
491 :
492 1 : void record_error(std::error_code ec)
493 : {
494 1 : std::lock_guard lk(failure_mu_);
495 1 : last_error_ = ec;
496 1 : last_exception_ = nullptr;
497 1 : }
498 :
499 2 : void record_exception(std::exception_ptr ep)
500 : {
501 2 : std::lock_guard lk(failure_mu_);
502 2 : last_exception_ = ep;
503 2 : last_error_ = {};
504 2 : }
505 : };
506 :
507 : /** Create an io_result-aware runner for homogeneous when_any (range path).
508 :
509 : Only tries to win when the child returns !ec.
510 : */
511 : template<IoAwaitable Awaitable, typename StateType>
512 : when_any_io_runner<StateType>
513 54 : make_when_any_io_homogeneous_runner(
514 : Awaitable inner, StateType* state, std::size_t index)
515 : {
516 : auto result = co_await std::move(inner);
517 :
518 : if(!result.ec)
519 : {
520 : if(state->core_.try_win(index))
521 : {
522 : using PayloadT = io_result_payload_t<
523 : awaitable_result_t<Awaitable>>;
524 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
525 : {
526 : try
527 : {
528 : state->result_.emplace(
529 : extract_io_payload(std::move(result)));
530 : }
531 : catch(...)
532 : {
533 : state->core_.set_winner_exception(
534 : std::current_exception());
535 : }
536 : }
537 : }
538 : }
539 : else
540 : {
541 : state->record_error(result.ec);
542 : }
543 108 : }
544 :
545 : /** Launches all io_result-aware homogeneous runners concurrently. */
546 : template<IoAwaitableRange Range>
547 : class when_any_io_homogeneous_launcher
548 : {
549 : using Awaitable = std::ranges::range_value_t<Range>;
550 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
551 :
552 : Range* range_;
553 : when_any_io_homogeneous_state<PayloadT>* state_;
554 :
555 : public:
556 16 : when_any_io_homogeneous_launcher(
557 : Range* range,
558 : when_any_io_homogeneous_state<PayloadT>* state)
559 16 : : range_(range)
560 16 : , state_(state)
561 : {
562 16 : }
563 :
564 16 : bool await_ready() const noexcept
565 : {
566 16 : return std::ranges::empty(*range_);
567 : }
568 :
569 16 : std::coroutine_handle<> await_suspend(
570 : std::coroutine_handle<> continuation, io_env const* caller_env)
571 : {
572 16 : state_->core_.continuation_.h = continuation;
573 16 : state_->core_.caller_env_ = caller_env;
574 :
575 16 : if(caller_env->stop_token.stop_possible())
576 : {
577 4 : state_->core_.parent_stop_callback_.emplace(
578 2 : caller_env->stop_token,
579 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
580 :
581 2 : if(caller_env->stop_token.stop_requested())
582 1 : state_->core_.stop_source_.request_stop();
583 : }
584 :
585 16 : auto token = state_->core_.stop_source_.get_token();
586 :
587 : // Phase 1: Create all runners without dispatching.
588 16 : std::size_t index = 0;
589 70 : for(auto&& a : *range_)
590 : {
591 54 : auto runner = make_when_any_io_homogeneous_runner(
592 54 : std::move(a), state_, index);
593 :
594 54 : auto h = runner.release();
595 54 : h.promise().state_ = state_;
596 54 : h.promise().index_ = index;
597 54 : h.promise().env_ = io_env{caller_env->executor, token,
598 54 : caller_env->frame_allocator};
599 :
600 54 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
601 54 : ++index;
602 : }
603 :
604 : // Phase 2: Post all runners. Any may complete synchronously.
605 16 : auto* handles = state_->runner_handles_.get();
606 16 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
607 70 : for(std::size_t i = 0; i < count; ++i)
608 54 : caller_env->executor.post(handles[i]);
609 :
610 32 : return std::noop_coroutine();
611 70 : }
612 :
613 16 : void await_resume() const noexcept {}
614 : };
615 :
616 : } // namespace detail
617 :
618 : /** Race a range of io_result-returning awaitables (non-void payloads).
619 :
620 : Only a child returning !ec can win. Errors and exceptions do not
621 : claim winner status. If all children fail, an unspecified one of
622 : the failures is reported — either an error_code at variant index 0,
623 : or a child's exception rethrown.
624 :
625 : @param awaitables Range of io_result-returning awaitables (must
626 : not be empty).
627 :
628 : @return A task yielding variant<error_code, pair<size_t, PayloadT>>
629 : where index 0 is failure and index 1 carries the winner's
630 : index and payload.
631 :
632 : @throws std::invalid_argument if range is empty.
633 : @throws Rethrows the winner's exception if extracting or
634 : move-constructing the winning payload throws (a winner was
635 : found but its result could not be produced).
636 : @throws Rethrows a child's exception when all children fail and the
637 : reported failure is an exception (which child is unspecified).
638 :
639 : @par Example
640 : @code
641 : task<void> example()
642 : {
643 : std::vector<io_task<size_t>> reads;
644 : for (auto& buf : buffers)
645 : reads.push_back(stream.read_some(buf));
646 :
647 : auto result = co_await when_any(std::move(reads));
648 : if (result.index() == 1)
649 : {
650 : auto [idx, n] = std::get<1>(result);
651 : }
652 : }
653 : @endcode
654 :
655 : @see IoAwaitableRange, when_any
656 : */
657 : template<IoAwaitableRange R>
658 : requires detail::is_io_result_v<
659 : awaitable_result_t<std::ranges::range_value_t<R>>>
660 : && (!std::is_same_v<
661 : detail::io_result_payload_t<
662 : awaitable_result_t<std::ranges::range_value_t<R>>>,
663 : std::tuple<>>)
664 14 : [[nodiscard]] auto when_any(R&& awaitables)
665 : -> task<std::variant<std::error_code,
666 : std::pair<std::size_t,
667 : detail::io_result_payload_t<
668 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
669 : {
670 : using Awaitable = std::ranges::range_value_t<R>;
671 : using PayloadT = detail::io_result_payload_t<
672 : awaitable_result_t<Awaitable>>;
673 : using result_type = std::variant<std::error_code,
674 : std::pair<std::size_t, PayloadT>>;
675 : using OwnedRange = std::remove_cvref_t<R>;
676 :
677 : auto count = std::ranges::size(awaitables);
678 : if(count == 0)
679 : throw std::invalid_argument("when_any requires at least one awaitable");
680 :
681 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
682 :
683 : detail::when_any_io_homogeneous_state<PayloadT> state(count);
684 :
685 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
686 : &owned_awaitables, &state);
687 :
688 : // Winner found
689 : if(state.core_.has_winner_.load(std::memory_order_acquire))
690 : {
691 : if(state.core_.winner_exception_)
692 : std::rethrow_exception(state.core_.winner_exception_);
693 : co_return result_type{std::in_place_index<1>,
694 : std::pair{state.core_.winner_index_, std::move(*state.result_)}};
695 : }
696 :
697 : // No winner — report the recorded failure
698 : if(state.last_exception_)
699 : std::rethrow_exception(state.last_exception_);
700 : co_return result_type{std::in_place_index<0>, state.last_error_};
701 28 : }
702 :
703 : /** Race a range of void io_result-returning awaitables.
704 :
705 : Only a child returning !ec can win. Returns the winner's index
706 : at variant index 1, or error_code at index 0 on all-fail.
707 :
708 : @param awaitables Range of io_result<>-returning awaitables (must
709 : not be empty).
710 :
711 : @return A task yielding variant<error_code, size_t> where index 0
712 : is failure and index 1 carries the winner's index.
713 :
714 : @throws std::invalid_argument if range is empty.
715 : @throws Rethrows a child's exception when all children fail and the
716 : reported failure is an exception (which child is unspecified).
717 :
718 : @par Example
719 : @code
720 : task<void> example()
721 : {
722 : std::vector<io_task<>> jobs;
723 : jobs.push_back(background_work_a());
724 : jobs.push_back(background_work_b());
725 :
726 : auto result = co_await when_any(std::move(jobs));
727 : if (result.index() == 1)
728 : {
729 : auto winner = std::get<1>(result);
730 : }
731 : }
732 : @endcode
733 :
734 : @see IoAwaitableRange, when_any
735 : */
736 : template<IoAwaitableRange R>
737 : requires detail::is_io_result_v<
738 : awaitable_result_t<std::ranges::range_value_t<R>>>
739 : && std::is_same_v<
740 : detail::io_result_payload_t<
741 : awaitable_result_t<std::ranges::range_value_t<R>>>,
742 : std::tuple<>>
743 3 : [[nodiscard]] auto when_any(R&& awaitables)
744 : -> task<std::variant<std::error_code, std::size_t>>
745 : {
746 : using OwnedRange = std::remove_cvref_t<R>;
747 : using result_type = std::variant<std::error_code, std::size_t>;
748 :
749 : auto count = std::ranges::size(awaitables);
750 : if(count == 0)
751 : throw std::invalid_argument("when_any requires at least one awaitable");
752 :
753 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
754 :
755 : detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
756 :
757 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
758 : &owned_awaitables, &state);
759 :
760 : // Winner found
761 : if(state.core_.has_winner_.load(std::memory_order_acquire))
762 : {
763 : if(state.core_.winner_exception_)
764 : std::rethrow_exception(state.core_.winner_exception_);
765 : co_return result_type{std::in_place_index<1>,
766 : state.core_.winner_index_};
767 : }
768 :
769 : // No winner — report the recorded failure
770 : if(state.last_exception_)
771 : std::rethrow_exception(state.last_exception_);
772 : co_return result_type{std::in_place_index<0>, state.last_error_};
773 6 : }
774 :
775 : /** Race io_result-returning awaitables, selecting the first success.
776 :
777 : Overload selected when all children return io_result<Ts...>.
778 : Only a child returning !ec can win. Errors and exceptions do
779 : not claim winner status.
780 :
781 : @param as The awaitables to race. Each must satisfy @ref
782 : IoAwaitable and is consumed (moved-from) when `when_any`
783 : is awaited.
784 :
785 : @return A task yielding variant<error_code, R1, ..., Rn> where
786 : index 0 is the failure/no-winner case and index i+1
787 : identifies the winning child. On all-fail, index 0 holds
788 : an error_code from one of the failed children (unspecified
789 : which; no priority between errors and exceptions).
790 :
791 : @throws Rethrows the winner's exception if extracting or
792 : constructing the winning payload throws (a winner was found
793 : but its result could not be produced).
794 : @throws Rethrows a child's exception when all children fail and the
795 : reported failure is an exception (which child is unspecified).
796 :
797 : @note A failing child does not cancel its siblings; `when_any`
798 : waits for a success or for every child to finish. To make a
799 : benign error (e.g. @c cond::canceled) count as a win, wrap
800 : the child to translate the error into success. See the
801 : Concurrent Composition tutorial.
802 : */
803 : template<IoAwaitable... As>
804 : requires (sizeof...(As) > 0)
805 : && detail::all_io_result_awaitables<As...>
806 18 : [[nodiscard]] auto when_any(As... as)
807 : -> task<std::variant<
808 : std::error_code,
809 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
810 : {
811 : using result_type = std::variant<
812 : std::error_code,
813 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
814 :
815 : detail::when_any_io_state<
816 : detail::io_result_payload_t<awaitable_result_t<As>>...> state;
817 : std::tuple<As...> awaitable_tuple(std::move(as)...);
818 :
819 : co_await detail::when_any_io_launcher<As...>(
820 : &awaitable_tuple, &state);
821 :
822 : // Winner found: return their result
823 : if(state.result_.has_value())
824 : co_return std::move(*state.result_);
825 :
826 : // Winner claimed but payload construction failed
827 : if(state.core_.winner_exception_)
828 : std::rethrow_exception(state.core_.winner_exception_);
829 :
830 : // No winner — report the recorded failure
831 : if(state.last_exception_)
832 : std::rethrow_exception(state.last_exception_);
833 : co_return result_type{std::in_place_index<0>, state.last_error_};
834 36 : }
835 :
836 : } // namespace capy
837 : } // namespace boost
838 :
839 : #endif
|