94.78% Lines (109/115) 95.00% Functions (19/20)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2 - // Copyright (c) 2026 Michael Vandeberg  
3   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
4   // 3   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 6   //
8   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
9   // 8   //
10   9  
11   #ifndef BOOST_CAPY_TEST_STREAM_HPP 10   #ifndef BOOST_CAPY_TEST_STREAM_HPP
12   #define BOOST_CAPY_TEST_STREAM_HPP 11   #define BOOST_CAPY_TEST_STREAM_HPP
13   12  
14   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
15   #include <boost/capy/buffers.hpp> 14   #include <boost/capy/buffers.hpp>
16   #include <boost/capy/buffers/buffer_copy.hpp> 15   #include <boost/capy/buffers/buffer_copy.hpp>
17   #include <boost/capy/buffers/make_buffer.hpp> 16   #include <boost/capy/buffers/make_buffer.hpp>
18   #include <boost/capy/continuation.hpp> 17   #include <boost/capy/continuation.hpp>
19   #include <coroutine> 18   #include <coroutine>
20   #include <boost/capy/ex/io_env.hpp> 19   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/io_result.hpp> 20   #include <boost/capy/io_result.hpp>
22   #include <boost/capy/error.hpp> 21   #include <boost/capy/error.hpp>
23   #include <boost/capy/read.hpp> 22   #include <boost/capy/read.hpp>
24   #include <boost/capy/task.hpp> 23   #include <boost/capy/task.hpp>
25   #include <boost/capy/test/fuse.hpp> 24   #include <boost/capy/test/fuse.hpp>
26   #include <boost/capy/test/run_blocking.hpp> 25   #include <boost/capy/test/run_blocking.hpp>
27 - #include <atomic>  
28   26  
29 - #include <new>  
30   #include <memory> 27   #include <memory>
31   #include <stop_token> 28   #include <stop_token>
32   #include <string> 29   #include <string>
33   #include <string_view> 30   #include <string_view>
34   #include <utility> 31   #include <utility>
35   32  
36   namespace boost { 33   namespace boost {
37   namespace capy { 34   namespace capy {
38   namespace test { 35   namespace test {
39   36  
40   /** A connected stream for testing bidirectional I/O. 37   /** A connected stream for testing bidirectional I/O.
41   38  
42   Streams are created in pairs via @ref make_stream_pair. 39   Streams are created in pairs via @ref make_stream_pair.
43   Data written to one end becomes available for reading on 40   Data written to one end becomes available for reading on
44   the other. If no data is available when @ref read_some 41   the other. If no data is available when @ref read_some
45   is called, the calling coroutine suspends until the peer 42   is called, the calling coroutine suspends until the peer
46   calls @ref write_some. The shared @ref fuse enables error 43   calls @ref write_some. The shared @ref fuse enables error
47   injection at controlled points in both directions. 44   injection at controlled points in both directions.
48   45  
49   When the fuse injects an error or throws on one end, the 46   When the fuse injects an error or throws on one end, the
50   other end is automatically closed: any suspended reader is 47   other end is automatically closed: any suspended reader is
51   resumed with `error::eof`, and subsequent operations on 48   resumed with `error::eof`, and subsequent operations on
52   both ends return `error::eof`. Calling @ref close on one 49   both ends return `error::eof`. Calling @ref close on one
53   end signals eof to the peer's reads after draining any 50   end signals eof to the peer's reads after draining any
54   buffered data, while the peer may still write. 51   buffered data, while the peer may still write.
55   52  
56   @par Thread Safety 53   @par Thread Safety
57   Single-threaded only. Both ends of the pair must be 54   Single-threaded only. Both ends of the pair must be
58   accessed from the same thread. Concurrent access is 55   accessed from the same thread. Concurrent access is
59   undefined behavior. 56   undefined behavior.
60   57  
61   @par Example 58   @par Example
62   @code 59   @code
63   fuse f; 60   fuse f;
64   auto [a, b] = make_stream_pair( f ); 61   auto [a, b] = make_stream_pair( f );
65   62  
66   auto r = f.armed( [&]( fuse& ) -> task<> { 63   auto r = f.armed( [&]( fuse& ) -> task<> {
67   auto [ec, n] = co_await a.write_some( 64   auto [ec, n] = co_await a.write_some(
68   const_buffer( "hello", 5 ) ); 65   const_buffer( "hello", 5 ) );
69   if( ec ) 66   if( ec )
70   co_return; 67   co_return;
71   68  
72   char buf[32]; 69   char buf[32];
73   auto [ec2, n2] = co_await b.read_some( 70   auto [ec2, n2] = co_await b.read_some(
74   mutable_buffer( buf, sizeof( buf ) ) ); 71   mutable_buffer( buf, sizeof( buf ) ) );
75   if( ec2 ) 72   if( ec2 )
76   co_return; 73   co_return;
77   // buf contains "hello" 74   // buf contains "hello"
78   } ); 75   } );
79   @endcode 76   @endcode
80   77  
81   @see make_stream_pair, fuse 78   @see make_stream_pair, fuse
82   */ 79   */
83   class stream 80   class stream
84   { 81   {
85   // Single-threaded only. No concurrent access to either 82   // Single-threaded only. No concurrent access to either
86   // end of the pair. Both streams and all operations must 83   // end of the pair. Both streams and all operations must
87   // run on the same thread. 84   // run on the same thread.
88   85  
89   struct half 86   struct half
90   { 87   {
91   std::string buf; 88   std::string buf;
92   std::size_t max_read_size = std::size_t(-1); 89   std::size_t max_read_size = std::size_t(-1);
93   continuation pending_cont_; 90   continuation pending_cont_;
94 - // Points at the suspended reader's claim flag (owned by the  
95 - // read awaitable). Lets a peer wake coordinate with a stop  
96 - // callback so the parked read is resumed exactly once.  
97 - std::atomic<bool>* pending_claimed = nullptr;  
98   executor_ref pending_ex; 91   executor_ref pending_ex;
99   bool eof = false; 92   bool eof = false;
100   }; 93   };
101   94  
102   struct state 95   struct state
103   { 96   {
104   fuse f; 97   fuse f;
105   bool closed = false; 98   bool closed = false;
106   half sides[2]; 99   half sides[2];
107   100  
HITCBC 108   286 explicit state(fuse f_) noexcept 101   280 explicit state(fuse f_) noexcept
HITCBC 109   858 : f(std::move(f_)) 102   840 : f(std::move(f_))
110   { 103   {
HITCBC 111   286 } 104   280 }
112 - // Resume a suspended reader on this side, if any. Claims the  
113 - // reader's atomic so it is never double-resumed by a racing  
114 - // stop callback; the loser of the race skips the post.  
115 - static void wake(half& side)  
DCB 116 - 662 {  
117 - if(! side.pending_cont_.h)  
DCB 118 - 662 return;  
DCB 119 - 637 if(! side.pending_claimed ||  
DCB 120 - 50 ! side.pending_claimed->exchange(  
DCB 121 - 25 true, std::memory_order_acq_rel))  
122 - {  
123 - side.pending_ex.post(side.pending_cont_);  
DCB 124 - 25 }  
125 - side.pending_cont_.h = {};  
DCB 126 - 25 side.pending_ex = {};  
DCB 127 - 25 side.pending_claimed = nullptr;  
DCB 128 - 25 }  
129 -  
130   105  
131   // Set closed and resume any suspended readers 106   // Set closed and resume any suspended readers
132   // with eof on both sides. 107   // with eof on both sides.
HITCBC 133   208 void close() 108   208 void close()
134   { 109   {
HITCBC 135   208 closed = true; 110   208 closed = true;
HITCBC 136   624 for(auto& side : sides) 111   624 for(auto& side : sides)
ECB 137 - 416 wake(side); 112 + {
HITGNC   113 + 416 if(side.pending_cont_.h)
  114 + {
HITGNC   115 + 12 side.pending_ex.post(side.pending_cont_);
HITGNC   116 + 12 side.pending_cont_.h = {};
HITGNC   117 + 12 side.pending_ex = {};
  118 + }
  119 + }
HITCBC 138   208 } 120   208 }
139   }; 121   };
140   122  
141   // Wraps the maybe_fail() call. If the guard is 123   // Wraps the maybe_fail() call. If the guard is
142   // not disarmed before destruction (fuse returned 124   // not disarmed before destruction (fuse returned
143   // an error, or threw an exception), closes both 125   // an error, or threw an exception), closes both
144   // ends so any suspended peer gets eof. 126   // ends so any suspended peer gets eof.
145   struct close_guard 127   struct close_guard
146   { 128   {
147   state* st; 129   state* st;
148   bool armed = true; 130   bool armed = true;
HITCBC 149   301 void disarm() noexcept { armed = false; } 131   300 void disarm() noexcept { armed = false; }
HITCBC 150   509 ~close_guard() noexcept(false) { if(armed) st->close(); } 132   508 ~close_guard() noexcept(false) { if(armed) st->close(); }
151   }; 133   };
152   134  
153   std::shared_ptr<state> state_; 135   std::shared_ptr<state> state_;
154   int index_; 136   int index_;
155   137  
HITCBC 156   572 stream( 138   560 stream(
157   std::shared_ptr<state> sp, 139   std::shared_ptr<state> sp,
158   int index) noexcept 140   int index) noexcept
HITCBC 159   572 : state_(std::move(sp)) 141   560 : state_(std::move(sp))
HITCBC 160   572 , index_(index) 142   560 , index_(index)
161   { 143   {
HITCBC 162   572 } 144   560 }
163   145  
164   friend std::pair<stream, stream> 146   friend std::pair<stream, stream>
165   make_stream_pair(fuse); 147   make_stream_pair(fuse);
166   148  
167   public: 149   public:
168   stream(stream const&) = delete; 150   stream(stream const&) = delete;
169   stream& operator=(stream const&) = delete; 151   stream& operator=(stream const&) = delete;
HITCBC 170   672 stream(stream&&) = default; 152   660 stream(stream&&) = default;
171   stream& operator=(stream&&) = default; 153   stream& operator=(stream&&) = default;
172   154  
173   /** Signal end-of-stream to the peer. 155   /** Signal end-of-stream to the peer.
174   156  
175   Marks the peer's read direction as closed. 157   Marks the peer's read direction as closed.
176   If the peer is suspended in @ref read_some, 158   If the peer is suspended in @ref read_some,
177   it is resumed. The peer drains any buffered 159   it is resumed. The peer drains any buffered
178   data before receiving `error::eof`. Writes 160   data before receiving `error::eof`. Writes
179   from the peer are unaffected. 161   from the peer are unaffected.
180   */ 162   */
181   void 163   void
HITCBC 182   3 close() 164   3 close()
183   { 165   {
HITCBC 184   3 int peer = 1 - index_; 166   3 int peer = 1 - index_;
HITCBC 185   3 auto& side = state_->sides[peer]; 167   3 auto& side = state_->sides[peer];
HITCBC 186   3 side.eof = true; 168   3 side.eof = true;
HITCBC 187 - 3 state::wake(side); 169 + 3 if(side.pending_cont_.h)
  170 + {
HITGNC   171 + 1 side.pending_ex.post(side.pending_cont_);
HITGNC   172 + 1 side.pending_cont_.h = {};
HITGNC   173 + 1 side.pending_ex = {};
  174 + }
HITCBC 188   3 } 175   3 }
189   176  
190   /** Set the maximum bytes returned per read. 177   /** Set the maximum bytes returned per read.
191   178  
192   Limits how many bytes @ref read_some returns in 179   Limits how many bytes @ref read_some returns in
193   a single call, simulating chunked network delivery. 180   a single call, simulating chunked network delivery.
194   The default is unlimited. 181   The default is unlimited.
195   182  
196   @param n Maximum bytes per read. 183   @param n Maximum bytes per read.
197   */ 184   */
198   void 185   void
HITCBC 199   54 set_max_read_size(std::size_t n) noexcept 186   54 set_max_read_size(std::size_t n) noexcept
200   { 187   {
HITCBC 201   54 state_->sides[index_].max_read_size = n; 188   54 state_->sides[index_].max_read_size = n;
HITCBC 202   54 } 189   54 }
203   190  
204   /** Asynchronously read data from the stream. 191   /** Asynchronously read data from the stream.
205   192  
206   Transfers up to `buffer_size(buffers)` bytes from 193   Transfers up to `buffer_size(buffers)` bytes from
207   data written by the peer. If no data is available, 194   data written by the peer. If no data is available,
208   the calling coroutine suspends until the peer calls 195   the calling coroutine suspends until the peer calls
209   @ref write_some. Before every read, the attached 196   @ref write_some. Before every read, the attached
210   @ref fuse is consulted to possibly inject an error. 197   @ref fuse is consulted to possibly inject an error.
211   If the fuse fires, the peer is automatically closed. 198   If the fuse fires, the peer is automatically closed.
212   If the stream is closed, returns `error::eof`. 199   If the stream is closed, returns `error::eof`.
213   The returned `std::size_t` is the number of bytes 200   The returned `std::size_t` is the number of bytes
214   transferred. 201   transferred.
215   202  
216   @param buffers The mutable buffer sequence to receive data. 203   @param buffers The mutable buffer sequence to receive data.
217   204  
218   @return An awaitable that await-returns `(error_code,std::size_t)`. 205   @return An awaitable that await-returns `(error_code,std::size_t)`.
219 - @par Cancellation  
220 - Cancellation applies only to a read that would otherwise suspend:  
221 - if no data is available and the environment's stop token is  
222 - requested (before or during the wait), the read resumes with  
223 - `error::canceled`. A read that can complete immediately from  
224 - buffered data is unaffected by the stop token.  
225 -  
226   206  
227   @see fuse, close 207   @see fuse, close
228   */ 208   */
229   template<MutableBufferSequence MB> 209   template<MutableBufferSequence MB>
230   auto 210   auto
HITCBC 231   280 read_some(MB buffers) 211   275 read_some(MB buffers)
232 - // The read suspends when no data is available, parking its  
233 - // continuation on the side until the peer writes/closes. To  
234 - // support cancellation it follows the same pattern as  
235 - // delay_awaitable: a stop callback claims the resume (racing  
236 - // the peer wake via an atomic) and posts the continuation  
237 - // through the executor. Because it owns a std::atomic and a  
238 - // std::stop_callback, the awaitable needs explicit move and  
239 - // destruction (the task promise moves it into its  
240 - // transform_awaiter before awaiting).  
241   { 212   {
242   struct awaitable 213   struct awaitable
243   { 214   {
244   stream* self_; 215   stream* self_;
245   MB buffers_; 216   MB buffers_;
246 - // Declared before stop_cb_buf_: the stop callback reads  
247 - // these, so they must outlive a blocking stop_cb_ destructor.  
248 - continuation cont_;  
249 - executor_ref ex_;  
250 - half* side_ = nullptr;  
251 - std::atomic<bool> claimed_{false};  
252 - bool canceled_ = false;  
253 - bool stop_cb_active_ = false;  
254 -  
255 - struct cancel_fn  
256 - {  
257 - awaitable* self_;  
258 -  
259 - void operator()() const noexcept  
DCB 260 - 15 {  
261 - if(! self_->claimed_.exchange(  
DCB 262 - 15 true, std::memory_order_acq_rel))  
263 - {  
264 - self_->canceled_ = true;  
DCB 265 - 3 self_->ex_.post(self_->cont_);  
DCB 266 - 3 }  
267 - }  
DCB 268 - 15 };  
269 -  
270 - using stop_cb_t = std::stop_callback<cancel_fn>;  
271 -  
272 - // Declared last: its destructor may block while the callback  
273 - // accesses the members above. A union gives correct alignment  
274 - // for stop_cb_t without an alignas specifier, which avoids  
275 - // MSVC's C4324 padding warning on this function-local class  
276 - // (the member-level pragma used by delay_awaitable does not  
277 - // suppress it here). Lifetime is managed manually: placement  
278 - // new in await_suspend, explicit destruction once done.  
279 - union { stop_cb_t stop_cb_; };  
280 -  
281 - awaitable(stream* self, MB buffers) noexcept  
DCB 282 - 280 : self_(self)  
DCB 283 - 280 , buffers_(buffers)  
DCB 284 - 280 {  
285 - }  
DCB 286 - 280  
287 - /// @pre Not yet awaited (no active stop callback).  
288 - awaitable(awaitable&& o) noexcept  
DCB 289 - 280 : self_(o.self_)  
DCB 290 - 280 , buffers_(o.buffers_)  
DCB 291 - 280 , cont_(o.cont_)  
DCB 292 - 280 , ex_(o.ex_)  
DCB 293 - 280 , side_(o.side_)  
DCB 294 - 280 , claimed_(o.claimed_.load(std::memory_order_relaxed))  
DCB 295 - 280 , canceled_(o.canceled_)  
DCB 296 - 280 , stop_cb_active_(std::exchange(o.stop_cb_active_, false))  
DCB 297 - 280 {  
298 - }  
DCB 299 - 280  
300 - ~awaitable()  
DCB 301 - 560 {  
302 - if(stop_cb_active_)  
DCB 303 - 560 stop_cb_.~stop_cb_t();  
DCB 304 - 1 // Unlink from the side if still parked (e.g. the  
305 - // coroutine was destroyed while suspended), so a later  
306 - // peer wake does not dereference a freed claim flag.  
307 - if(side_ && side_->pending_claimed == &claimed_)  
DCB 308 - 560 {  
309 - side_->pending_cont_.h = {};  
DCB 310 - 1 side_->pending_ex = {};  
DCB 311 - 1 side_->pending_claimed = nullptr;  
DCB 312 - 1 }  
313 - }  
DCB 314 - 560  
315 - awaitable(awaitable const&) = delete;  
316 - awaitable& operator=(awaitable const&) = delete;  
317 - awaitable& operator=(awaitable&&) = delete;  
318 -  
319   217  
HITCBC 320   280 bool await_ready() const noexcept 218   275 bool await_ready() const noexcept
321   { 219   {
HITCBC 322   280 if(buffer_empty(buffers_)) 220   275 if(buffer_empty(buffers_))
HITCBC 323   8 return true; 221   8 return true;
HITCBC 324   272 auto* st = self_->state_.get(); 222   267 auto* st = self_->state_.get();
HITCBC 325   272 auto& side = st->sides[self_->index_]; 223   267 auto& side = st->sides[self_->index_];
HITCBC 326   542 return st->closed || side.eof || 224   532 return st->closed || side.eof ||
HITCBC 327   542 !side.buf.empty(); 225   532 !side.buf.empty();
328   } 226   }
329   227  
HITCBC 330   29 std::coroutine_handle<> await_suspend( 228   25 std::coroutine_handle<> await_suspend(
331   std::coroutine_handle<> h, 229   std::coroutine_handle<> h,
332   io_env const* env) noexcept 230   io_env const* env) noexcept
333 - // Park the continuation, then register the stop callback.  
334 - // If stop is already requested, the callback fires inline  
335 - // during construction: it claims the resume and posts the  
336 - // continuation through the executor (never a symmetric  
337 - // self-transfer, which would leak this frame under  
338 - // run_async). The parked read is then resumed with  
339 - // error::canceled by the run loop.  
340   { 231   {
HITCBC 341   29 auto& side = self_->state_->sides[ 232   25 auto& side = self_->state_->sides[
DCB 342 - 29 cont_.h = h;  
DCB 343 - 29 ex_ = env->executor;  
DCB 344 - 29 side_ = &side;  
HITCBC 345   29 self_->index_]; 233   25 self_->index_];
HITCBC 346   29 side.pending_cont_.h = h; 234   25 side.pending_cont_.h = h;
DCB 347 - 29 side.pending_claimed = &claimed_;  
DCB 348 - 29  
349 - ::new(static_cast<void*>(&stop_cb_)) stop_cb_t(  
DCB 350 - 29 env->stop_token, cancel_fn{this});  
DCB 351 - 29 stop_cb_active_ = true;  
DCB 352 - 29  
HITGIC 353   side.pending_ex = env->executor; 235   25 side.pending_ex = env->executor;
HITCBC 354   29 return std::noop_coroutine(); 236   25 return std::noop_coroutine();
355   } 237   }
356   238  
357   io_result<std::size_t> 239   io_result<std::size_t>
HITCBC 358   279 await_resume() 240   275 await_resume()
359 - if(stop_cb_active_)  
DCB 360 - 279 {  
361 - stop_cb_.~stop_cb_t();  
DCB 362 - 28 stop_cb_active_ = false;  
DCB 363 - 28 }  
364 -  
365   { 241   {
HITCBC 366   279 if(buffer_empty(buffers_)) 242   275 if(buffer_empty(buffers_))
HITCBC 367   8 return {{}, 0}; 243   8 return {{}, 0};
368 - if(canceled_)  
DCB 369 - 271 {  
370 - // The stop callback posted us but left the side  
371 - // untouched; unlink if a peer wake has not already.  
372 - if(side_ && side_->pending_claimed == &claimed_)  
DCB 373 - 3 {  
374 - side_->pending_cont_.h = {};  
DCB 375 - 3 side_->pending_ex = {};  
DCB 376 - 3 side_->pending_claimed = nullptr;  
DCB 377 - 3 }  
378 - return {error::canceled, 0};  
DCB 379 - 3 }  
380 -  
381   244  
HITCBC 382   268 auto* st = self_->state_.get(); 245   267 auto* st = self_->state_.get();
HITCBC 383   268 auto& side = st->sides[ 246   267 auto& side = st->sides[
HITCBC 384   268 self_->index_]; 247   267 self_->index_];
385   248  
HITCBC 386   268 if(st->closed) 249   267 if(st->closed)
HITCBC 387   12 return {error::eof, 0}; 250   12 return {error::eof, 0};
388   251  
HITCBC 389   256 if(side.eof && side.buf.empty()) 252   255 if(side.eof && side.buf.empty())
HITCBC 390   3 return {error::eof, 0}; 253   3 return {error::eof, 0};
391   254  
HITCBC 392   253 if(!side.eof) 255   252 if(!side.eof)
393   { 256   {
HITCBC 394   253 close_guard g{st}; 257   252 close_guard g{st};
HITCBC 395   253 auto ec = st->f.maybe_fail(); 258   252 auto ec = st->f.maybe_fail();
HITCBC 396   200 if(ec) 259   199 if(ec)
HITCBC 397   53 return {ec, 0}; 260   53 return {ec, 0};
HITCBC 398   147 g.disarm(); 261   146 g.disarm();
HITCBC 399   253 } 262   252 }
400   263  
HITCBC 401   294 std::size_t const n = buffer_copy( 264   292 std::size_t const n = buffer_copy(
HITCBC 402   147 buffers_, make_buffer(side.buf), 265   146 buffers_, make_buffer(side.buf),
403   side.max_read_size); 266   side.max_read_size);
HITCBC 404   147 side.buf.erase(0, n); 267   146 side.buf.erase(0, n);
HITCBC 405   147 return {{}, n}; 268   146 return {{}, n};
406   } 269   }
407   }; 270   };
HITCBC 408   280 return awaitable{this, buffers}; 271   275 return awaitable{this, buffers};
409   } 272   }
410   273  
411   /** Asynchronously write data to the stream. 274   /** Asynchronously write data to the stream.
412   275  
413   Transfers up to `buffer_size(buffers)` bytes to the 276   Transfers up to `buffer_size(buffers)` bytes to the
414   peer's incoming buffer. If the peer is suspended in 277   peer's incoming buffer. If the peer is suspended in
415   @ref read_some, it is resumed. Before every write, 278   @ref read_some, it is resumed. Before every write,
416   the attached @ref fuse is consulted to possibly inject 279   the attached @ref fuse is consulted to possibly inject
417   an error. If the fuse fires, the peer is automatically 280   an error. If the fuse fires, the peer is automatically
418   closed. If the stream is closed, returns `error::eof`. 281   closed. If the stream is closed, returns `error::eof`.
419   The returned `std::size_t` is the number of bytes 282   The returned `std::size_t` is the number of bytes
420   transferred. 283   transferred.
421   284  
422   @param buffers The const buffer sequence containing 285   @param buffers The const buffer sequence containing
423   data to write. 286   data to write.
424   287  
425   @return An awaitable that await-returns `(error_code,std::size_t)`. 288   @return An awaitable that await-returns `(error_code,std::size_t)`.
426 - @par Cancellation  
427 - If the environment's stop token has been requested, the write  
428 - completes immediately with `error::canceled` and transfers no  
429 - data. An empty buffer sequence is a no-op that completes  
430 - successfully regardless of the stop token.  
431 -  
432   289  
433   @see fuse, close 290   @see fuse, close
434   */ 291   */
435   template<ConstBufferSequence CB> 292   template<ConstBufferSequence CB>
436   auto 293   auto
HITCBC 437   261 write_some(CB buffers) 294   260 write_some(CB buffers)
438   { 295   {
439   struct awaitable 296   struct awaitable
440   { 297   {
441   stream* self_; 298   stream* self_;
442 - bool canceled_ = false;  
443   CB buffers_; 299   CB buffers_;
444   300  
HITCBC 445 - 261 bool await_ready() const noexcept { return false; } 301 + 260 bool await_ready() const noexcept { return true; }
446   302  
MISUIC 447 - // The write completes synchronously; await_suspend is only 303 + void await_suspend(
448 - // used to observe the environment's stop token. Returning  
449 - // false means the coroutine does not actually suspend.  
450 - bool  
DCB 451 - 261 await_suspend(  
452   std::coroutine_handle<>, 304   std::coroutine_handle<>,
453 - io_env const* env) noexcept 305 + io_env const*) const noexcept
454 - canceled_ = env->stop_token.stop_requested();  
DCB 455 - 261 return false;  
ECB 456   261 { 306   {
MISUIC 457   } 307   }
458   308  
459   io_result<std::size_t> 309   io_result<std::size_t>
HITCBC 460   261 await_resume() 310   260 await_resume()
461   { 311   {
HITCBC 462   261 std::size_t n = buffer_size(buffers_); 312   260 std::size_t n = buffer_size(buffers_);
HITCBC 463   261 if(n == 0) 313   260 if(n == 0)
HITCBC 464   4 return {{}, 0}; 314   4 return {{}, 0};
465 - if(canceled_)  
DCB 466 - 257 return {error::canceled, 0};  
DCB 467 - 1  
468   315  
HITCBC 469   256 auto* st = self_->state_.get(); 316   256 auto* st = self_->state_.get();
470   317  
HITCBC 471   256 if(st->closed) 318   256 if(st->closed)
MISUBC 472   return {error::eof, 0}; 319   return {error::eof, 0};
473   320  
HITCBC 474   256 close_guard g{st}; 321   256 close_guard g{st};
HITCBC 475   256 auto ec = st->f.maybe_fail(); 322   256 auto ec = st->f.maybe_fail();
HITCBC 476   205 if(ec) 323   205 if(ec)
HITCBC 477   51 return {ec, 0}; 324   51 return {ec, 0};
HITCBC 478   154 g.disarm(); 325   154 g.disarm();
479   326  
HITCBC 480   154 int peer = 1 - self_->index_; 327   154 int peer = 1 - self_->index_;
HITCBC 481   154 auto& side = st->sides[peer]; 328   154 auto& side = st->sides[peer];
482   329  
HITCBC 483   154 std::size_t const old_size = side.buf.size(); 330   154 std::size_t const old_size = side.buf.size();
HITCBC 484   154 side.buf.resize(old_size + n); 331   154 side.buf.resize(old_size + n);
HITCBC 485   154 buffer_copy(make_buffer( 332   154 buffer_copy(make_buffer(
HITCBC 486   154 side.buf.data() + old_size, n), 333   154 side.buf.data() + old_size, n),
HITCBC 487   154 buffers_, n); 334   154 buffers_, n);
488   335  
HITCBC 489 - 154 state::wake(side); 336 + 154 if(side.pending_cont_.h)
  337 + {
HITGNC   338 + 12 side.pending_ex.post(side.pending_cont_);
HITGNC   339 + 12 side.pending_cont_.h = {};
HITGNC   340 + 12 side.pending_ex = {};
  341 + }
490   342  
HITCBC 491   154 return {{}, n}; 343   154 return {{}, n};
HITCBC 492   256 } 344   256 }
493   }; 345   };
HITCBC 494   261 return awaitable{this, buffers}; 346   260 return awaitable{this, buffers};
495   } 347   }
496   348  
497   /** Inject data into this stream's peer for reading. 349   /** Inject data into this stream's peer for reading.
498   350  
499   Appends data directly to the peer's incoming buffer, 351   Appends data directly to the peer's incoming buffer,
500   bypassing the fuse. If the peer is suspended in 352   bypassing the fuse. If the peer is suspended in
501   @ref read_some, it is resumed. This is test setup, 353   @ref read_some, it is resumed. This is test setup,
502   not an operation under test. 354   not an operation under test.
503   355  
504   @param sv The data to inject. 356   @param sv The data to inject.
505   357  
506   @see make_stream_pair 358   @see make_stream_pair
507   */ 359   */
508   void 360   void
HITCBC 509   89 provide(std::string_view sv) 361   87 provide(std::string_view sv)
510   { 362   {
HITCBC 511   89 int peer = 1 - index_; 363   87 int peer = 1 - index_;
HITCBC 512   89 auto& side = state_->sides[peer]; 364   87 auto& side = state_->sides[peer];
HITCBC 513   89 side.buf.append(sv); 365   87 side.buf.append(sv);
HITCBC 514 - 89 state::wake(side); 366 + 87 if(side.pending_cont_.h)
  367 + {
MISUNC   368 + side.pending_ex.post(side.pending_cont_);
MISUNC   369 + side.pending_cont_.h = {};
MISUNC   370 + side.pending_ex = {};
  371 + }
HITCBC 515   89 } 372   87 }
516   373  
517   /** Read from this stream and verify the content. 374   /** Read from this stream and verify the content.
518   375  
519   Reads exactly `expected.size()` bytes from the stream 376   Reads exactly `expected.size()` bytes from the stream
520   and compares against the expected string. The read goes 377   and compares against the expected string. The read goes
521   through the normal path including the fuse. 378   through the normal path including the fuse.
522   379  
523   @param expected The expected content. 380   @param expected The expected content.
524   381  
525   @return A pair of `(error_code, bool)`. The error_code 382   @return A pair of `(error_code, bool)`. The error_code
526   is set if a read error occurs (e.g. fuse injection). 383   is set if a read error occurs (e.g. fuse injection).
527   The bool is true if the data matches. 384   The bool is true if the data matches.
528   385  
529   @see provide 386   @see provide
530   */ 387   */
531   std::pair<std::error_code, bool> 388   std::pair<std::error_code, bool>
HITCBC 532   38 expect(std::string_view expected) 389   38 expect(std::string_view expected)
533   { 390   {
HITCBC 534   38 std::error_code result; 391   38 std::error_code result;
HITCBC 535   38 bool match = false; 392   38 bool match = false;
HITCBC 536   141 run_blocking()([]( 393   141 run_blocking()([](
537   stream& self, 394   stream& self,
538   std::string_view expected, 395   std::string_view expected,
539   std::error_code& result, 396   std::error_code& result,
540   bool& match) -> task<> 397   bool& match) -> task<>
541   { 398   {
542   std::string buf(expected.size(), '\0'); 399   std::string buf(expected.size(), '\0');
543   auto [ec, n] = co_await read( 400   auto [ec, n] = co_await read(
544   self, mutable_buffer( 401   self, mutable_buffer(
545   buf.data(), buf.size())); 402   buf.data(), buf.size()));
546   if(ec) 403   if(ec)
547   { 404   {
548   result = ec; 405   result = ec;
549   co_return; 406   co_return;
550   } 407   }
551   match = (std::string_view( 408   match = (std::string_view(
552   buf.data(), n) == expected); 409   buf.data(), n) == expected);
HITCBC 553   161 }(*this, expected, result, match)); 410   161 }(*this, expected, result, match));
HITCBC 554   58 return {result, match}; 411   58 return {result, match};
555   } 412   }
556   413  
557   /** Return the stream's pending read data. 414   /** Return the stream's pending read data.
558   415  
559   Returns a view of the data waiting to be read 416   Returns a view of the data waiting to be read
560   from this stream. This is a direct peek at the 417   from this stream. This is a direct peek at the
561   internal buffer, bypassing the fuse. 418   internal buffer, bypassing the fuse.
562   419  
563   @return A view of the pending data. 420   @return A view of the pending data.
564   421  
565   @see provide, expect 422   @see provide, expect
566   */ 423   */
567   std::string_view 424   std::string_view
568   data() const noexcept 425   data() const noexcept
569   { 426   {
570   return state_->sides[index_].buf; 427   return state_->sides[index_].buf;
571   } 428   }
572   }; 429   };
573   430  
574   /** Create a connected pair of test streams. 431   /** Create a connected pair of test streams.
575   432  
576   Data written to one stream becomes readable on the other. 433   Data written to one stream becomes readable on the other.
577   If a coroutine calls @ref stream::read_some when no data 434   If a coroutine calls @ref stream::read_some when no data
578   is available, it suspends until the peer writes. Before 435   is available, it suspends until the peer writes. Before
579   every read or write, the @ref fuse is consulted to 436   every read or write, the @ref fuse is consulted to
580   possibly inject an error for testing fault scenarios. 437   possibly inject an error for testing fault scenarios.
581   When the fuse fires, the peer is automatically closed. 438   When the fuse fires, the peer is automatically closed.
582   439  
583   @param f The fuse used to inject errors during operations. 440   @param f The fuse used to inject errors during operations.
584   441  
585   @return A pair of connected streams. 442   @return A pair of connected streams.
586   443  
587   @see stream, fuse 444   @see stream, fuse
588   */ 445   */
589   inline std::pair<stream, stream> 446   inline std::pair<stream, stream>
HITCBC 590   286 make_stream_pair(fuse f = {}) 447   280 make_stream_pair(fuse f = {})
591   { 448   {
HITCBC 592   286 auto sp = std::make_shared<stream::state>(std::move(f)); 449   280 auto sp = std::make_shared<stream::state>(std::move(f));
HITCBC 593   572 return {stream(sp, 0), stream(sp, 1)}; 450   560 return {stream(sp, 0), stream(sp, 1)};
HITCBC 594   286 } 451   280 }
595   452  
596   } // test 453   } // test
597   } // capy 454   } // capy
598   } // boost 455   } // boost
599   456  
600   #endif 457   #endif