gbrd opened a new pull request, #1475:
URL: https://github.com/apache/pekko-connectors/pull/1475
## Problem
`Archive.tarReader()` could crash with the following error when processing
tar archives from an async upstream source (e.g. S3, network streams):
```
java.lang.IllegalArgumentException: Cannot push port (SubSourceOutlet(fileOut)) twice, or before it being pulled
at org.apache.pekko.stream.stage.GraphStageLogic$SubSourceOutlet.push(GraphStage.scala:1561)
at ...TarReaderStage$CollectFile.subPush(TarReaderStage.scala:222)
at ...TarReaderStage$CollectFile.onPush(TarReaderStage.scala:228)
```
## Root Cause
In `CollectFile`, `onPush()` called `subPush()` unconditionally, which in turn
called `subSource.push()`. However, `subSource.push()` may only be called after
downstream has pulled the outlet. When upstream data arrived before the
downstream consumer had subscribed to and pulled the `subSource` outlet, the
push violated Pekko's back-pressure contract and caused the crash.
## Fix
Three changes in `CollectFile`:
1. **`onPush()`** now checks `subSource.isAvailable` before pushing. If the
   subSource has not yet been pulled by downstream, the incoming data is
   accumulated in the existing `buffer` field and flushed on the next
   `subSource.onPull()`.
2. **`subSource.onPull()`** now calls `tryPullIfNeeded()` after flushing the
   buffer if more file content is still expected (`emitted < metadata.size`),
   which unblocks the upstream.
3. **`subPush()`** now correctly increments `emitted` and clears `buffer`
   before handing off to `readTrailer()`, fixing a potential accounting
   inconsistency on the last chunk of a file.
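
The guarded-push behaviour described above can be illustrated with a small, dependency-free model. This is only a sketch under stated assumptions, not the actual `TarReaderStage` code: `GatedOutlet` and `CollectFileModel` are hypothetical names, `GatedOutlet` merely imitates the pull-before-push rule of a `SubSourceOutlet`, and `upstreamPulls` stands in for the effect of `tryPullIfNeeded()`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a pull-gated outlet: push is legal only after a pull.
final class GatedOutlet {
  private var demand = false
  val received: ArrayBuffer[Vector[Byte]] = ArrayBuffer.empty

  def isAvailable: Boolean = demand
  def pull(): Unit = { demand = true }
  def push(chunk: Vector[Byte]): Unit = {
    // require throws IllegalArgumentException, like the crash in the report.
    require(demand, "Cannot push port twice, or before it being pulled")
    demand = false
    received += chunk
  }
}

// Simplified model of the guarded logic for one file of `size` bytes.
final class CollectFileModel(size: Int, out: GatedOutlet) {
  private var buffer = Vector.empty[Byte]
  private var emitted = 0
  var upstreamPulls = 0 // counts how often more upstream data was requested

  // Upstream delivered a chunk: push only if downstream already asked for data.
  def onPush(chunk: Vector[Byte]): Unit = {
    buffer ++= chunk
    if (out.isAvailable) flush()
  }

  // Downstream signalled demand: flush buffered data, then ask upstream for more.
  def onPull(): Unit = {
    out.pull()
    if (buffer.nonEmpty) flush()
    if (emitted < size) upstreamPulls += 1
  }

  // Emit at most the bytes still belonging to this file and update the accounting.
  private def flush(): Unit = {
    val (content, rest) = buffer.splitAt(size - emitted)
    out.push(content)
    emitted += content.length
    buffer = rest // bytes past the end of the file stay buffered
  }
}

object GuardedPushDemo {
  def main(args: Array[String]): Unit = {
    val out = new GatedOutlet
    val file = new CollectFileModel(size = 4, out)
    file.onPush(Vector[Byte](1, 2)) // arrives before any downstream pull: buffered
    file.onPull()                   // flushes the buffered bytes
    file.onPush(Vector[Byte](3, 4)) // no outstanding demand: buffered again
    file.onPull()
    println(out.received.map(_.mkString(",")).mkString(" | ")) // prints "1,2 | 3,4"
  }
}
```

Without the `isAvailable` check in `onPush`, the first call in the demo would hit the `require` in `push`, which is the model's analogue of the reported exception.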
## Test
A functional regression test `"handle chunked delivery of a two-file archive"`
is added to `TarArchiveSpec`. It verifies correct output when a two-file tar
archive is delivered in 32-byte chunks, exercising all transitions between
`CollectHeader`, `CollectFile`, and `ReadPastTrailer` across multiple upstream
pushes, the scenario that triggered the crash in production.
> Note: the crash itself depends on interpreter-internal timing (whether
> `flowOut` is available at the exact moment `ReadPastTrailer` transitions to
> the next file) which cannot be controlled deterministically from a unit test.
> The fix addresses the root cause regardless of timing.
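
To give a feel for why 32-byte chunks exercise every state transition, the sketch below estimates how many upstream pushes a two-entry archive turns into. It relies only on the tar layout rule that each entry is one 512-byte header block followed by content padded to a multiple of 512 bytes. The file sizes (5 and 700 bytes) and the names `ChunkedDelivery`, `entryLength`, and `chunks` are assumptions for illustration, not taken from the actual test, and any end-of-archive blocks are ignored.

```scala
object ChunkedDelivery {
  val BlockSize = 512 // tar header size and content padding granularity
  val ChunkSize = 32  // chunk size used for delivery in this sketch

  // Bytes one entry occupies: a header block plus content rounded up to whole blocks.
  def entryLength(contentSize: Int): Int =
    BlockSize + ((contentSize + BlockSize - 1) / BlockSize) * BlockSize

  // Split a byte array into fixed-size chunks (the last one may be shorter).
  def chunks(bytes: Array[Byte], size: Int): Vector[Array[Byte]] =
    bytes.grouped(size).toVector

  def main(args: Array[String]): Unit = {
    // Hypothetical file sizes; only the lengths matter for this estimate.
    val archiveLength = entryLength(5) + entryLength(700)
    val delivered = chunks(new Array[Byte](archiveLength), ChunkSize)
    println(s"$archiveLength bytes arrive as ${delivered.size} pushes") // 2560 bytes, 80 pushes
    println(s"each header spans ${BlockSize / ChunkSize} pushes")       // 16 pushes
  }
}
```

Because every header, content run, and padding run spans several pushes, the stage has to carry partial state across pushes in each of its phases.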