GitHub user mdedetrich edited a comment on the discussion: 
`StreamConverters.asInputStream` not workable when using broadcast with 
`ByteString` source?

> IIUC your goal is to stay in streaming, to take `ByteString` elements, 
> accumulate them until `Tika().detect` returns and then push the output 
> downstream.

No, that's not the goal. `InputStream` is itself a streaming interface, which 
means that when it's passed into `Tika().detect(inputStream)`, Tika will take 
chunks of bytes out of the `InputStream` one at a time.
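
For context, this is roughly the shape I have in mind (a minimal sketch; the sample source and the actor system name are just placeholders):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Source, StreamConverters}
import org.apache.pekko.util.ByteString
import org.apache.tika.Tika

implicit val system: ActorSystem = ActorSystem("tika-example")

// Any ByteString source; the real one comes from elsewhere in my graph.
val bytes: Source[ByteString, _] =
  Source(List(ByteString("hello "), ByteString("world")))

// Materializing the sink gives back a java.io.InputStream that is backed by
// the (still running) stream; Tika then reads from it incrementally.
val inputStream = bytes.runWith(StreamConverters.asInputStream())
val mediaType: String = new Tika().detect(inputStream)
inputStream.close()
```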

> `StreamConverters.asInputStream` is a sink, so its goal is to materialize the 
> stream into a result, so that is not about staying in streaming.
 
I don't believe that's what it is doing. While it may be a `Sink`, it is not 
consuming all of the elements on materialization; rather, it is creating a new 
`InputStream` backed by the `Source[ByteString, _]` that is being fed into it. 
You can confirm this by looking at the implementation at 
https://github.com/apache/pekko/blob/3aacafe4774ae4cad3dad7acd970f675ef702be1/stream/src/main/scala/org/apache/pekko/stream/impl/io/InputStreamSinkStage.scala#L53.
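
A quick way to see this (a sketch; the chunk size and element count are arbitrary, and I am assuming the stage's default internal buffering):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Source, StreamConverters}
import org.apache.pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem("lazy-demo")

// A long source with a side effect so we can observe when chunks are pulled.
val probed: Source[ByteString, _] = Source(1 to 10000).map { i =>
  println(s"chunk $i pulled")
  ByteString(Array.fill(1024)('x'.toByte))
}

val is = probed.runWith(StreamConverters.asInputStream())
// Only the stage's small internal buffer gets pre-fetched at this point; the
// remaining thousands of chunks are only requested as read() drains that buffer.
val buf = new Array[Byte](1024)
is.read(buf) // each read signals more demand upstream
is.close()   // cancels the rest of the upstream
```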

Furthermore, if it just immediately consumed the entire stream (as you state) 
before `inputStream.read()`, I don't think I would even have these issues. I 
believe the underlying problem is that demand is not being triggered to start 
feeding the `InputStream` (which is why `inputStream.close()` solves the 
deadlock: it triggers consumption, but that in and of itself causes other 
problems).
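
For reference, the graph I am running is roughly this shape (a hedged sketch: the second branch here is just a stand-in sink that sums chunk sizes, and the source is a placeholder):

```scala
import java.io.InputStream
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.ClosedShape
import org.apache.pekko.stream.scaladsl.{Broadcast, GraphDSL, Keep, RunnableGraph, Sink, Source, StreamConverters}
import org.apache.pekko.util.ByteString
import org.apache.tika.Tika

implicit val system: ActorSystem = ActorSystem("broadcast-sketch")

val bytes: Source[ByteString, _] = Source.single(ByteString("...")) // placeholder

// One broadcast branch feeds the InputStream sink for Tika detection; the
// other is a stand-in for whatever else consumes the bytes.
val graph = RunnableGraph.fromGraph(
  GraphDSL.createGraph(
    StreamConverters.asInputStream(),
    Sink.fold[Long, ByteString](0L)(_ + _.length)
  )(Keep.both) { implicit b => (detectSink, sizeSink) =>
    import GraphDSL.Implicits._
    val bcast = b.add(Broadcast[ByteString](2))
    bytes ~> bcast.in
    bcast ~> detectSink.in
    bcast ~> sizeSink.in
    ClosedShape
  }
)

val (inputStream: InputStream, totalSizeF) = graph.run()
// If nothing ever reads from `inputStream`, that branch stops signalling demand
// once the stage's buffer fills, the broadcast can no longer push, and the
// whole graph stalls: the deadlock described above. Reading from (or closing)
// the InputStream releases it.
val mediaType = new Tika().detect(inputStream)
```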

If I am right, it can be argued that the `Sink[ByteString, InputStream]` type is 
a bit misleading, as it is not going to consume all of the stream elements upon 
materialization (that only happens once you start consuming elements from the 
`InputStream` via `inputStream.read()`). However, I do think this is done 
intentionally to prevent other issues, namely the fact that `InputStream` is 
blocking.

For example, `StreamConverters.asInputStream` could have been implemented as a 
`Flow[ByteString, InputStream, _]`, but doing so would make it even easier than 
the current design for users to unintentionally block when reading from the 
`InputStream`. While both abstractions represent source streams 
(`Source`/`InputStream`), they are implemented in very different ways, hence the 
use of a `Sink`.
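
To make the contrast concrete, here are the two shapes side by side (signatures only; `asInputStreamFlow` is hypothetical and does not exist in Pekko):

```scala
import java.io.InputStream
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
import org.apache.pekko.util.ByteString

// What Pekko actually exposes: the InputStream is the *materialized value*, so
// reading from it happens outside the stream and is allowed to block.
def asInputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = ???

// The hypothetical Flow variant: the InputStream would be a stream *element*,
// inviting downstream stages to call read() and block a stream thread.
def asInputStreamFlow: Flow[ByteString, InputStream, NotUsed] = ???
```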

@raboof @jrudolph Maybe you can shed some light on this, i.e. is 
`StreamConverters.asInputStream` the right tool to use, and/or does the fact 
that `StreamConverters.asInputStream` is implemented as a `Sink` mean that it 
eagerly consumes the entire stream instead of keeping the streaming semantics?

> I am just suggesting an alternative solution, if you want to improve/fix 
> `StreamConverters.asInputStream` then this discussion can be about that, it 
> just was not clear to me whether you want to improve/fix 
> `StreamConverters.asInputStream` or just solve your task.

Well, it's both, but the critical question is whether this is a bug or whether I 
am doing something wrong (hence why I initially opened a discussion rather than 
an issue). I will say one thing: if `StreamConverters.asInputStream` does what 
you say (eagerly consumes the `Source` upon materialization), then this function 
is misleading, bordering on useless, as the entire point of using `InputStream` 
is for it to be a stream (it's even in the name).

> Maybe accumulate is not the correct term - I mean that you want to take 
> `ByteStream` elements, pass them to the `InputStream` passed to 
> `Tika().detect` until it returns, and then push the result downstream.

Yes, that is what I want to do.

GitHub link: 
https://github.com/apache/pekko/discussions/1807#discussioncomment-12591932
