@@ -1371,36 +1371,34 @@ Note that `cancel` and `close` notify in opposite directions:
13711371* ` close ` * must not* be called on a readable or writable end with an operation
13721372 pending, and thus ` close ` notifies the opposite end.
13731373
1374- Finally, the meat of the class is the ` read ` method that is called through the
1375- abstract ` ReadableStream ` interface (by the host or another component). There
1376- is also a symmetric ` write ` method that follows the same rules as ` read ` ,
1377- but in the opposite direction. Both are implemented by a single underlying
1378- ` copy ` method parameterized by the direction of the copy:
1379- ``` python
1380- def read (self , inst , dst , on_copy , on_copy_done ):
1381- self .copy(inst, dst, on_copy, on_copy_done, self .pending_buffer, dst)
1382-
1383- def write (self , inst , src , on_copy , on_copy_done ):
1384- self .copy(inst, src, on_copy, on_copy_done, src, self .pending_buffer)
1385-
1386- def copy (self , inst , buffer , on_copy , on_copy_done , src , dst ):
1374+ The ` read ` method implements the ` ReadableStream.read ` interface described
1375+ above and is called by either ` stream.read ` or the host, depending on who is
1376+ passed the readable end of the stream. If the reader is first to rendezvous,
1377+ then all the parameters are stored in the ` pending_* ` fields, requiring the
1378+ reader to wait for the writer to rendezvous. If the writer was first to
1379+ rendezvous, then there is already a pending ` ReadableBuffer ` to read from, and
1380+ so the reader copies as much as it can (which may be less than a full buffer's
1381+ worth) and eagerly completes the copy without blocking. In the final special
1382+ case where both the reader and pending writer have zero-length buffers, the
1383+ writer is notified, but the reader remains blocked:
1384+ ``` python
1385+ def read (self , inst , dst_buffer , on_copy , on_copy_done ):
13871386 if self .closed_:
13881387 on_copy_done(CopyResult.CLOSED )
13891388 elif not self .pending_buffer:
1390- self .set_pending(inst, buffer , on_copy, on_copy_done)
1389+ self .set_pending(inst, dst_buffer , on_copy, on_copy_done)
13911390 else :
1392- assert (self .t == src .t == dst .t)
1391+ assert (self .t == dst_buffer .t == self .pending_buffer .t)
13931392 trap_if(inst is self .pending_inst and self .t is not None ) # temporary
13941393 if self .pending_buffer.remain() > 0 :
1395- if buffer.remain() > 0 :
1396- dst.write(src.read(min (src.remain(), dst.remain())))
1394+ if dst_buffer.remain() > 0 :
1395+ n = min (dst_buffer.remain(), self .pending_buffer.remain())
1396+ dst_buffer.write(self .pending_buffer.read(n))
13971397 self .pending_on_copy(self .reset_pending)
13981398 on_copy_done(CopyResult.COMPLETED )
1399- elif buffer is src and buffer.remain() == 0 and self .pending_buffer.is_zero_length():
1400- on_copy_done(CopyResult.COMPLETED )
14011399 else :
14021400 self .reset_and_notify_pending(CopyResult.COMPLETED )
1403- self .set_pending(inst, buffer , on_copy, on_copy_done)
1401+ self .set_pending(inst, dst_buffer , on_copy, on_copy_done)
14041402```
14051403Currently, there is a trap when both the ` read ` and ` write ` come from the same
14061404component instance and there is a non-empty element type. This trap will be
@@ -1409,25 +1407,52 @@ and lowering can alias the same memory, interleavings can be complex and must
14091407be handled carefully. Future improvements to the Canonical ABI ([ lazy lowering] )
14101408can greatly simplify this interleaving and be more practical to implement.
14111409
1412- The meaning of a ` read ` or ` write ` when the length is ` 0 ` is that the caller is
1413- querying the "readiness" of the other side. When a ` 0 ` -length read/write
1414- rendezvous with a non-` 0 ` -length read/write, only the ` 0 ` -length read/write
1415- completes; the non-` 0 ` -length read/write is kept pending (and ready for a
1416- subsequent rendezvous).
1417-
1418- In the corner case where a ` 0 ` -length read * and* write rendezvous, only the
1419- * writer* is notified of readiness. To avoid livelock, the Canonical ABI
1420- requires that a writer * must* (eventually) follow a completed ` 0 ` -length write
1421- with a non-` 0 ` -length write that is allowed to block (allowing the reader end
1422- to run and rendezvous with its own non-` 0 ` -length read). To implement a
1423- traditional ` O_NONBLOCK ` ` write() ` or ` sendmsg() ` API, a writer can use a
1424- buffering scheme in which, after ` select() ` (or a similar API) signals a file
1425- descriptor is ready to write, the next ` O_NONBLOCK ` ` write() ` /` sendmsg() ` on
1426- that file descriptor copies to an internal buffer and suceeds, issuing an
1427- ` async ` ` stream.write ` in the background and waiting for completion before
1428- signalling readiness again. Note that buffering only occurs when streaming
1429- between two components using non-blocking I/O; if either side is the host or a
1430- component using blocking or completion-based I/O, no buffering is necessary.
1410+ The ` write ` method is symmetric to ` read ` (being given a ` ReadableBuffer `
1411+ instead of a ` WritableBuffer ` ) and is called by the ` stream.write ` built-in.
1412+ (noting that the host cannot be passed the writable end of a stream but may
1413+ instead * implement* the ` ReadableStream ` interface and pass the readable end
1414+ into a component). The steps for ` write ` are the same as ` read ` except for
1415+ when a zero-length ` write ` rendezvous with a zero-length ` read ` , in which case
1416+ the ` write ` eagerly completes, leaving the ` read ` pending:
1417+ ``` python
1418+ def write (self , inst , src_buffer , on_copy , on_copy_done ):
1419+ if self .closed_:
1420+ on_copy_done(CopyResult.CLOSED )
1421+ elif not self .pending_buffer:
1422+ self .set_pending(inst, src_buffer, on_copy, on_copy_done)
1423+ else :
1424+ assert (self .t == src_buffer.t == self .pending_buffer.t)
1425+ trap_if(inst is self .pending_inst and self .t is not None ) # temporary
1426+ if self .pending_buffer.remain() > 0 :
1427+ if src_buffer.remain() > 0 :
1428+ n = min (src_buffer.remain(), self .pending_buffer.remain())
1429+ self .pending_buffer.write(src_buffer.read(n))
1430+ self .pending_on_copy(self .reset_pending)
1431+ on_copy_done(CopyResult.COMPLETED )
1432+ elif src_buffer.is_zero_length() and self .pending_buffer.is_zero_length():
1433+ on_copy_done(CopyResult.COMPLETED )
1434+ else :
1435+ self .reset_and_notify_pending(CopyResult.COMPLETED )
1436+ self .set_pending(inst, src_buffer, on_copy, on_copy_done)
1437+ ```
1438+ Putting together the behavior of zero-length ` read ` and ` write ` above, we can
1439+ see that, when * both* the reader and writer are zero-length, regardless of who
1440+ was first, the zero-length ` write ` always completes, leaving the zero-length
1441+ ` read ` pending. To avoid livelock, the Canonical ABI requires that a writer
1442+ * must* (eventually) follow a completed zero-length ` write ` with a
1443+ non-zero-length ` write ` that is allowed to block. This will break the loop,
1444+ notifying the reader end and allowing it to rendezvous with a non-zero-length
1445+ ` read ` and make progress. Based on this rule, to implement a traditional
1446+ ` O_NONBLOCK ` ` write() ` or ` sendmsg() ` API, a writer can use a buffering scheme
1447+ in which, after ` select() ` (or a similar API) signals a file descriptor is
1448+ ready to write, the next ` O_NONBLOCK ` ` write() ` /` sendmsg() ` on that file
1449+ descriptor copies to an internal buffer and suceeds, issuing an ` async `
1450+ ` stream.write ` in the background and waiting for completion before signalling
1451+ readiness again. Note that buffering only occurs when streaming between two
1452+ components using non-blocking I/O; if either side is the host or a component
1453+ using blocking or completion-based I/O, no buffering is necessary. This
1454+ buffering is analogous to the buffering performed in kernel memory by a
1455+ ` pipe() ` .
14311456
14321457Given the above, we can define the ` {Readable,Writable}StreamEnd ` classes that
14331458are actually stored in the component instance table. The classes are almost
0 commit comments