94.78% Lines (109/115) 95.00% Functions (19/20)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_TEST_STREAM_HPP 10   #ifndef BOOST_CAPY_TEST_STREAM_HPP
11   #define BOOST_CAPY_TEST_STREAM_HPP 11   #define BOOST_CAPY_TEST_STREAM_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/buffers.hpp> 14   #include <boost/capy/buffers.hpp>
15   #include <boost/capy/buffers/buffer_copy.hpp> 15   #include <boost/capy/buffers/buffer_copy.hpp>
16   #include <boost/capy/buffers/make_buffer.hpp> 16   #include <boost/capy/buffers/make_buffer.hpp>
17   #include <boost/capy/continuation.hpp> 17   #include <boost/capy/continuation.hpp>
18   #include <coroutine> 18   #include <coroutine>
19   #include <boost/capy/ex/io_env.hpp> 19   #include <boost/capy/ex/io_env.hpp>
20   #include <boost/capy/io_result.hpp> 20   #include <boost/capy/io_result.hpp>
21   #include <boost/capy/error.hpp> 21   #include <boost/capy/error.hpp>
22   #include <boost/capy/read.hpp> 22   #include <boost/capy/read.hpp>
23   #include <boost/capy/task.hpp> 23   #include <boost/capy/task.hpp>
24   #include <boost/capy/test/fuse.hpp> 24   #include <boost/capy/test/fuse.hpp>
25   #include <boost/capy/test/run_blocking.hpp> 25   #include <boost/capy/test/run_blocking.hpp>
26   26  
27   #include <memory> 27   #include <memory>
28   #include <stop_token> 28   #include <stop_token>
29   #include <string> 29   #include <string>
30   #include <string_view> 30   #include <string_view>
31   #include <utility> 31   #include <utility>
32   32  
33   namespace boost { 33   namespace boost {
34   namespace capy { 34   namespace capy {
35   namespace test { 35   namespace test {
36   36  
37   /** A connected stream for testing bidirectional I/O. 37   /** A connected stream for testing bidirectional I/O.
38   38  
39   Streams are created in pairs via @ref make_stream_pair. 39   Streams are created in pairs via @ref make_stream_pair.
40   Data written to one end becomes available for reading on 40   Data written to one end becomes available for reading on
41   the other. If no data is available when @ref read_some 41   the other. If no data is available when @ref read_some
42   is called, the calling coroutine suspends until the peer 42   is called, the calling coroutine suspends until the peer
43   calls @ref write_some. The shared @ref fuse enables error 43   calls @ref write_some. The shared @ref fuse enables error
44   injection at controlled points in both directions. 44   injection at controlled points in both directions.
45   45  
46   When the fuse injects an error or throws on one end, the 46   When the fuse injects an error or throws on one end, the
47   other end is automatically closed: any suspended reader is 47   other end is automatically closed: any suspended reader is
48   resumed with `error::eof`, and subsequent operations on 48   resumed with `error::eof`, and subsequent operations on
49   both ends return `error::eof`. Calling @ref close on one 49   both ends return `error::eof`. Calling @ref close on one
50   end signals eof to the peer's reads after draining any 50   end signals eof to the peer's reads after draining any
51   buffered data, while the peer may still write. 51   buffered data, while the peer may still write.
52   52  
53   @par Thread Safety 53   @par Thread Safety
54   Single-threaded only. Both ends of the pair must be 54   Single-threaded only. Both ends of the pair must be
55   accessed from the same thread. Concurrent access is 55   accessed from the same thread. Concurrent access is
56   undefined behavior. 56   undefined behavior.
57   57  
58   @par Example 58   @par Example
59   @code 59   @code
60   fuse f; 60   fuse f;
61   auto [a, b] = make_stream_pair( f ); 61   auto [a, b] = make_stream_pair( f );
62   62  
63   auto r = f.armed( [&]( fuse& ) -> task<> { 63   auto r = f.armed( [&]( fuse& ) -> task<> {
64   auto [ec, n] = co_await a.write_some( 64   auto [ec, n] = co_await a.write_some(
65   const_buffer( "hello", 5 ) ); 65   const_buffer( "hello", 5 ) );
66   if( ec ) 66   if( ec )
67   co_return; 67   co_return;
68   68  
69   char buf[32]; 69   char buf[32];
70   auto [ec2, n2] = co_await b.read_some( 70   auto [ec2, n2] = co_await b.read_some(
71   mutable_buffer( buf, sizeof( buf ) ) ); 71   mutable_buffer( buf, sizeof( buf ) ) );
72   if( ec2 ) 72   if( ec2 )
73   co_return; 73   co_return;
74   // buf contains "hello" 74   // buf contains "hello"
75   } ); 75   } );
76   @endcode 76   @endcode
77   77  
78   @see make_stream_pair, fuse 78   @see make_stream_pair, fuse
79   */ 79   */
80   class stream 80   class stream
81   { 81   {
82   // Single-threaded only. No concurrent access to either 82   // Single-threaded only. No concurrent access to either
83   // end of the pair. Both streams and all operations must 83   // end of the pair. Both streams and all operations must
84   // run on the same thread. 84   // run on the same thread.
85   85  
86   struct half 86   struct half
87   { 87   {
88   std::string buf; 88   std::string buf;
89   std::size_t max_read_size = std::size_t(-1); 89   std::size_t max_read_size = std::size_t(-1);
90   continuation pending_cont_; 90   continuation pending_cont_;
91   executor_ref pending_ex; 91   executor_ref pending_ex;
92   bool eof = false; 92   bool eof = false;
93   }; 93   };
94   94  
95   struct state 95   struct state
96   { 96   {
97   fuse f; 97   fuse f;
98   bool closed = false; 98   bool closed = false;
99   half sides[2]; 99   half sides[2];
100   100  
HITCBC 101   280 explicit state(fuse f_) noexcept 101   280 explicit state(fuse f_) noexcept
HITCBC 102   840 : f(std::move(f_)) 102   840 : f(std::move(f_))
103   { 103   {
HITCBC 104   280 } 104   280 }
105   105  
106   // Set closed and resume any suspended readers 106   // Set closed and resume any suspended readers
107   // with eof on both sides. 107   // with eof on both sides.
HITCBC 108   208 void close() 108   208 void close()
109   { 109   {
HITCBC 110   208 closed = true; 110   208 closed = true;
HITCBC 111   624 for(auto& side : sides) 111   624 for(auto& side : sides)
112   { 112   {
HITCBC 113   416 if(side.pending_cont_.h) 113   416 if(side.pending_cont_.h)
114   { 114   {
HITCBC 115   12 side.pending_ex.post(side.pending_cont_); 115   12 side.pending_ex.post(side.pending_cont_);
HITCBC 116   12 side.pending_cont_.h = {}; 116   12 side.pending_cont_.h = {};
HITCBC 117   12 side.pending_ex = {}; 117   12 side.pending_ex = {};
118   } 118   }
119   } 119   }
HITCBC 120   208 } 120   208 }
121   }; 121   };
122   122  
123   // Wraps the maybe_fail() call. If the guard is 123   // Wraps the maybe_fail() call. If the guard is
124   // not disarmed before destruction (fuse returned 124   // not disarmed before destruction (fuse returned
125   // an error, or threw an exception), closes both 125   // an error, or threw an exception), closes both
126   // ends so any suspended peer gets eof. 126   // ends so any suspended peer gets eof.
127   struct close_guard 127   struct close_guard
128   { 128   {
129   state* st; 129   state* st;
130   bool armed = true; 130   bool armed = true;
HITCBC 131   300 void disarm() noexcept { armed = false; } 131   300 void disarm() noexcept { armed = false; }
HITCBC 132   508 ~close_guard() noexcept(false) { if(armed) st->close(); } 132   508 ~close_guard() noexcept(false) { if(armed) st->close(); }
133   }; 133   };
134   134  
135   std::shared_ptr<state> state_; 135   std::shared_ptr<state> state_;
136   int index_; 136   int index_;
137   137  
HITCBC 138   560 stream( 138   560 stream(
139   std::shared_ptr<state> sp, 139   std::shared_ptr<state> sp,
140   int index) noexcept 140   int index) noexcept
HITCBC 141   560 : state_(std::move(sp)) 141   560 : state_(std::move(sp))
HITCBC 142   560 , index_(index) 142   560 , index_(index)
143   { 143   {
HITCBC 144   560 } 144   560 }
145   145  
146   friend std::pair<stream, stream> 146   friend std::pair<stream, stream>
147   make_stream_pair(fuse); 147   make_stream_pair(fuse);
148   148  
149   public: 149   public:
150   stream(stream const&) = delete; 150   stream(stream const&) = delete;
151   stream& operator=(stream const&) = delete; 151   stream& operator=(stream const&) = delete;
HITCBC 152   660 stream(stream&&) = default; 152   660 stream(stream&&) = default;
153   stream& operator=(stream&&) = default; 153   stream& operator=(stream&&) = default;
154   154  
155   /** Signal end-of-stream to the peer. 155   /** Signal end-of-stream to the peer.
156   156  
157   Marks the peer's read direction as closed. 157   Marks the peer's read direction as closed.
158   If the peer is suspended in @ref read_some, 158   If the peer is suspended in @ref read_some,
159   it is resumed. The peer drains any buffered 159   it is resumed. The peer drains any buffered
160   data before receiving `error::eof`. Writes 160   data before receiving `error::eof`. Writes
161   from the peer are unaffected. 161   from the peer are unaffected.
162   */ 162   */
163   void 163   void
HITCBC 164   3 close() 164   3 close()
165   { 165   {
HITCBC 166   3 int peer = 1 - index_; 166   3 int peer = 1 - index_;
HITCBC 167   3 auto& side = state_->sides[peer]; 167   3 auto& side = state_->sides[peer];
HITCBC 168   3 side.eof = true; 168   3 side.eof = true;
HITCBC 169   3 if(side.pending_cont_.h) 169   3 if(side.pending_cont_.h)
170   { 170   {
HITCBC 171   1 side.pending_ex.post(side.pending_cont_); 171   1 side.pending_ex.post(side.pending_cont_);
HITCBC 172   1 side.pending_cont_.h = {}; 172   1 side.pending_cont_.h = {};
HITCBC 173   1 side.pending_ex = {}; 173   1 side.pending_ex = {};
174   } 174   }
HITCBC 175   3 } 175   3 }
176   176  
177   /** Set the maximum bytes returned per read. 177   /** Set the maximum bytes returned per read.
178   178  
179   Limits how many bytes @ref read_some returns in 179   Limits how many bytes @ref read_some returns in
180   a single call, simulating chunked network delivery. 180   a single call, simulating chunked network delivery.
181   The default is unlimited. 181   The default is unlimited.
182   182  
183   @param n Maximum bytes per read. 183   @param n Maximum bytes per read.
184   */ 184   */
185   void 185   void
HITCBC 186   54 set_max_read_size(std::size_t n) noexcept 186   54 set_max_read_size(std::size_t n) noexcept
187   { 187   {
HITCBC 188   54 state_->sides[index_].max_read_size = n; 188   54 state_->sides[index_].max_read_size = n;
HITCBC 189   54 } 189   54 }
190   190  
191   /** Asynchronously read data from the stream. 191   /** Asynchronously read data from the stream.
192   192  
193   Transfers up to `buffer_size(buffers)` bytes from 193   Transfers up to `buffer_size(buffers)` bytes from
194   data written by the peer. If no data is available, 194   data written by the peer. If no data is available,
195   the calling coroutine suspends until the peer calls 195   the calling coroutine suspends until the peer calls
196   @ref write_some. Before every read, the attached 196   @ref write_some. Before every read, the attached
197   @ref fuse is consulted to possibly inject an error. 197   @ref fuse is consulted to possibly inject an error.
198   If the fuse fires, the peer is automatically closed. 198   If the fuse fires, the peer is automatically closed.
199   If the stream is closed, returns `error::eof`. 199   If the stream is closed, returns `error::eof`.
200   The returned `std::size_t` is the number of bytes 200   The returned `std::size_t` is the number of bytes
201   transferred. 201   transferred.
202   202  
203   @param buffers The mutable buffer sequence to receive data. 203   @param buffers The mutable buffer sequence to receive data.
204   204  
205   @return An awaitable that await-returns `(error_code,std::size_t)`. 205   @return An awaitable that await-returns `(error_code,std::size_t)`.
206   206  
207   @see fuse, close 207   @see fuse, close
208   */ 208   */
209   template<MutableBufferSequence MB> 209   template<MutableBufferSequence MB>
210   auto 210   auto
HITCBC 211   275 read_some(MB buffers) 211   275 read_some(MB buffers)
212   { 212   {
213   struct awaitable 213   struct awaitable
214   { 214   {
215   stream* self_; 215   stream* self_;
216   MB buffers_; 216   MB buffers_;
217   217  
HITCBC 218   275 bool await_ready() const noexcept 218   275 bool await_ready() const noexcept
219   { 219   {
HITCBC 220   275 if(buffer_empty(buffers_)) 220   275 if(buffer_empty(buffers_))
HITCBC 221   8 return true; 221   8 return true;
HITCBC 222   267 auto* st = self_->state_.get(); 222   267 auto* st = self_->state_.get();
HITCBC 223   267 auto& side = st->sides[self_->index_]; 223   267 auto& side = st->sides[self_->index_];
HITCBC 224   532 return st->closed || side.eof || 224   532 return st->closed || side.eof ||
HITCBC 225   532 !side.buf.empty(); 225   532 !side.buf.empty();
226   } 226   }
227   227  
HITCBC 228   25 std::coroutine_handle<> await_suspend( 228   25 std::coroutine_handle<> await_suspend(
229   std::coroutine_handle<> h, 229   std::coroutine_handle<> h,
230   io_env const* env) noexcept 230   io_env const* env) noexcept
231   { 231   {
HITCBC 232   25 auto& side = self_->state_->sides[ 232   25 auto& side = self_->state_->sides[
HITCBC 233   25 self_->index_]; 233   25 self_->index_];
HITCBC 234   25 side.pending_cont_.h = h; 234   25 side.pending_cont_.h = h;
HITCBC 235   25 side.pending_ex = env->executor; 235   25 side.pending_ex = env->executor;
HITCBC 236   25 return std::noop_coroutine(); 236   25 return std::noop_coroutine();
237   } 237   }
238   238  
239   io_result<std::size_t> 239   io_result<std::size_t>
HITCBC 240   275 await_resume() 240   275 await_resume()
241   { 241   {
HITCBC 242   275 if(buffer_empty(buffers_)) 242   275 if(buffer_empty(buffers_))
HITCBC 243   8 return {{}, 0}; 243   8 return {{}, 0};
244   244  
HITCBC 245   267 auto* st = self_->state_.get(); 245   267 auto* st = self_->state_.get();
HITCBC 246   267 auto& side = st->sides[ 246   267 auto& side = st->sides[
HITCBC 247   267 self_->index_]; 247   267 self_->index_];
248   248  
HITCBC 249   267 if(st->closed) 249   267 if(st->closed)
HITCBC 250   12 return {error::eof, 0}; 250   12 return {error::eof, 0};
251   251  
HITCBC 252   255 if(side.eof && side.buf.empty()) 252   255 if(side.eof && side.buf.empty())
HITCBC 253   3 return {error::eof, 0}; 253   3 return {error::eof, 0};
254   254  
HITCBC 255   252 if(!side.eof) 255   252 if(!side.eof)
256   { 256   {
HITCBC 257   252 close_guard g{st}; 257   252 close_guard g{st};
HITCBC 258   252 auto ec = st->f.maybe_fail(); 258   252 auto ec = st->f.maybe_fail();
HITCBC 259   199 if(ec) 259   199 if(ec)
HITCBC 260   53 return {ec, 0}; 260   53 return {ec, 0};
HITCBC 261   146 g.disarm(); 261   146 g.disarm();
HITCBC 262   252 } 262   252 }
263   263  
HITCBC 264   292 std::size_t const n = buffer_copy( 264   292 std::size_t const n = buffer_copy(
HITCBC 265   146 buffers_, make_buffer(side.buf), 265   146 buffers_, make_buffer(side.buf),
266   side.max_read_size); 266   side.max_read_size);
HITCBC 267   146 side.buf.erase(0, n); 267   146 side.buf.erase(0, n);
HITCBC 268   146 return {{}, n}; 268   146 return {{}, n};
269   } 269   }
270   }; 270   };
HITCBC 271   275 return awaitable{this, buffers}; 271   275 return awaitable{this, buffers};
272   } 272   }
273   273  
274   /** Asynchronously write data to the stream. 274   /** Asynchronously write data to the stream.
275   275  
276   Transfers up to `buffer_size(buffers)` bytes to the 276   Transfers up to `buffer_size(buffers)` bytes to the
277   peer's incoming buffer. If the peer is suspended in 277   peer's incoming buffer. If the peer is suspended in
278   @ref read_some, it is resumed. Before every write, 278   @ref read_some, it is resumed. Before every write,
279   the attached @ref fuse is consulted to possibly inject 279   the attached @ref fuse is consulted to possibly inject
280   an error. If the fuse fires, the peer is automatically 280   an error. If the fuse fires, the peer is automatically
281   closed. If the stream is closed, returns `error::eof`. 281   closed. If the stream is closed, returns `error::eof`.
282   The returned `std::size_t` is the number of bytes 282   The returned `std::size_t` is the number of bytes
283   transferred. 283   transferred.
284   284  
285   @param buffers The const buffer sequence containing 285   @param buffers The const buffer sequence containing
286   data to write. 286   data to write.
287   287  
288   @return An awaitable that await-returns `(error_code,std::size_t)`. 288   @return An awaitable that await-returns `(error_code,std::size_t)`.
289   289  
290   @see fuse, close 290   @see fuse, close
291   */ 291   */
292   template<ConstBufferSequence CB> 292   template<ConstBufferSequence CB>
293   auto 293   auto
HITCBC 294   260 write_some(CB buffers) 294   260 write_some(CB buffers)
295   { 295   {
296   struct awaitable 296   struct awaitable
297   { 297   {
298   stream* self_; 298   stream* self_;
299   CB buffers_; 299   CB buffers_;
300   300  
HITCBC 301   260 bool await_ready() const noexcept { return true; } 301   260 bool await_ready() const noexcept { return true; }
302   302  
MISUBC 303   void await_suspend( 303   void await_suspend(
304   std::coroutine_handle<>, 304   std::coroutine_handle<>,
305   io_env const*) const noexcept 305   io_env const*) const noexcept
306   { 306   {
MISUBC 307   } 307   }
308   308  
309   io_result<std::size_t> 309   io_result<std::size_t>
HITCBC 310   260 await_resume() 310   260 await_resume()
311   { 311   {
HITCBC 312   260 std::size_t n = buffer_size(buffers_); 312   260 std::size_t n = buffer_size(buffers_);
HITCBC 313   260 if(n == 0) 313   260 if(n == 0)
HITCBC 314   4 return {{}, 0}; 314   4 return {{}, 0};
315   315  
HITCBC 316   256 auto* st = self_->state_.get(); 316   256 auto* st = self_->state_.get();
317   317  
HITCBC 318   256 if(st->closed) 318   256 if(st->closed)
MISUBC 319   return {error::eof, 0}; 319   return {error::eof, 0};
320   320  
HITCBC 321   256 close_guard g{st}; 321   256 close_guard g{st};
HITCBC 322   256 auto ec = st->f.maybe_fail(); 322   256 auto ec = st->f.maybe_fail();
HITCBC 323   205 if(ec) 323   205 if(ec)
HITCBC 324   51 return {ec, 0}; 324   51 return {ec, 0};
HITCBC 325   154 g.disarm(); 325   154 g.disarm();
326   326  
HITCBC 327   154 int peer = 1 - self_->index_; 327   154 int peer = 1 - self_->index_;
HITCBC 328   154 auto& side = st->sides[peer]; 328   154 auto& side = st->sides[peer];
329   329  
HITCBC 330   154 std::size_t const old_size = side.buf.size(); 330   154 std::size_t const old_size = side.buf.size();
HITCBC 331   154 side.buf.resize(old_size + n); 331   154 side.buf.resize(old_size + n);
HITCBC 332   154 buffer_copy(make_buffer( 332   154 buffer_copy(make_buffer(
HITCBC 333   154 side.buf.data() + old_size, n), 333   154 side.buf.data() + old_size, n),
HITCBC 334   154 buffers_, n); 334   154 buffers_, n);
335   335  
HITCBC 336   154 if(side.pending_cont_.h) 336   154 if(side.pending_cont_.h)
337   { 337   {
HITCBC 338   12 side.pending_ex.post(side.pending_cont_); 338   12 side.pending_ex.post(side.pending_cont_);
HITCBC 339   12 side.pending_cont_.h = {}; 339   12 side.pending_cont_.h = {};
HITCBC 340   12 side.pending_ex = {}; 340   12 side.pending_ex = {};
341   } 341   }
342   342  
HITCBC 343   154 return {{}, n}; 343   154 return {{}, n};
HITCBC 344   256 } 344   256 }
345   }; 345   };
HITCBC 346   260 return awaitable{this, buffers}; 346   260 return awaitable{this, buffers};
347   } 347   }
348   348  
349   /** Inject data into this stream's peer for reading. 349   /** Inject data into this stream's peer for reading.
350   350  
351   Appends data directly to the peer's incoming buffer, 351   Appends data directly to the peer's incoming buffer,
352   bypassing the fuse. If the peer is suspended in 352   bypassing the fuse. If the peer is suspended in
353   @ref read_some, it is resumed. This is test setup, 353   @ref read_some, it is resumed. This is test setup,
354   not an operation under test. 354   not an operation under test.
355   355  
356   @param sv The data to inject. 356   @param sv The data to inject.
357   357  
358   @see make_stream_pair 358   @see make_stream_pair
359   */ 359   */
360   void 360   void
HITCBC 361   87 provide(std::string_view sv) 361   87 provide(std::string_view sv)
362   { 362   {
HITCBC 363   87 int peer = 1 - index_; 363   87 int peer = 1 - index_;
HITCBC 364   87 auto& side = state_->sides[peer]; 364   87 auto& side = state_->sides[peer];
HITCBC 365   87 side.buf.append(sv); 365   87 side.buf.append(sv);
HITCBC 366   87 if(side.pending_cont_.h) 366   87 if(side.pending_cont_.h)
367   { 367   {
MISUBC 368   side.pending_ex.post(side.pending_cont_); 368   side.pending_ex.post(side.pending_cont_);
MISUBC 369   side.pending_cont_.h = {}; 369   side.pending_cont_.h = {};
MISUBC 370   side.pending_ex = {}; 370   side.pending_ex = {};
371   } 371   }
HITCBC 372   87 } 372   87 }
373   373  
374   /** Read from this stream and verify the content. 374   /** Read from this stream and verify the content.
375   375  
376   Reads exactly `expected.size()` bytes from the stream 376   Reads exactly `expected.size()` bytes from the stream
377   and compares against the expected string. The read goes 377   and compares against the expected string. The read goes
378   through the normal path including the fuse. 378   through the normal path including the fuse.
379   379  
380   @param expected The expected content. 380   @param expected The expected content.
381   381  
382   @return A pair of `(error_code, bool)`. The error_code 382   @return A pair of `(error_code, bool)`. The error_code
383   is set if a read error occurs (e.g. fuse injection). 383   is set if a read error occurs (e.g. fuse injection).
384   The bool is true if the data matches. 384   The bool is true if the data matches.
385   385  
386   @see provide 386   @see provide
387   */ 387   */
388   std::pair<std::error_code, bool> 388   std::pair<std::error_code, bool>
HITCBC 389   38 expect(std::string_view expected) 389   38 expect(std::string_view expected)
390   { 390   {
HITCBC 391   38 std::error_code result; 391   38 std::error_code result;
HITCBC 392   38 bool match = false; 392   38 bool match = false;
HITCBC 393   141 run_blocking()([]( 393   141 run_blocking()([](
394   stream& self, 394   stream& self,
395   std::string_view expected, 395   std::string_view expected,
396   std::error_code& result, 396   std::error_code& result,
397   bool& match) -> task<> 397   bool& match) -> task<>
398   { 398   {
399   std::string buf(expected.size(), '\0'); 399   std::string buf(expected.size(), '\0');
400   auto [ec, n] = co_await read( 400   auto [ec, n] = co_await read(
401   self, mutable_buffer( 401   self, mutable_buffer(
402   buf.data(), buf.size())); 402   buf.data(), buf.size()));
403   if(ec) 403   if(ec)
404   { 404   {
405   result = ec; 405   result = ec;
406   co_return; 406   co_return;
407   } 407   }
408   match = (std::string_view( 408   match = (std::string_view(
409   buf.data(), n) == expected); 409   buf.data(), n) == expected);
HITCBC 410   161 }(*this, expected, result, match)); 410   161 }(*this, expected, result, match));
HITCBC 411   58 return {result, match}; 411   58 return {result, match};
412   } 412   }
413   413  
414   /** Return the stream's pending read data. 414   /** Return the stream's pending read data.
415   415  
416   Returns a view of the data waiting to be read 416   Returns a view of the data waiting to be read
417   from this stream. This is a direct peek at the 417   from this stream. This is a direct peek at the
418   internal buffer, bypassing the fuse. 418   internal buffer, bypassing the fuse.
419   419  
420   @return A view of the pending data. 420   @return A view of the pending data.
421   421  
422   @see provide, expect 422   @see provide, expect
423   */ 423   */
424   std::string_view 424   std::string_view
425   data() const noexcept 425   data() const noexcept
426   { 426   {
427   return state_->sides[index_].buf; 427   return state_->sides[index_].buf;
428   } 428   }
429   }; 429   };
430   430  
431   /** Create a connected pair of test streams. 431   /** Create a connected pair of test streams.
432   432  
433   Data written to one stream becomes readable on the other. 433   Data written to one stream becomes readable on the other.
434   If a coroutine calls @ref stream::read_some when no data 434   If a coroutine calls @ref stream::read_some when no data
435   is available, it suspends until the peer writes. Before 435   is available, it suspends until the peer writes. Before
436   every read or write, the @ref fuse is consulted to 436   every read or write, the @ref fuse is consulted to
437   possibly inject an error for testing fault scenarios. 437   possibly inject an error for testing fault scenarios.
438   When the fuse fires, the peer is automatically closed. 438   When the fuse fires, the peer is automatically closed.
439   439  
440   @param f The fuse used to inject errors during operations. 440   @param f The fuse used to inject errors during operations.
441   441  
442   @return A pair of connected streams. 442   @return A pair of connected streams.
443   443  
444   @see stream, fuse 444   @see stream, fuse
445   */ 445   */
446   inline std::pair<stream, stream> 446   inline std::pair<stream, stream>
HITCBC 447   280 make_stream_pair(fuse f = {}) 447   280 make_stream_pair(fuse f = {})
448   { 448   {
HITCBC 449   280 auto sp = std::make_shared<stream::state>(std::move(f)); 449   280 auto sp = std::make_shared<stream::state>(std::move(f));
HITCBC 450   560 return {stream(sp, 0), stream(sp, 1)}; 450   560 return {stream(sp, 0), stream(sp, 1)};
HITCBC 451   280 } 451   280 }
452   452  
453   } // test 453   } // test
454   } // capy 454   } // capy
455   } // boost 455   } // boost
456   456  
457   #endif 457   #endif