GitHub user mdedetrich edited a comment on the discussion: `StreamConverters.asInputStream` not workable when using broadcast with `ByteString` source?
> IIUC your goal is to stay in streaming, to take `ByteString` elements, accumulate them until `Tika().detect` returns and then push the output downstream.

No, that's not the goal. `InputStream` is itself a streaming interface, which means that when it is passed into `Tika().detect(inputStream)`, Tika pulls chunks of bytes out of the `InputStream` one at a time.

> `StreamConverters.asInputStream` is a sink, so its goal is to materialize the stream into a result, so that is not about staying in streaming.

I don't believe that's what it is doing. While it may be a `Sink`, it does not consume all of the elements on materialization; rather, it creates a new `InputStream` from the `Source[ByteString, _]` that is being fed into it. You can confirm this by looking at the implementation at https://github.com/apache/pekko/blob/3aacafe4774ae4cad3dad7acd970f675ef702be1/stream/src/main/scala/org/apache/pekko/stream/impl/io/InputStreamSinkStage.scala#L53.

Furthermore, if it really did consume the entire stream immediately (as you state), before any `inputStream.read()`, I don't think I would even be having these issues. I believe the underlying problem is that demand is not being triggered to start the `InputStream` (which is why `inputStream.close()` resolves the deadlock: it triggers consumption, but that in itself causes other problems).

If I am right, it can be argued that the `Sink[ByteString, InputStream]` type is a bit misleading, since the sink does not consume all of the stream elements upon materialization (that only happens once you start consuming elements from the `InputStream` via `inputStream.read()`). I do think this is intentional, though, to prevent other issues, namely the fact that `InputStream` is blocking. `StreamConverters.asInputStream` could have been implemented as a `Flow[ByteString, InputStream, _]`, but doing so would make it far easier for users to unintentionally block when reading from the `InputStream` than the current design does.
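The demand-driven behaviour described above can be illustrated without Pekko using `java.io.PipedInputStream` as an analogy: the producer can only run ahead of the consumer by the pipe's buffer size, so bytes flow only as the reader pulls them, and a Tika-style detector that sniffs a few header bytes returns long before the stream is exhausted. This is a sketch of the concept only, not Pekko's implementation; the `detect` below is a hypothetical stand-in for `Tika().detect`:

```scala
import java.io.{InputStream, PipedInputStream, PipedOutputStream}

object LazyInputStreamSketch {
  // Read exactly buf.length bytes (or until EOF); PipedInputStream may return short reads.
  private def readFully(in: InputStream, buf: Array[Byte]): Int = {
    var off = 0
    var n = 0
    while (off < buf.length && n != -1) {
      n = in.read(buf, off, buf.length - off)
      if (n != -1) off += n
    }
    off
  }

  // Hypothetical stand-in for Tika().detect: sniffs only the first 4 bytes.
  def detect(in: InputStream): String = {
    val header = new Array[Byte](4)
    if (readFully(in, header) == 4 && header.sameElements("%PDF".getBytes("US-ASCII")))
      "application/pdf"
    else
      "application/octet-stream"
  }

  // Returns (detected MIME type, bytes drained after detect, bytes produced in total).
  def run(): (String, Int, Int) = {
    val out = new PipedOutputStream()
    val in  = new PipedInputStream(out, 16) // tiny buffer: producer runs ahead by at most 16 bytes

    @volatile var produced = 0
    val producer = new Thread(() => {
      val payload = ("%PDF-1.7 " + "x" * 1000).getBytes("US-ASCII") // 1009 bytes in total
      payload.foreach { b => out.write(b.toInt); produced += 1 }
      out.close()
    })
    producer.start()

    // detect returns after the 4 header bytes; nearly all of the stream is still unconsumed
    val mime = detect(in)

    // Draining the InputStream is what lets the producer finish, i.e. the reader generates the "demand"
    var drained = 0
    while (in.read() != -1) drained += 1
    producer.join()
    (mime, drained, produced)
  }

  def main(args: Array[String]): Unit = {
    val (mime, drained, produced) = run()
    println(s"detected: $mime, drained: $drained, produced: $produced")
  }
}
```

In the real pipeline the `InputStream` would instead come from materializing `source.runWith(StreamConverters.asInputStream())`, and the claim above is that it behaves analogously to the pipe here: `read()` is what ultimately drives demand upstream.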
While both abstractions represent source streams (`Source`/`InputStream`), they are implemented in very different ways, hence the usage of a `Sink`.

@raboof @jrudolph Maybe you can shed some light on this, i.e. is `StreamConverters.asInputStream` the right tool to use here, and/or does the fact that it is implemented as a `Sink` mean that it eagerly consumes the entire stream instead of keeping the streaming semantics?

> I am just suggesting an alternative solution, if you want to improve/fix `StreamConverters.asInputStream` then this discussion can be about that, it just was not clear to me whether you want to improve/fix `StreamConverters.asInputStream` or just solve your task.

Well, it's both, but the critical question is whether this is a bug or whether I am doing something wrong (hence why I initially opened a discussion). I will say one thing: if `StreamConverters.asInputStream` does what you say (eagerly consumes the `Source` upon materialization), then the function is misleading, bordering on useless, since the entire point of using an `InputStream` is for it to be a stream (it's even in the name).

> Maybe accumulate is not the correct term - I mean that you want to take `ByteStream` elements, pass them to the `InputStream` passed to `Tika().detect` until it returns, and then push the result downstream.

Yes, that is what I want to do.

GitHub link: https://github.com/apache/pekko/discussions/1807#discussioncomment-12591932

----
This is an automatically sent email for notifications@pekko.apache.org.
To unsubscribe, please send an email to: notifications-unsubscr...@pekko.apache.org