TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_DELAY_HPP
11 : #define BOOST_CAPY_DELAY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/continuation.hpp>
15 : #include <boost/capy/error.hpp>
16 : #include <boost/capy/ex/executor_ref.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/ex/detail/timer_service.hpp>
19 : #include <boost/capy/io_result.hpp>
20 :
21 : #include <atomic>
22 : #include <chrono>
23 : #include <coroutine>
24 : #include <new>
25 : #include <stop_token>
26 : #include <utility>
27 :
28 : namespace boost {
29 : namespace capy {
30 :
31 : /** IoAwaitable returned by @ref delay.
32 :
33 : Suspends the calling coroutine until the deadline elapses
34 : or the environment's stop token is activated, whichever
35 : comes first. Resumption is always posted through the
36 : executor, never inline on the timer thread.
37 :
38 : Not intended to be named directly; use the @ref delay
39 : factory function instead.
40 :
41 : @par Return Value
42 :
43 : Returns `io_result<>{}` (no error) when the timer fires
44 : normally, or `io_result<>{error::canceled}` when
45 : cancellation claims the resume before the deadline.
46 :
47 : @par Cancellation
48 :
49 : If `stop_requested()` is true before suspension, the
50 : coroutine resumes immediately without scheduling a timer
51 : and returns `io_result<>{error::canceled}`. If stop is
52 : requested while suspended, the stop callback claims the
53 : resume and posts it through the executor; the pending
54 : timer is cancelled on the next `await_resume` or
55 : destructor call.
56 :
57 : @par Thread Safety
58 :
59 : A single `delay_awaitable` must not be awaited concurrently.
60 : Multiple independent `delay()` calls on the same
61 : execution_context are safe and share one timer thread.
62 :
63 : @see delay, timeout
64 : */
65 : class delay_awaitable
66 : {
67 : std::chrono::nanoseconds dur_;
68 :
69 : detail::timer_service* ts_ = nullptr;
70 : detail::timer_service::timer_id tid_ = 0;
71 :
72 : // Declared before stop_cb_buf_: the callback
73 : // accesses these members, so they must still be
74 : // alive if the stop_cb_ destructor blocks.
75 : continuation cont_;
76 : std::atomic<bool> claimed_{false};
77 : bool canceled_ = false;
78 : bool stop_cb_active_ = false;
79 :
80 : struct cancel_fn
81 : {
82 : delay_awaitable* self_;
83 : executor_ref ex_;
84 :
85 HIT 2 : void operator()() const noexcept
86 : {
87 2 : if(!self_->claimed_.exchange(
88 : true, std::memory_order_acq_rel))
89 : {
90 2 : self_->canceled_ = true;
91 2 : ex_.post(self_->cont_);
92 : }
93 2 : }
94 : };
95 :
96 : using stop_cb_t = std::stop_callback<cancel_fn>;
97 :
98 : // Aligned storage for the stop callback.
99 : // Declared last: its destructor may block while
100 : // the callback accesses the members above.
101 : BOOST_CAPY_MSVC_WARNING_PUSH
102 : BOOST_CAPY_MSVC_WARNING_DISABLE(4324)
103 : alignas(stop_cb_t)
104 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
105 : BOOST_CAPY_MSVC_WARNING_POP
106 :
107 20 : stop_cb_t& stop_cb_() noexcept
108 : {
109 20 : return *reinterpret_cast<stop_cb_t*>(stop_cb_buf_);
110 : }
111 :
112 : public:
113 : /// Construct an awaitable that waits for `dur` nanoseconds.
114 : /// Prefer the @ref delay factory over constructing directly.
115 29 : explicit delay_awaitable(std::chrono::nanoseconds dur) noexcept
116 29 : : dur_(dur)
117 : {
118 29 : }
119 :
120 : /// @pre The stop callback must not be active
121 : /// (i.e. the object has not yet been awaited).
122 46 : delay_awaitable(delay_awaitable&& o) noexcept
123 46 : : dur_(o.dur_)
124 46 : , ts_(o.ts_)
125 46 : , tid_(o.tid_)
126 46 : , cont_(o.cont_)
127 46 : , claimed_(o.claimed_.load(std::memory_order_relaxed))
128 46 : , canceled_(o.canceled_)
129 46 : , stop_cb_active_(std::exchange(o.stop_cb_active_, false))
130 : {
131 46 : }
132 :
133 : /// Tear down any registered stop callback and cancel the
134 : /// pending timer if one is still scheduled.
135 75 : ~delay_awaitable()
136 : {
137 75 : if(stop_cb_active_)
138 2 : stop_cb_().~stop_cb_t();
139 75 : if(ts_)
140 20 : ts_->cancel(tid_);
141 75 : }
142 :
143 : delay_awaitable(delay_awaitable const&) = delete;
144 : delay_awaitable& operator=(delay_awaitable const&) = delete;
145 : delay_awaitable& operator=(delay_awaitable&&) = delete;
146 :
147 : /// Return true for zero or negative durations, completing
148 : /// synchronously without scheduling a timer.
149 28 : bool await_ready() const noexcept
150 : {
151 28 : return dur_.count() <= 0;
152 : }
153 :
154 : /// Suspend the coroutine, scheduling the timer and a stop
155 : /// callback on the environment's executor and stop token.
156 : /// Resumes `h` immediately if stop was already requested.
157 : std::coroutine_handle<>
158 26 : await_suspend(
159 : std::coroutine_handle<> h,
160 : io_env const* env) noexcept
161 : {
162 : // Already stopped: resume immediately
163 26 : if(env->stop_token.stop_requested())
164 : {
165 6 : canceled_ = true;
166 6 : return h;
167 : }
168 :
169 20 : cont_.h = h;
170 20 : ts_ = &env->executor.context().use_service<detail::timer_service>();
171 :
172 : // Schedule timer (won't fire inline since deadline is in the future)
173 20 : tid_ = ts_->schedule_after(dur_,
174 20 : [this, ex = env->executor]()
175 : {
176 17 : if(!claimed_.exchange(
177 : true, std::memory_order_acq_rel))
178 : {
179 17 : ex.post(cont_);
180 : }
181 17 : });
182 :
183 : // Register stop callback (may fire inline)
184 60 : ::new(stop_cb_buf_) stop_cb_t(
185 20 : env->stop_token,
186 20 : cancel_fn{this, env->executor});
187 20 : stop_cb_active_ = true;
188 :
189 20 : return std::noop_coroutine();
190 : }
191 :
192 : /// Clean up the stop callback and timer, then return
193 : /// `io_result<>{error::canceled}` if cancellation claimed
194 : /// the resume, or an empty `io_result<>` otherwise.
195 27 : io_result<> await_resume() noexcept
196 : {
197 27 : if(stop_cb_active_)
198 : {
199 18 : stop_cb_().~stop_cb_t();
200 18 : stop_cb_active_ = false;
201 : }
202 27 : if(ts_)
203 18 : ts_->cancel(tid_);
204 27 : if(canceled_)
205 7 : return io_result<>{make_error_code(error::canceled)};
206 20 : return io_result<>{};
207 : }
208 : };
209 :
210 : /** Suspend the current coroutine for a duration.
211 :
212 : Returns an IoAwaitable that completes at or after the
213 : specified duration, or earlier if the environment's stop
214 : token is activated.
215 :
216 : Zero or negative durations complete synchronously without
217 : scheduling a timer.
218 :
219 : @par Example
220 : @code
221 : auto [ec] = co_await delay(std::chrono::milliseconds(100));
222 : @endcode
223 :
224 : @param dur The duration to wait.
225 :
226 : @return A @ref delay_awaitable whose `await_resume`
227 : returns `io_result<>`. On normal completion, `ec`
228 : is clear. On cancellation, `ec == error::canceled`.
229 :
230 : @throws Nothing.
231 :
232 : @see timeout, delay_awaitable
233 : */
234 : template<typename Rep, typename Period>
235 : delay_awaitable
236 28 : delay(std::chrono::duration<Rep, Period> dur) noexcept
237 : {
238 : return delay_awaitable{
239 28 : std::chrono::duration_cast<std::chrono::nanoseconds>(dur)};
240 : }
241 :
242 : } // capy
243 : } // boost
244 :
245 : #endif
|