GitHub user mdedetrich edited a discussion: `StreamConverters.asInputStream` not workable when using broadcast with `ByteString` source?

Currently I have a task where I have a `Source` of `ByteString` which I want to fan out via a `Broadcast` to two other flows. One flow goes directly into a sink (which I will call the upload sink) and the other flow needs to convert the `ByteString` `Source` to a Java `InputStream`, since it's going to an external API that only accepts `InputStream`. Specifically this is Apache Tika, which has a `detect` function as follows:

```java
    /**
     * Detects the media type of the given document. The type detection is
     * based on the content of the given document stream.
     * <p>
     * If the document stream supports the
     * {@link InputStream#markSupported() mark feature}, then the stream is
     * marked and reset to the original position before this method returns.
     * Only a limited number of bytes are read from the stream.
     * <p>
     * The given document stream is <em>not</em> closed by this method.
     *
     * @param stream the document stream
     * @return detected media type
     * @throws IOException if the stream can not be read
     */
    public String detect(InputStream stream) throws IOException {
        return detect(stream, new Metadata());
    }
```

(note that this function does **NOT** close the `InputStream`; you have to do that yourself)
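For illustration, closing the stream yourself from Kotlin is easiest with `use`, which closes it even if `detect` throws (a minimal sketch; `detectMediaType` and the `path` parameter are hypothetical):

```kotlin
import java.io.FileInputStream
import org.apache.tika.Tika

// detect() reads only a limited number of bytes and leaves the stream
// open, so the caller closes it; `use` does that even on exceptions.
fun detectMediaType(path: String): String =
    FileInputStream(path).use { inputStream ->
        Tika().detect(inputStream)
    }
```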

Pekko Streams provides a `StreamConverters.asInputStream()` function which is a `Sink[ByteString, InputStream]`. As discussed at https://github.com/akka/akka/issues/23187#issuecomment-317646764 and noted in the docs, you need to be careful when using the materialized `InputStream` as it's a blocking interface, which means that if you do something like this

```kotlin
StreamConverters.asInputStream().mapMaterializedValue { inputStream ->
  Tika().detect(inputStream)
}
```

It will create a deadlock, as the materialized value needs to be returned immediately and the `Tika().detect(inputStream)` call will block. Due to this it's advised that you materialize the `StreamConverters.asInputStream()` stream immediately to get the `InputStream` and then use the `InputStream` as desired.
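As a minimal sketch of that advice (assuming a `system: ActorSystem` and a `source: Source<ByteString, *>` are in scope; the dispatcher lookup mirrors the code further below):

```kotlin
// Materialize immediately: the InputStream is handed back right away,
// long before the stream itself has finished running.
val inputStream = source.runWith(StreamConverters.asInputStream(), system)

// Do the blocking read on a dispatcher that tolerates blocking,
// never inside mapMaterializedValue.
val blockingDispatcher =
    system.dispatchers().lookup(Dispatchers.DefaultBlockingDispatcherId())

val mediaTypeFuture = CompletableFuture.supplyAsync({
    Tika().detect(inputStream)
}, blockingDispatcher)
```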

My current predicament is that while this works if you have a single trivial flow, a broadcast-style fanout causes issues one way or another. For example, one attempt at solving the issue would be as follows:

```kotlin
/**
 * Uploads a file while at the same time computing the media type
 */
fun <Mat> computeMediaTypeAndUpload(
    source: Source<ByteString, *>,
    uploadSink: Sink<ByteString, CompletionStage<Mat>>
): Source<Pair<Optional<String>, Mat>, CompletionStage<NotUsed>> {
    return Source.fromMaterializer { mat, attr ->
        /**
         * Due to InputStream being blocking, we CANNOT immediately read from
         * the InputStream as it will cause a deadlock/timeout error, see
         * https://github.com/akka/akka/issues/23187#issuecomment-317646764
         *
         * To get around this, we run the sink immediately to get the InputStream
         * and then we process it at a later stage.
         */
        val inputStreamWithMat = source.alsoToMat(StreamConverters.asInputStream(), Keep.right())
            .toMat(uploadSink, Keep.both()).withAttributes(attr).run(mat)

        val inputStream = inputStreamWithMat.first()

        val blockingIODispatcher =
            mat.system().dispatchers().lookup(Dispatchers.DefaultBlockingDispatcherId())

        Source.lazyCompletionStage {
            val computedFileMediaTypeFuture = CompletableFuture.supplyAsync({
                try {
                    Optional.ofNullable(Tika().detect(inputStream))
                } catch (ex: IOException) {
                    logger.error("Error detecting media type", ex)
                    Optional.empty()
                }
            }, blockingIODispatcher)

            val matFuture = inputStreamWithMat.second().toCompletableFuture()

            CompletableFuture.allOf(
                computedFileMediaTypeFuture, matFuture
            ).thenApply {
                Pair(computedFileMediaTypeFuture.join(), matFuture.join())
            }
        }
    }
}
```

The premise here is to use a Java `CompletableFuture` to wrap the `Tika().detect(inputStream)` computation so that the materialization returns immediately. Doing this, however, still deadlocks. To avoid the deadlock one can change

```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
}
```

to

```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
} finally {
    inputStream.close()
}
```

The `inputStream.close()` appears to force demand on the stream, fixing the deadlock; however, doing this causes other issues, namely that the following exception ends up being thrown:

```
java.util.concurrent.CompletionException: org.apache.pekko.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:26)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:23)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
	at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
```

With this error the logic of the stream still works, but it appears that Pekko Streams cannot properly handle eagerly closing the `InputStream` (even though calling `inputStream.close()` multiple times is perfectly valid). More importantly, calling `inputStream.close()` like this is a workaround, because as described in the docs

```scala
  /**
   * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible
   * to read the values produced by the stream this Sink is attached to.
   *
   * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] or [[#inputStream(java.time.Duration)]] to explicitly
   * configure the timeout.
   *
   * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking.
   *
   * You can configure the internal buffer size by using [[pekko.stream.ActorAttributes]].
   *
   * The [[InputStream]] will be closed when the stream flowing into this [[Sink]] completes, and
   * closing the [[InputStream]] will cancel this [[Sink]].
   */
  def asInputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream())
```

Pekko Streams is meant to handle the resource cleanup of the underlying stream, so you shouldn't have to call `.close()` manually like this. Note also the last line of the quoted docs: closing the `InputStream` cancels the `Sink`, which is presumably where the `NoMoreElementsNeeded` cancellation above originates.
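That cancellation behaviour is easy to see in isolation (a minimal sketch, assuming a `system: ActorSystem` in scope):

```kotlin
// An endless source: the only way this stream terminates is cancellation.
val inputStream = Source.repeat(ByteString.fromString("x"))
    .runWith(StreamConverters.asInputStream(), system)

inputStream.read()  // fine: pulls data through the blocking interface
inputStream.close() // per the docs above, this cancels the attached Sink
```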

The thing is, I have tried various different implementations of this (i.e. using `preMaterialize`, a custom graph with `GraphDSL` + `Broadcast`, `Flow.fromSinkAndSource`, `fromMaterializer`, etc.) and nothing seems to solve the underlying issue. Either you get a deadlock (if you don't do `inputStream.close()`) or, if you do call `inputStream.close()`, you get the `SubscriptionWithCancelException$NoMoreElementsNeeded` exception (note that I haven't been able to reproduce this exception with a trivial example locally, but it occurs in production when `computeMediaTypeAndUpload` is composed as part of a bigger stream).
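For reference, a `preMaterialize`-based variant looks roughly like this (a sketch of the general shape, not the exact code I ran):

```kotlin
// Pre-materialize the asInputStream sink so the InputStream is available
// up front, together with a Sink that can be attached to a broadcast leg.
val pair = StreamConverters.asInputStream().preMaterialize(system)
val inputStream = pair.first()
val inputStreamSink = pair.second()

// Fan out: one leg feeds the pre-materialized sink, the other uploads.
val uploadMat = source
    .alsoTo(inputStreamSink)
    .toMat(uploadSink, Keep.right())
    .run(system)

// inputStream is then read by Tika on a blocking dispatcher as before,
// and the same deadlock/close trade-off shows up here too.
```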

Of course one way to "solve" the problem is to just do

```kotlin
source.alsoToMat(uploadSink, Keep.right())
  .toMat(Sink.fold(ByteString.emptyByteString()) { a, b -> a.concat(b) }, Keep.both())
  .run(system)
```

But this basically kills the streaming flow into Apache Tika, as you are just accumulating the entire stream into a single large `ByteString`, whereas my aim is to make sure that both flows in the `Broadcast` are streaming based.

GitHub link: https://github.com/apache/pekko/discussions/1807
