LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 100.0 % 165 165
Test Date: 2026-06-24 18:54:23 Functions: 95.0 % 179 170 9

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      12                 : #define BOOST_CAPY_WHEN_ANY_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/detail/io_result_combinators.hpp>
      16                 : #include <boost/capy/continuation.hpp>
      17                 : #include <boost/capy/concept/executor.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
      22                 : #include <boost/capy/ex/frame_allocator.hpp>
      23                 : #include <boost/capy/ex/io_env.hpp>
      24                 : #include <boost/capy/task.hpp>
      25                 : 
      26                 : #include <array>
      27                 : #include <atomic>
      28                 : #include <exception>
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <ranges>
      33                 : #include <stdexcept>
      34                 : #include <stop_token>
      35                 : #include <tuple>
      36                 : #include <type_traits>
      37                 : #include <utility>
      38                 : #include <variant>
      39                 : #include <vector>
      40                 : 
      41                 : /*
      42                 :    when_any - Race multiple io_result tasks, select first success
      43                 :    =============================================================
      44                 : 
      45                 :    OVERVIEW:
      46                 :    ---------
      47                 :    when_any launches N io_result-returning tasks concurrently. A task
      48                 :    wins by returning !ec; errors and exceptions do not win. Once a
      49                 :    winner is found, stop is requested for siblings and the winner's
      50                 :    payload is returned. If no winner exists (all fail), one of the
      51                 :    failures is surfaced (an error_code at variant index 0, or a child's
      52                 :    exception rethrown); which one is unspecified.
      53                 : 
      54                 :    ARCHITECTURE:
      55                 :    -------------
      56                 :    The design mirrors when_all but with inverted completion semantics:
      57                 : 
      58                 :      when_all:  complete when remaining_count reaches 0 (all done)
      59                 :      when_any:  winner decided when has_winner becomes true (first success)
      60                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      61                 : 
      62                 :    Key components:
      63                 :      - when_any_core:    Shared state tracking winner and completion
      64                 :      - when_any_io_runner: Wrapper coroutine for each child task
      65                 :      - when_any_io_launcher/when_any_io_homogeneous_launcher:
      66                 :                           Awaitables that start all runners concurrently
      67                 : 
      68                 :    CRITICAL INVARIANTS:
      69                 :    --------------------
      70                 :    1. Only a task returning !ec can become the winner (via atomic CAS)
      71                 :    2. All tasks must complete before parent resumes (cleanup safety)
      72                 :    3. Stop is requested immediately when winner is determined
      73                 :    4. Exceptions and errors do not claim winner status
      74                 : 
      75                 :    POSITIONAL VARIANT:
      76                 :    -------------------
      77                 :    The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
      78                 :    Index 0 is error_code (failure/no-winner). Index 1..N identifies the
      79                 :    winning child and carries its payload.
      80                 : 
      81                 :    RANGE OVERLOAD:
      82                 :    ---------------
      83                 :    The range overload returns variant<error_code, pair<size_t, T>> for
      84                 :    non-void children or variant<error_code, size_t> for void children.
      85                 : 
      86                 :    MEMORY MODEL:
      87                 :    -------------
      88                 :    Synchronization chain from winner's write to parent's read:
      89                 : 
      90                 :    1. Winner thread writes result_ (non-atomic)
      91                 :    2. Winner's runner reaches final_suspend -> fetch_sub(acq_rel) on remaining_count_
      92                 :    3. Last runner to finish (may be winner or non-winner) reaches final_suspend
      93                 :       -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      94                 :    4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
      95                 :    5. Parent coroutine resumes and reads result_
      96                 : 
      97                 :    Synchronization analysis:
      98                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      99                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
     100                 :      in the modification order of remaining_count_
     101                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     102                 :      modification order, establishing happens-before from winner's writes
     103                 :    - Executor dispatch() is expected to provide queue-based synchronization
     104                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     105                 :    - Even inline executors work (same thread = sequenced-before)
     106                 : 
     107                 :    EXCEPTION SEMANTICS:
     108                 :    --------------------
     109                 :    Exceptions do NOT claim winner status. If a child throws, the exception
     110                 :    is recorded but the combinator keeps waiting for a success. Only when
     111                 :    all children complete without a winner is a failure surfaced. There is
     112                 :    no priority between errors and exceptions, and no guarantee about which
     113                 :    child's failure is reported: the result either returns an error_code at
     114                 :    variant index 0 or rethrows a child's exception.
     115                 : */
     116                 : 
     117                 : namespace boost {
     118                 : namespace capy {
     119                 : 
     120                 : namespace detail {
     121                 : 
     122                 : /** Core shared state for when_any operations.
     123                 : 
     124                 :     Contains all members and methods common to both heterogeneous (variadic)
     125                 :     and homogeneous (range) when_any implementations. State classes embed
     126                 :     this via composition to avoid CRTP destructor ordering issues.
     127                 : 
     128                 :     @par Thread Safety
     129                 :     Winner selection (has_winner_) and the completion count (remaining_count_) are atomics; the other members are plain.
     130                 : */
     131                 : struct when_any_core
     132                 : {
     133                 :     std::atomic<std::size_t> remaining_count_; // starts at the task count given to the constructor
     134                 :     std::size_t winner_index_{0}; // written by try_win() on a successful claim
     135                 :     std::exception_ptr winner_exception_; // written by set_winner_exception()
     136                 :     std::stop_source stop_source_; // stop is requested as soon as a winner is claimed
     137                 : 
     138                 :     // Functor for std::stop_callback: forwards a stop request to *source_
     139                 :     struct stop_callback_fn
     140                 :     {
     141                 :         std::stop_source* source_;
     142 HIT           3 :         void operator()() const noexcept { source_->request_stop(); }
     143                 :     };
     144                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     145                 :     std::optional<stop_callback_t> parent_stop_callback_; // empty by default
     146                 : 
     147                 :     continuation continuation_;
     148                 :     io_env const* caller_env_ = nullptr;
     149                 : 
     150                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     151                 :     std::atomic<bool> has_winner_{false};
     152                 : 
     153              34 :     explicit when_any_core(std::size_t count) noexcept
     154              34 :         : remaining_count_(count)
     155                 :     {
     156              34 :     }
     157                 : 
     158                 :     /** Atomically claim winner status; exactly one caller succeeds. On success, records index and requests stop on stop_source_. */
     159              53 :     bool try_win(std::size_t index) noexcept
     160                 :     {
     161              53 :         bool expected = false;
     162              53 :         if(has_winner_.compare_exchange_strong(
     163                 :             expected, true, std::memory_order_acq_rel))
     164                 :         {
     165              23 :             winner_index_ = index;
     166              23 :             stop_source_.request_stop();
     167              23 :             return true;
     168                 :         }
     169              30 :         return false; // has_winner_ was already true: another caller claimed first
     170                 :     }
     171                 : 
     172                 :     /** Store ep in winner_exception_. @pre try_win() returned true. */
     173               1 :     void set_winner_exception(std::exception_ptr ep) noexcept
     174                 :     {
     175               1 :         winner_exception_ = ep;
     176               1 :     }
     177                 : 
     178                 :     // Runners signal completion directly via final_suspend (by decrementing remaining_count_); no member function needed.
     179                 : };
     180                 : 
     181                 : } // namespace detail
     182                 : 
     183                 : namespace detail {
     184                 : 
     185                 : // Shared state for io_result-aware when_any over a fixed pack of payload types Ts: only !ec wins.
     186                 : template<typename... Ts>
     187                 : struct when_any_io_state
     188                 : {
     189                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     190                 :     using variant_type = std::variant<std::error_code, Ts...>; // index 0 is error_code, the payload types follow
     191                 : 
     192                 :     when_any_core core_;
     193                 :     std::optional<variant_type> result_; // empty until something is emplaced
     194                 :     std::array<continuation, task_count> runner_handles_{};
     195                 : 
     196                 :     // A failure (error or exception) for the all-fail case. record_error
     197                 :     // and record_exception overwrite each other, so which one survives is
     198                 :     // unspecified (no priority between errors and exceptions).
     199                 :     std::mutex failure_mu_; // held while last_error_ / last_exception_ are updated
     200                 :     std::error_code last_error_;
     201                 :     std::exception_ptr last_exception_;
     202                 : 
     203              18 :     when_any_io_state()
     204              18 :         : core_(task_count)
     205                 :     {
     206              18 :     }
     207                 : 
     208              14 :     void record_error(std::error_code ec)
     209                 :     {
     210              14 :         std::lock_guard lk(failure_mu_);
     211              14 :         last_error_ = ec;
     212              14 :         last_exception_ = nullptr; // the error replaces any previously recorded exception
     213              14 :     }
     214                 : 
     215               7 :     void record_exception(std::exception_ptr ep)
     216                 :     {
     217               7 :         std::lock_guard lk(failure_mu_);
     218               7 :         last_exception_ = ep;
     219               7 :         last_error_ = {}; // the exception replaces any previously recorded error
     220               7 :     }
     221                 : };
     222                 : 
     223                 : // Wrapper coroutine type for io_result-aware when_any children; StateType is the shared state it reports to.
     224                 : // unhandled_exception records the exception in that state but does NOT claim winner status.
     225                 : template<typename StateType>
     226                 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
     227                 : {
     228                 :     struct promise_type
     229                 :         : frame_alloc_mixin
     230                 :     {
     231                 :         StateType* state_ = nullptr;
     232                 :         std::size_t index_ = 0;
     233                 :         io_env env_; // environment handed to awaited children by transform_awaiter
     234                 : 
     235              87 :         when_any_io_runner get_return_object() noexcept
     236                 :         {
     237                 :             return when_any_io_runner(
     238              87 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     239                 :         }
     240                 : 
     241              87 :         std::suspend_always initial_suspend() noexcept { return {}; } // created suspended; runs only once resumed
     242                 : 
     243              87 :         auto final_suspend() noexcept
     244                 :         {
     245                 :             struct awaiter
     246                 :             {
     247                 :                 promise_type* p_;
     248              87 :                 bool await_ready() const noexcept { return false; }
     249              87 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     250                 :                 {
     251              87 :                     auto& core = p_->state_->core_;
     252              87 :                     auto* counter = &core.remaining_count_;
     253              87 :                     auto* caller_env = core.caller_env_;
     254              87 :                     auto& cont = core.continuation_;
     255                 : 
     256              87 :                     h.destroy(); // frame (and *p_) is gone; only the locals captured above are used below
     257                 : 
     258              87 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); // fetch_sub returns the value before the decrement
     259              87 :                     if(remaining == 1) // this runner was the last one outstanding
     260              34 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     261              53 :                     return detail::symmetric_transfer(std::noop_coroutine());
     262                 :                 }
     263                 :                 void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
     264                 :             };
     265              87 :             return awaiter{this};
     266                 :         }
     267                 : 
     268              74 :         void return_void() noexcept {}
     269                 : 
     270                 :         // Exceptions do NOT win in io_result when_any: the exception is only recorded in the shared state
     271              13 :         void unhandled_exception() noexcept
     272                 :         {
     273              13 :             state_->record_exception(std::current_exception());
     274              13 :         }
     275                 : 
     276                 :         template<class Awaitable>
     277                 :         struct transform_awaiter
     278                 :         {
     279                 :             std::decay_t<Awaitable> a_; // the wrapped awaitable, held by value
     280                 :             promise_type* p_;
     281                 : 
     282              87 :             bool await_ready() { return a_.await_ready(); }
     283              87 :             decltype(auto) await_resume() { return a_.await_resume(); }
     284                 : 
     285                 :             template<class Promise>
     286              86 :             auto await_suspend(std::coroutine_handle<Promise> h)
     287                 :             {
     288                 :                 using R = decltype(a_.await_suspend(h, &p_->env_)); // result type of the wrapped two-argument await_suspend
     289                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     290              86 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     291                 :                 else
     292                 :                     return a_.await_suspend(h, &p_->env_);
     293                 :             }
     294                 :         };
     295                 : 
     296                 :         template<class Awaitable>
     297              87 :         auto await_transform(Awaitable&& a)
     298                 :         {
     299                 :             using A = std::decay_t<Awaitable>;
     300                 :             if constexpr (IoAwaitable<A>)
     301                 :             {
     302                 :                 return transform_awaiter<Awaitable>{
     303             172 :                     std::forward<Awaitable>(a), this};
     304                 :             }
     305                 :             else
     306                 :             {
     307                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable"); // dependent condition: fires only if this branch is instantiated
     308                 :             }
     309              85 :         }
     310                 :     };
     311                 : 
     312                 :     std::coroutine_handle<promise_type> h_;
     313                 : 
     314              87 :     explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
     315              87 :         : h_(h)
     316                 :     {
     317              87 :     }
     318                 : 
     319                 :     when_any_io_runner(when_any_io_runner&& other) noexcept
     320                 :         : h_(std::exchange(other.h_, nullptr))
     321                 :     {
     322                 :     }
     323                 : 
     324                 :     when_any_io_runner(when_any_io_runner const&) = delete;
     325                 :     when_any_io_runner& operator=(when_any_io_runner const&) = delete;
     326                 :     when_any_io_runner& operator=(when_any_io_runner&&) = delete;
     327                 : 
     328              87 :     auto release() noexcept
     329                 :     {
     330              87 :         return std::exchange(h_, nullptr); // hands the handle to the caller and leaves h_ null
     331                 :     }
     332                 : };
     333                 : 
     334                 : // Runner coroutine for child I: awaits inner and only tries to win when the child returns !ec.
     335                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     336                 : when_any_io_runner<StateType>
     337              33 : make_when_any_io_runner(Awaitable inner, StateType* state)
     338                 : {
     339                 :     auto result = co_await std::move(inner);
     340                 : 
     341                 :     if(!result.ec)
     342                 :     {
     343                 :         // Success: try to claim winner; only the claimant stores its payload (variant index I + 1)
     344                 :         if(state->core_.try_win(I))
     345                 :         {
     346                 :             try
     347                 :             {
     348                 :                 state->result_.emplace(
     349                 :                     std::in_place_index<I + 1>,
     350                 :                     detail::extract_io_payload(std::move(result)));
     351                 :             }
     352                 :             catch(...) // storing the payload threw: keep it as the winner's exception
     353                 :             {
     354                 :                 state->core_.set_winner_exception(std::current_exception());
     355                 :             }
     356                 :         }
     357                 :     }
     358                 :     else
     359                 :     {
     360                 :         // Error: record the error_code in the shared state but don't win
     361                 :         state->record_error(result.ec);
     362                 :     }
     363              66 : }
     364                 : 
     365                 : // Launcher for io_result-aware when_any: awaitable whose await_suspend creates one runner per child and posts it to the caller's executor.
     366                 : template<IoAwaitable... Awaitables>
     367                 : class when_any_io_launcher
     368                 : {
     369                 :     using state_type = when_any_io_state<
     370                 :         io_result_payload_t<awaitable_result_t<Awaitables>>...>;
     371                 : 
     372                 :     std::tuple<Awaitables...>* tasks_; // non-owning; children are moved out of it in launch_one
     373                 :     state_type* state_; // non-owning
     374                 : 
     375                 : public:
     376              18 :     when_any_io_launcher(
     377                 :         std::tuple<Awaitables...>* tasks,
     378                 :         state_type* state)
     379              18 :         : tasks_(tasks)
     380              18 :         , state_(state)
     381                 :     {
     382              18 :     }
     383                 : 
     384              18 :     bool await_ready() const noexcept
     385                 :     {
     386              18 :         return sizeof...(Awaitables) == 0; // nothing to launch for an empty pack
     387                 :     }
     388                 : 
     389              18 :     std::coroutine_handle<> await_suspend(
     390                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     391                 :     {
     392              18 :         state_->core_.continuation_.h = continuation;
     393              18 :         state_->core_.caller_env_ = caller_env;
     394                 : 
     395              18 :         if(caller_env->stop_token.stop_possible()) // bridge the parent's token only if it can ever request stop
     396                 :         {
     397               4 :             state_->core_.parent_stop_callback_.emplace(
     398               2 :                 caller_env->stop_token,
     399               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     400                 : 
     401               2 :             if(caller_env->stop_token.stop_requested())
     402               1 :                 state_->core_.stop_source_.request_stop();
     403                 :         }
     404                 : 
     405              18 :         auto token = state_->core_.stop_source_.get_token(); // every runner observes the combinator's own stop_source_
     406              18 :         launch_all(std::index_sequence_for<Awaitables...>{},
     407                 :             caller_env->executor, token);
     408                 : 
     409              36 :         return std::noop_coroutine(); // the continuation is dispatched later, by the last runner to finish
     410              18 :     }
     411                 : 
     412              18 :     void await_resume() const noexcept {}
     413                 : 
     414                 : private:
     415                 :     template<std::size_t... Is>
     416              18 :     void launch_all(std::index_sequence<Is...>,
     417                 :         executor_ref ex, std::stop_token token)
     418                 :     {
     419              18 :         (..., launch_one<Is>(ex, token)); // comma fold: launches children in index order
     420              18 :     }
     421                 : 
     422                 :     template<std::size_t I>
     423              33 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     424                 :     {
     425              33 :         auto runner = make_when_any_io_runner<I>(
     426              33 :             std::move(std::get<I>(*tasks_)), state_);
     427                 : 
     428              33 :         auto h = runner.release(); // take the handle out of the runner object
     429              33 :         h.promise().state_ = state_;
     430              33 :         h.promise().index_ = I;
     431              33 :         h.promise().env_ = io_env{caller_ex, token,
     432              33 :             state_->core_.caller_env_->frame_allocator};
     433                 : 
     434              33 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     435              33 :         caller_ex.post(state_->runner_handles_[I]); // runner was created suspended; the executor starts it
     436              66 :     }
     437                 : };
     438                 : 
     439                 : /** Shared state for homogeneous io_result-aware when_any (range overload).
     440                 : 
     441                 :     @tparam T The payload type extracted from io_result; held in result_ once emplaced.
     442                 : */
     443                 : template<typename T>
     444                 : struct when_any_io_homogeneous_state
     445                 : {
     446                 :     when_any_core core_;
     447                 :     std::optional<T> result_; // empty until something is emplaced
     448                 :     std::unique_ptr<continuation[]> runner_handles_; // array of count slots, allocated in the constructor
     449                 : 
     450                 :     std::mutex failure_mu_; // held while last_error_ / last_exception_ are updated
     451                 :     std::error_code last_error_;
     452                 :     std::exception_ptr last_exception_;
     453                 : 
     454              13 :     explicit when_any_io_homogeneous_state(std::size_t count)
     455              13 :         : core_(count)
     456              13 :         , runner_handles_(std::make_unique<continuation[]>(count))
     457                 :     {
     458              13 :     }
     459                 : 
     460               6 :     void record_error(std::error_code ec)
     461                 :     {
     462               6 :         std::lock_guard lk(failure_mu_);
     463               6 :         last_error_ = ec;
     464               6 :         last_exception_ = nullptr; // the error replaces any previously recorded exception
     465               6 :     }
     466                 : 
     467               4 :     void record_exception(std::exception_ptr ep)
     468                 :     {
     469               4 :         std::lock_guard lk(failure_mu_);
     470               4 :         last_exception_ = ep;
     471               4 :         last_error_ = {}; // the exception replaces any previously recorded error
     472               4 :     }
     473                 : };
     474                 : 
     475                 : /** Specialization for void io_result children, i.e. payload type std::tuple<> (no payload storage: there is no result_ member). */
     476                 : template<>
     477                 : struct when_any_io_homogeneous_state<std::tuple<>>
     478                 : {
     479                 :     when_any_core core_;
     480                 :     std::unique_ptr<continuation[]> runner_handles_; // array of count slots, allocated in the constructor
     481                 : 
     482                 :     std::mutex failure_mu_; // held while last_error_ / last_exception_ are updated
     483                 :     std::error_code last_error_;
     484                 :     std::exception_ptr last_exception_;
     485                 : 
     486               3 :     explicit when_any_io_homogeneous_state(std::size_t count)
     487               3 :         : core_(count)
     488               3 :         , runner_handles_(std::make_unique<continuation[]>(count))
     489                 :     {
     490               3 :     }
     491                 : 
     492               1 :     void record_error(std::error_code ec)
     493                 :     {
     494               1 :         std::lock_guard lk(failure_mu_);
     495               1 :         last_error_ = ec;
     496               1 :         last_exception_ = nullptr; // the error replaces any previously recorded exception
     497               1 :     }
     498                 : 
     499               2 :     void record_exception(std::exception_ptr ep)
     500                 :     {
     501               2 :         std::lock_guard lk(failure_mu_);
     502               2 :         last_exception_ = ep;
     503               2 :         last_error_ = {}; // the exception replaces any previously recorded error
     504               2 :     }
     505                 : };
     506                 : 
     507                 : /** Create an io_result-aware runner for homogeneous when_any (range path).
     508                 : 
     509                 :     Awaits inner; only tries to win (as child number index) when the child returns !ec, otherwise records the error.
     510                 : */
     511                 : template<IoAwaitable Awaitable, typename StateType>
     512                 : when_any_io_runner<StateType>
     513              54 : make_when_any_io_homogeneous_runner(
     514                 :     Awaitable inner, StateType* state, std::size_t index)
     515                 : {
     516                 :     auto result = co_await std::move(inner);
     517                 : 
     518                 :     if(!result.ec)
     519                 :     {
     520                 :         if(state->core_.try_win(index))
     521                 :         {
     522                 :             using PayloadT = io_result_payload_t<
     523                 :                 awaitable_result_t<Awaitable>>;
     524                 :             if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) // a std::tuple<> payload has nothing to store
     525                 :             {
     526                 :                 try
     527                 :                 {
     528                 :                     state->result_.emplace(
     529                 :                         extract_io_payload(std::move(result)));
     530                 :                 }
     531                 :                 catch(...) // storing the payload threw: keep it as the winner's exception
     532                 :                 {
     533                 :                     state->core_.set_winner_exception(
     534                 :                         std::current_exception());
     535                 :                 }
     536                 :             }
     537                 :         }
     538                 :     }
     539                 :     else
     540                 :     {
     541                 :         state->record_error(result.ec); // error: recorded in the shared state, never wins
     542                 :     }
     543             108 : }
     544                 : 
     545                 : /** Launches all io_result-aware homogeneous runners concurrently. */
     546                 : template<IoAwaitableRange Range>
     547                 : class when_any_io_homogeneous_launcher
     548                 : {
     549                 :     using Awaitable = std::ranges::range_value_t<Range>;
     550                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     551                 : 
     552                 :     Range* range_;
     553                 :     when_any_io_homogeneous_state<PayloadT>* state_;
     554                 : 
     555                 : public:
     556              16 :     when_any_io_homogeneous_launcher(
     557                 :         Range* range,
     558                 :         when_any_io_homogeneous_state<PayloadT>* state)
     559              16 :         : range_(range)
     560              16 :         , state_(state)
     561                 :     {
     562              16 :     }
     563                 : 
     564              16 :     bool await_ready() const noexcept
     565                 :     {
     566              16 :         return std::ranges::empty(*range_);
     567                 :     }
     568                 : 
     569              16 :     std::coroutine_handle<> await_suspend(
     570                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     571                 :     {
     572              16 :         state_->core_.continuation_.h = continuation;
     573              16 :         state_->core_.caller_env_ = caller_env;
     574                 : 
     575              16 :         if(caller_env->stop_token.stop_possible())
     576                 :         {
     577               4 :             state_->core_.parent_stop_callback_.emplace(
     578               2 :                 caller_env->stop_token,
     579               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     580                 : 
     581               2 :             if(caller_env->stop_token.stop_requested())
     582               1 :                 state_->core_.stop_source_.request_stop();
     583                 :         }
     584                 : 
     585              16 :         auto token = state_->core_.stop_source_.get_token();
     586                 : 
     587                 :         // Phase 1: Create all runners without dispatching.
     588              16 :         std::size_t index = 0;
     589              70 :         for(auto&& a : *range_)
     590                 :         {
     591              54 :             auto runner = make_when_any_io_homogeneous_runner(
     592              54 :                 std::move(a), state_, index);
     593                 : 
     594              54 :             auto h = runner.release();
     595              54 :             h.promise().state_ = state_;
     596              54 :             h.promise().index_ = index;
     597              54 :             h.promise().env_ = io_env{caller_env->executor, token,
     598              54 :                 caller_env->frame_allocator};
     599                 : 
     600              54 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     601              54 :             ++index;
     602                 :         }
     603                 : 
     604                 :         // Phase 2: Post all runners. Any may complete synchronously.
     605              16 :         auto* handles = state_->runner_handles_.get();
     606              16 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
     607              70 :         for(std::size_t i = 0; i < count; ++i)
     608              54 :             caller_env->executor.post(handles[i]);
     609                 : 
     610              32 :         return std::noop_coroutine();
     611              70 :     }
     612                 : 
     613              16 :     void await_resume() const noexcept {}
     614                 : };
     615                 : 
     616                 : } // namespace detail
     617                 : 
     618                 : /** Race a range of io_result-returning awaitables (non-void payloads).
     619                 : 
     620                 :     Only a child returning !ec can win. Errors and exceptions do not
     621                 :     claim winner status. If all children fail, an unspecified one of
     622                 :     the failures is reported — either an error_code at variant index 0,
     623                 :     or a child's exception rethrown.
     624                 : 
     625                 :     @param awaitables Range of io_result-returning awaitables (must
     626                 :         not be empty).
     627                 : 
     628                 :     @return A task yielding variant<error_code, pair<size_t, PayloadT>>
     629                 :         where index 0 is failure and index 1 carries the winner's
     630                 :         index and payload.
     631                 : 
     632                 :     @throws std::invalid_argument if range is empty.
     633                 :     @throws Rethrows the winner's exception if extracting or
     634                 :         move-constructing the winning payload throws (a winner was
     635                 :         found but its result could not be produced).
     636                 :     @throws Rethrows a child's exception when all children fail and the
     637                 :         reported failure is an exception (which child is unspecified).
     638                 : 
     639                 :     @par Example
     640                 :     @code
     641                 :     task<void> example()
     642                 :     {
     643                 :         std::vector<io_task<size_t>> reads;
     644                 :         for (auto& buf : buffers)
     645                 :             reads.push_back(stream.read_some(buf));
     646                 : 
     647                 :         auto result = co_await when_any(std::move(reads));
     648                 :         if (result.index() == 1)
     649                 :         {
     650                 :             auto [idx, n] = std::get<1>(result);
     651                 :         }
     652                 :     }
     653                 :     @endcode
     654                 : 
     655                 :     @see IoAwaitableRange, when_any
     656                 : */
     657                 : template<IoAwaitableRange R>
     658                 :     requires detail::is_io_result_v<
     659                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     660                 :     && (!std::is_same_v<
     661                 :             detail::io_result_payload_t<
     662                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     663                 :             std::tuple<>>)
     664              14 : [[nodiscard]] auto when_any(R&& awaitables)
     665                 :     -> task<std::variant<std::error_code,
     666                 :         std::pair<std::size_t,
     667                 :             detail::io_result_payload_t<
     668                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     669                 : {
     670                 :     using Awaitable = std::ranges::range_value_t<R>;
     671                 :     using PayloadT = detail::io_result_payload_t<
     672                 :         awaitable_result_t<Awaitable>>;
     673                 :     using result_type = std::variant<std::error_code,
     674                 :         std::pair<std::size_t, PayloadT>>;
     675                 :     using OwnedRange = std::remove_cvref_t<R>;
     676                 : 
     677                 :     auto count = std::ranges::size(awaitables);
     678                 :     if(count == 0)
     679                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     680                 : 
     681                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     682                 : 
     683                 :     detail::when_any_io_homogeneous_state<PayloadT> state(count);
     684                 : 
     685                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     686                 :         &owned_awaitables, &state);
     687                 : 
     688                 :     // Winner found
     689                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     690                 :     {
     691                 :         if(state.core_.winner_exception_)
     692                 :             std::rethrow_exception(state.core_.winner_exception_);
     693                 :         co_return result_type{std::in_place_index<1>,
     694                 :             std::pair{state.core_.winner_index_, std::move(*state.result_)}};
     695                 :     }
     696                 : 
     697                 :     // No winner — report the recorded failure
     698                 :     if(state.last_exception_)
     699                 :         std::rethrow_exception(state.last_exception_);
     700                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     701              28 : }
     702                 : 
     703                 : /** Race a range of void io_result-returning awaitables.
     704                 : 
     705                 :     Only a child returning !ec can win. Returns the winner's index
     706                 :     at variant index 1, or error_code at index 0 on all-fail.
     707                 : 
     708                 :     @param awaitables Range of io_result<>-returning awaitables (must
     709                 :         not be empty).
     710                 : 
     711                 :     @return A task yielding variant<error_code, size_t> where index 0
     712                 :         is failure and index 1 carries the winner's index.
     713                 : 
     714                 :     @throws std::invalid_argument if range is empty.
     715                 :     @throws Rethrows a child's exception when all children fail and the
     716                 :         reported failure is an exception (which child is unspecified).
     717                 : 
     718                 :     @par Example
     719                 :     @code
     720                 :     task<void> example()
     721                 :     {
     722                 :         std::vector<io_task<>> jobs;
     723                 :         jobs.push_back(background_work_a());
     724                 :         jobs.push_back(background_work_b());
     725                 : 
     726                 :         auto result = co_await when_any(std::move(jobs));
     727                 :         if (result.index() == 1)
     728                 :         {
     729                 :             auto winner = std::get<1>(result);
     730                 :         }
     731                 :     }
     732                 :     @endcode
     733                 : 
     734                 :     @see IoAwaitableRange, when_any
     735                 : */
     736                 : template<IoAwaitableRange R>
     737                 :     requires detail::is_io_result_v<
     738                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     739                 :     && std::is_same_v<
     740                 :             detail::io_result_payload_t<
     741                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     742                 :             std::tuple<>>
     743               3 : [[nodiscard]] auto when_any(R&& awaitables)
     744                 :     -> task<std::variant<std::error_code, std::size_t>>
     745                 : {
     746                 :     using OwnedRange = std::remove_cvref_t<R>;
     747                 :     using result_type = std::variant<std::error_code, std::size_t>;
     748                 : 
     749                 :     auto count = std::ranges::size(awaitables);
     750                 :     if(count == 0)
     751                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     752                 : 
     753                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     754                 : 
     755                 :     detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
     756                 : 
     757                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     758                 :         &owned_awaitables, &state);
     759                 : 
     760                 :     // Winner found
     761                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     762                 :     {
     763                 :         if(state.core_.winner_exception_)
     764                 :             std::rethrow_exception(state.core_.winner_exception_);
     765                 :         co_return result_type{std::in_place_index<1>,
     766                 :             state.core_.winner_index_};
     767                 :     }
     768                 : 
     769                 :     // No winner — report the recorded failure
     770                 :     if(state.last_exception_)
     771                 :         std::rethrow_exception(state.last_exception_);
     772                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     773               6 : }
     774                 : 
     775                 : /** Race io_result-returning awaitables, selecting the first success.
     776                 : 
     777                 :     Overload selected when all children return io_result<Ts...>.
     778                 :     Only a child returning !ec can win. Errors and exceptions do
     779                 :     not claim winner status.
     780                 : 
     781                 :     @param as The awaitables to race. Each must satisfy @ref
     782                 :         IoAwaitable and is consumed (moved-from) when `when_any`
     783                 :         is awaited.
     784                 : 
     785                 :     @return A task yielding variant<error_code, R1, ..., Rn> where
     786                 :         index 0 is the failure/no-winner case and index i+1
     787                 :         identifies the winning child. On all-fail, index 0 holds
     788                 :         an error_code from one of the failed children (unspecified
     789                 :         which; no priority between errors and exceptions).
     790                 : 
     791                 :     @throws Rethrows the winner's exception if extracting or
     792                 :         constructing the winning payload throws (a winner was found
     793                 :         but its result could not be produced).
     794                 :     @throws Rethrows a child's exception when all children fail and the
     795                 :         reported failure is an exception (which child is unspecified).
     796                 : 
     797                 :     @note A failing child does not cancel its siblings; `when_any`
     798                 :         waits for a success or for every child to finish. To make a
     799                 :         benign error (e.g. @c cond::canceled) count as a win, wrap
     800                 :         the child to translate the error into success. See the
     801                 :         Concurrent Composition tutorial.
     802                 : */
     803                 : template<IoAwaitable... As>
     804                 :     requires (sizeof...(As) > 0)
     805                 :           && detail::all_io_result_awaitables<As...>
     806              18 : [[nodiscard]] auto when_any(As... as)
     807                 :     -> task<std::variant<
     808                 :         std::error_code,
     809                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     810                 : {
     811                 :     using result_type = std::variant<
     812                 :         std::error_code,
     813                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     814                 : 
     815                 :     detail::when_any_io_state<
     816                 :         detail::io_result_payload_t<awaitable_result_t<As>>...> state;
     817                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     818                 : 
     819                 :     co_await detail::when_any_io_launcher<As...>(
     820                 :         &awaitable_tuple, &state);
     821                 : 
     822                 :     // Winner found: return their result
     823                 :     if(state.result_.has_value())
     824                 :         co_return std::move(*state.result_);
     825                 : 
     826                 :     // Winner claimed but payload construction failed
     827                 :     if(state.core_.winner_exception_)
     828                 :         std::rethrow_exception(state.core_.winner_exception_);
     829                 : 
     830                 :     // No winner — report the recorded failure
     831                 :     if(state.last_exception_)
     832                 :         std::rethrow_exception(state.last_exception_);
     833                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     834              36 : }
     835                 : 
     836                 : } // namespace capy
     837                 : } // namespace boost
     838                 : 
     839                 : #endif
        

Generated by: LCOV version 2.3