gbrd opened a new pull request, #1475:
URL: https://github.com/apache/pekko-connectors/pull/1475

   
   ## Problem
   
   `Archive.tarReader()` could crash with the following error when processing tar archives
   from an async upstream source (e.g. S3, network streams):
   
   ```
   java.lang.IllegalArgumentException: Cannot push port (SubSourceOutlet(fileOut)) twice,
     or before it being pulled
     at org.apache.pekko.stream.stage.GraphStageLogic$SubSourceOutlet.push(GraphStage.scala:1561)
     at ...TarReaderStage$CollectFile.subPush(TarReaderStage.scala:222)
     at ...TarReaderStage$CollectFile.onPush(TarReaderStage.scala:228)
   ```
   
   ## Root Cause
   
   In `CollectFile`, `onPush()` called `subPush()` unconditionally, which in turn called
   `subSource.push()`. However, `subSource.push()` may only be called after downstream has
   pulled the outlet. When upstream data arrived before the downstream consumer had
   subscribed to and pulled the `subSource` outlet, this violated Pekko's back-pressure
   contract and caused the crash.
   
   ## Fix
   
   Three changes in `CollectFile`:
   
   1. **`onPush()`** now checks `subSource.isAvailable` before pushing. If the `subSource` has
      not yet been pulled by downstream, the incoming data is accumulated in the existing
      `buffer` field and will be flushed on the next `subSource.onPull()`.
   
   2. **`subSource.onPull()`** now calls `tryPullIfNeeded()` after flushing the buffer if more
      file content is still expected (`emitted < metadata.size`), which unblocks the
      upstream.
   
   3. **`subPush()`** now correctly increments `emitted` and clears `buffer` before handing
      off to `readTrailer()`, fixing a potential accounting inconsistency on the last chunk
      of a file.
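   
   The three changes can be illustrated with a toy model in plain Scala. This is only a
   sketch of the buffering logic, not the code in `TarReaderStage`: `ToyOutlet`,
   `ToyCollectFile`, `upstreamPulls`, and `ToyDemo` are hypothetical names, strings stand in
   for `ByteString` chunks, and `ToyOutlet` merely imitates the rule that a push is only
   legal after a pull.
   
   ```scala
   // Toy stand-in for a sub-source outlet (NOT the Pekko class): push is only legal after a pull.
   final class ToyOutlet {
     private var pulled = false
     var received: Vector[String] = Vector.empty

     def isAvailable: Boolean = pulled
     def pull(): Unit = pulled = true

     def push(elem: String): Unit = {
       // require throws IllegalArgumentException, the same exception class as in the stack trace
       require(pulled, "Cannot push port twice, or before it being pulled")
       pulled = false
       received = received :+ elem
     }
   }

   // Toy stand-in for the file-collecting state; `size` plays the role of metadata.size.
   final class ToyCollectFile(out: ToyOutlet, size: Int) {
     private var buffer = ""
     private var emitted = 0
     var upstreamPulls = 0 // how often this model would ask upstream for more data

     // Upstream delivered a chunk.
     def onPush(chunk: String): Unit = {
       buffer += chunk
       if (out.isAvailable) subPush() // change 1: only push once downstream has pulled
     }

     // Downstream pulled the sub-source.
     def onPull(): Unit = {
       out.pull()
       if (buffer.nonEmpty) subPush()         // flush data that arrived early
       if (emitted < size) upstreamPulls += 1 // change 2: request the rest of the file
     }

     private def subPush(): Unit = {
       val chunk = buffer
       emitted += chunk.length // change 3: update the accounting before handing off
       buffer = ""
       out.push(chunk)
     }
   }

   object ToyDemo {
     def run(): Vector[String] = {
       val out = new ToyOutlet
       val stage = new ToyCollectFile(out, size = 6)
       stage.onPush("abc") // arrives before any pull: buffered instead of pushed
       stage.onPull()      // flushes "abc" and asks upstream for more
       stage.onPush("def") // demand was consumed by the previous push: buffered again
       stage.onPull()      // flushes "def"; the file is now complete
       out.received
     }
   }
   ```
   
   `ToyDemo.run()` returns `Vector("abc", "def")`. Removing the `isAvailable` guard in
   `onPush` makes the first call fail with an `IllegalArgumentException`, which mirrors the
   crash described above.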
   
   ## Test
   
   A functional regression test `"handle chunked delivery of a two-file archive"` is added
   to `TarArchiveSpec`. It verifies correct output when a two-file tar archive is delivered
   in 32-byte chunks, exercising all transitions between `CollectHeader`, `CollectFile`, and
   `ReadPastTrailer` across multiple upstream pushes, which is the scenario that triggered
   the crash in production.
   
   > Note: the crash itself depends on interpreter-internal timing (whether `flowOut` is
   > available at the exact moment `ReadPastTrailer` transitions to the next file), which
   > cannot be controlled deterministically from a unit test. The fix addresses the root
   > cause regardless of timing.
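   
   For readers who want to try the chunked scenario by hand, the following is a rough
   sketch of what such a round trip could look like. It is not the test added to
   `TarArchiveSpec`: the object name `ChunkedTarSketch`, the file names, and the payload
   sizes are made up for illustration, and it assumes the `pekko-connectors-file` module
   and an implicit `ActorSystem` are available.
   
   ```scala
   import org.apache.pekko.actor.ActorSystem
   import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
   import org.apache.pekko.stream.connectors.file.scaladsl.Archive
   import org.apache.pekko.stream.scaladsl.{ Sink, Source }
   import org.apache.pekko.util.ByteString

   import scala.concurrent.{ ExecutionContext, Future }

   // Hypothetical helper, for illustration only.
   object ChunkedTarSketch {
     // Builds a two-file archive in memory, reads it back in 32-byte chunks,
     // and returns (path, content) pairs in archive order.
     def run()(implicit system: ActorSystem): Future[Seq[(String, String)]] = {
       implicit val ec: ExecutionContext = system.dispatcher

       // Made-up payloads; the sizes are arbitrary.
       val first = ByteString("a" * 1000)
       val second = ByteString("b" * 700)

       val archive: Future[ByteString] =
         Source(
           List(
             (TarArchiveMetadata("first.txt", first.length.toLong), Source.single(first)),
             (TarArchiveMetadata("second.txt", second.length.toLong), Source.single(second))))
           .via(Archive.tar())
           .runFold(ByteString.empty)(_ ++ _)

       archive.flatMap { bytes =>
         Source
           .fromIterator(() => bytes.grouped(32)) // small chunks span header, file, and trailer boundaries
           .via(Archive.tarReader())
           .mapAsync(1) { case (metadata, content) =>
             // Each sub-source has to be consumed before the next entry can be read.
             content
               .runFold(ByteString.empty)(_ ++ _)
               .map(data => metadata.filePath -> data.utf8String)
           }
           .runWith(Sink.seq)
       }
     }
   }
   ```
   
   With the fix in place, the returned future is expected to complete with the two entries
   and their original contents. As the note above says, a run like this does not reliably
   reproduce the crash on unfixed versions.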
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

