GitHub user mdedetrich edited a discussion: `StreamConverters.asInputStream` not workable when using broadcast with `ByteString` source?
Currently I have this task where I have a `Source` of `ByteString` which I want to fan out via a `Broadcast` to two flows. One flow goes directly into a `Sink` (which I will call an upload sink) and the other flow needs to convert the `ByteString` `Source` to a Java `InputStream`, since it's going to an external API that only accepts `InputStream`. Specifically this is Apache Tika, which has a detect function as follows:

```java
/**
 * Detects the media type of the given document. The type detection is
 * based on the content of the given document stream.
 * <p>
 * If the document stream supports the
 * {@link InputStream#markSupported() mark feature}, then the stream is
 * marked and reset to the original position before this method returns.
 * Only a limited number of bytes are read from the stream.
 * <p>
 * The given document stream is <em>not</em> closed by this method.
 *
 * @param stream the document stream
 * @return detected media type
 * @throws IOException if the stream can not be read
 */
public String detect(InputStream stream) throws IOException {
    return detect(stream, new Metadata());
}
```

(Note that this function does **NOT** close the `InputStream`; you have to do this yourself.)

Pekko Streams provides a `StreamConverters.asInputStream()` function which is a `Sink[ByteString, InputStream]`. As discussed at https://github.com/akka/akka/issues/23187#issuecomment-317646764 and noted in the docs, you need to be careful when using the materialized `InputStream` since it's a blocking interface, which means that if you do something like this

```kotlin
StreamConverters.asInputStream().mapMaterializedValue { inputStream ->
    Tika().detect(inputStream)
}
```

it will create a deadlock, because materialized values need to be returned immediately and the `Tika().detect(inputStream)` call will block.
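To make the deadlock mechanics concrete: the materialized `InputStream` behaves like the reader end of a pipe, where the writer side can only make progress while something is concurrently draining the reader side. Here is a stdlib-only sketch (no Pekko involved; `PipedInputStream` is just an analogy, and `readViaPipe` is a hypothetical helper name) showing why the blocking read must happen on a different thread from the code that wires things up:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class PipeAnalogy {

    // Writes `payload` into a tiny pipe while a separate thread drains it.
    // With no concurrent reader, write() would block forever once the
    // 4-byte pipe buffer fills -- the same shape as blocking inside
    // mapMaterializedValue before the stream has any demand.
    static String readViaPipe(String payload) {
        try {
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out, 4); // deliberately tiny buffer

            // The blocking read happens on another thread, mirroring the
            // advice to materialize first and consume the InputStream later.
            CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
                try {
                    return new String(in.readAllBytes(), StandardCharsets.UTF_8);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });

            out.write(payload.getBytes(StandardCharsets.UTF_8));
            out.close(); // EOF lets readAllBytes() return
            return reader.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readViaPipe("hello world")); // prints "hello world"
    }
}
```

If the `supplyAsync` were removed and the read done inline, the `write` would stall as soon as the buffer filled, which is the single-threaded version of the `mapMaterializedValue` deadlock.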
Because of this, it's advised that you materialize the `StreamConverters.asInputStream()` stream immediately to get the `InputStream` and then use the `InputStream` as desired. My current predicament is that while this works if you have a single trivial flow, when you have a broadcast-style fanout it causes issues one way or another. For example, one way to attempt to solve this issue would be as follows:

```kotlin
/**
 * Uploads a file while at the same time computing the media type
 */
fun <Mat> computeMediaTypeAndUpload(
    source: Source<ByteString, *>,
    uploadSink: Sink<ByteString, CompletionStage<Mat>>
): Source<Pair<Optional<String>, Mat>, CompletionStage<NotUsed>> {
    return Source.fromMaterializer { mat, attr ->
        /**
         * Due to InputStream being blocking, we CANNOT immediately read from
         * the InputStream as it will cause a deadlock/timeout error, see
         * https://github.com/akka/akka/issues/23187#issuecomment-317646764
         *
         * To get around this, we run the sink immediately to get the InputStream
         * and then we process it at a later stage.
         */
        val inputStreamWithMat =
            source.alsoToMat(StreamConverters.asInputStream(), Keep.right())
                .toMat(uploadSink, Keep.both())
                .withAttributes(attr)
                .run(mat)
        val inputStream = inputStreamWithMat.first()
        val blockingIODispatcher =
            mat.system().dispatchers().lookup(Dispatchers.DefaultBlockingDispatcherId())

        Source.lazyCompletionStage {
            val computedFileMediaTypeFuture = CompletableFuture.supplyAsync({
                try {
                    Optional.ofNullable(Tika().detect(inputStream))
                } catch (ex: IOException) {
                    logger.error("Error detecting media type", ex)
                    Optional.empty()
                }
            }, blockingIODispatcher)
            val matFuture = inputStreamWithMat.second().toCompletableFuture()

            CompletableFuture.allOf(
                computedFileMediaTypeFuture,
                matFuture
            ).thenApply {
                Pair(computedFileMediaTypeFuture.join(), matFuture.join())
            }
        }
    }
}
```

The premise here is to use a Java `CompletableFuture` to wrap the `Tika().detect(inputStream)` computation so that the materialization returns immediately instead of blocking.
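The final combination step above (waiting for both the Tika result and the upload's materialized value, then pairing them) can be exercised in isolation. A stdlib-only sketch of the same `allOf` + `join` pattern, with placeholder futures standing in for the real Tika and upload results (the `zip` helper and its values are illustrative, not from the original code):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CombineFutures {

    // Pairs the results of two independent futures once both have completed.
    static <A, B> CompletableFuture<Map.Entry<A, B>> zip(
            CompletableFuture<A> fa, CompletableFuture<B> fb) {
        return CompletableFuture.allOf(fa, fb)
                // join() cannot block here: allOf guarantees both are done
                // by the time thenApply runs.
                .thenApply(ignored -> new SimpleEntry<>(fa.join(), fb.join()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> mediaType =
                CompletableFuture.supplyAsync(() -> "text/plain"); // stand-in for Tika
        CompletableFuture<Integer> uploadResult =
                CompletableFuture.supplyAsync(() -> 200);          // stand-in for the sink's Mat

        Map.Entry<String, Integer> pair = zip(mediaType, uploadResult).join();
        System.out.println(pair.getKey() + " / " + pair.getValue()); // prints "text/plain / 200"
    }
}
```

This part of the pipeline is sound on its own; the deadlock described next comes from the stream side, not from the future composition.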
Doing this, however, still deadlocks. To avoid the deadlock, one can change

```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
}
```

to

```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
} finally {
    inputStream.close()
}
```

The `inputStream.close()` appears to force demand on the stream, fixing the deadlock. However, doing this causes other issues, namely that this exception ends up being thrown:

```
java.util.concurrent.CompletionException: org.apache.pekko.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:26)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:23)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
	at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
```

With this error the logic of the stream still works, but it appears that Pekko Streams cannot properly handle eagerly closing the `InputStream` (even though calling `inputStream.close()` multiple times is perfectly valid).
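On that last point: the `java.io.Closeable` contract explicitly states that closing an already-closed stream has no effect, so the double close is legal at the JDK level. A quick stdlib check (the `closeTwiceIsSafe` helper is just for illustration; whether Pekko's adapter upholds the same idempotence is exactly the question at issue here):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class DoubleClose {

    // Per the Closeable contract, a second close() must be a no-op,
    // not an error.
    static boolean closeTwiceIsSafe(InputStream in) {
        try {
            in.close();
            in.close(); // legal: close is specified as idempotent
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("abc".getBytes());
        System.out.println(closeTwiceIsSafe(in)); // prints "true"
    }
}
```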
More importantly, doing `inputStream.close()` like this is a workaround, because as described in the docs

```scala
/**
 * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible
 * to read the values produced by the stream this Sink is attached to.
 *
 * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] or [[#inputStream(java.time.Duration)]] to explicitly
 * configure the timeout.
 *
 * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
 *
 * You can configure the internal buffer size by using [[pekko.stream.ActorAttributes]].
 *
 * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
 * closing the [[InputStream]] will cancel this [[Sink]].
 */
def asInputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream())
```

Pekko Streams is meant to handle the resource cleanup of the underlying stream, so you shouldn't have to call `.close()` manually like this. The thing is, I have tried various different implementations of this (i.e. using `preMaterialize`, a custom graph with `GraphDSL` + `Broadcast`, `Flow.fromSinkAndSource`, `fromMaterializer`, etc.) and nothing seems to solve the underlying issue. Either you have a deadlock (if you don't call `inputStream.close()`), or if you do call `inputStream.close()` you get the `SubscriptionWithCancelException$NoMoreElementsNeeded` exception (note that I haven't been able to reproduce this exception with a trivial example locally, but it occurs in production when `computeMediaTypeAndUpload` is composed as part of a bigger stream).
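Since the stream's logic still works when the exception fires, one pragmatic mitigation (not a fix for the root cause, and only defensible if the cancellation is confirmed benign) would be to map that specific failure back to a successful empty result at the `CompletableFuture` level. A stdlib-only sketch, where `NoMoreElementsNeeded` is a stand-in class for Pekko's `SubscriptionWithCancelException$NoMoreElementsNeeded$` so the example runs without the library:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class TolerateCancellation {

    // Stand-in for org.apache.pekko.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$.
    static class NoMoreElementsNeeded extends RuntimeException {}

    // Converts the known, benign cancellation into an empty result instead
    // of failing the whole composed future; any other failure is rethrown.
    static <T> CompletableFuture<Optional<T>> tolerateCancel(CompletableFuture<T> f) {
        return f.thenApply(Optional::ofNullable)
                .exceptionally(ex -> {
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof NoMoreElementsNeeded) {
                        return Optional.empty();
                    }
                    throw new CompletionException(cause);
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new NoMoreElementsNeeded());
        System.out.println(tolerateCancel(failed).join()); // prints "Optional.empty"
    }
}
```

This only papers over the symptom, though; the question of why the eager close surfaces as a cancellation failure in the first place remains.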
Of course, one way to "solve" the problem is to just do

```kotlin
source.alsoToMat(uploadSink, Keep.right())
    .toMat(Sink.fold(ByteString.emptyByteString()) { a, b -> a.concat(b) }, Keep.both())
    .run(system)
```

But this basically kills the streaming flow into Apache Tika, as you are just evaluating the entire stream into a single large `ByteString`, whereas my aim is to make sure that both flows in the `Broadcast` are streaming based.

GitHub link: https://github.com/apache/pekko/discussions/1807