GitHub user mdedetrich edited a discussion: `StreamConverters.asInputStream`
not workable when using broadcast with `ByteString` source?
Currently I have a task where I have a `Source` of `ByteString` which I want
to fan out via a `Broadcast` to two other flows. One flow goes directly into a
Sink (which I will call an upload sink) and the other flow needs to convert the
`ByteString` `Source` to a Java `InputStream`, since it's going to an external
API that only accepts `InputStream`. Specifically this is Apache Tika, which
has a detect function as follows:
```java
/**
 * Detects the media type of the given document. The type detection is
 * based on the content of the given document stream.
 * <p>
 * If the document stream supports the
 * {@link InputStream#markSupported() mark feature}, then the stream is
 * marked and reset to the original position before this method returns.
 * Only a limited number of bytes are read from the stream.
 * <p>
 * The given document stream is <em>not</em> closed by this method.
 *
 * @param stream the document stream
 * @return detected media type
 * @throws IOException if the stream can not be read
 */
public String detect(InputStream stream) throws IOException {
    return detect(stream, new Metadata());
}
```
(Note that this function does **NOT** close the `InputStream`; you have to do
this yourself.)
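For context, outside of a stream this caller-closes contract is trivial to
honour; a minimal sketch (the plain `FileInputStream` is just an assumed
example source):
```kotlin
import java.io.FileInputStream
import org.apache.tika.Tika

// Tika's detect() only reads a limited number of bytes and never closes
// the stream, so the caller owns the close; Kotlin's `use` handles it.
fun detectMediaType(path: String): String =
    FileInputStream(path).use { inputStream ->
        Tika().detect(inputStream)
    }
```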
Pekko Streams provides a `StreamConverters.asInputStream()` function which is a
`Sink[ByteString, InputStream]`. As discussed at
https://github.com/akka/akka/issues/23187#issuecomment-317646764 and noted in
the docs, you need to be careful when using the materialized `InputStream` as
it's a blocking interface, which means that if you do something like this:
```kotlin
StreamConverters.asInputStream().mapMaterializedValue { inputStream ->
    Tika().detect(inputStream)
}
```
it will create a deadlock, as materialized values need to be returned
immediately while the `Tika().detect(inputStream)` call blocks. Because of
this, it's advised that you materialize the `StreamConverters.asInputStream()`
stream immediately to get the `InputStream` and then use the `InputStream` as
desired.
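For a single trivial flow this pattern is straightforward; a minimal sketch of
what I mean (the single-thread executor here is just a stand-in for a proper
blocking-IO dispatcher):
```kotlin
import java.io.InputStream
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.javadsl.Source
import org.apache.pekko.stream.javadsl.StreamConverters
import org.apache.pekko.util.ByteString
import org.apache.tika.Tika

fun detectFromSource(
    source: Source<ByteString, *>,
    system: ActorSystem
): CompletableFuture<String> {
    // Materialize immediately: runWith returns the InputStream right away
    // without blocking.
    val inputStream: InputStream =
        source.runWith(StreamConverters.asInputStream(), system)
    // The blocking read then happens off the stream's dispatcher; closing
    // the InputStream afterwards cancels the sink.
    val blockingExecutor = Executors.newSingleThreadExecutor()
    return CompletableFuture.supplyAsync(
        { inputStream.use { Tika().detect(it) } },
        blockingExecutor
    )
}
```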
My current predicament is that while this works for a single trivial flow, a
broadcast-style fanout causes issues one way or another. For example, one way
to attempt to solve this would be as follows:
```kotlin
/**
 * Uploads a file while at the same time computing the media type
 */
fun <Mat> computeMediaTypeAndUpload(
    source: Source<ByteString, *>,
    uploadSink: Sink<ByteString, CompletionStage<Mat>>
): Source<Pair<Optional<String>, Mat>, CompletionStage<NotUsed>> {
    return Source.fromMaterializer { mat, attr ->
        /**
         * Due to InputStream being blocking, we CANNOT immediately read from
         * the InputStream as it will cause a deadlock/timeout error, see
         * https://github.com/akka/akka/issues/23187#issuecomment-317646764
         *
         * To get around this, we run the sink immediately to get the
         * InputStream and then we process it at a later stage.
         */
        val inputStreamWithMat =
            source.alsoToMat(StreamConverters.asInputStream(), Keep.right())
                .toMat(uploadSink, Keep.both())
                .withAttributes(attr)
                .run(mat)
        val inputStream = inputStreamWithMat.first()
        val blockingIODispatcher =
            mat.system().dispatchers().lookup(Dispatchers.DefaultBlockingDispatcherId())
        Source.lazyCompletionStage {
            val computedFileMediaTypeFuture = CompletableFuture.supplyAsync({
                try {
                    Optional.ofNullable(Tika().detect(inputStream))
                } catch (ex: IOException) {
                    logger.error("Error detecting media type", ex)
                    Optional.empty()
                }
            }, blockingIODispatcher)
            val matFuture = inputStreamWithMat.second().toCompletableFuture()
            CompletableFuture.allOf(
                computedFileMediaTypeFuture, matFuture
            ).thenApply {
                Pair(computedFileMediaTypeFuture.join(), matFuture.join())
            }
        }
    }
}
```
The premise here is to use a Java `CompletableFuture` to wrap the
`Tika().detect(inputStream)` computation so that the stream's materialized
value is returned immediately. Doing this, however, still deadlocks. To avoid
the deadlock one can change
```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
}
```
to
```kotlin
try {
    Optional.ofNullable(Tika().detect(inputStream))
} catch (ex: IOException) {
    logger.error("Error detecting media type with email id: $emailId", ex)
    Optional.empty()
} finally {
    inputStream.close()
}
```
The `inputStream.close()` appears to force demand on the stream, fixing the
deadlock. However, doing this causes another issue: this exception ends up
being thrown.
```
java.util.concurrent.CompletionException: org.apache.pekko.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:26)
	at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:23)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
	at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
```
With this error the logic of the stream still works, but it appears that Pekko
Streams cannot properly handle eagerly closing the `InputStream` (even though
calling `inputStream.close()` multiple times is perfectly valid). More
importantly, calling `inputStream.close()` like this is a workaround, because
as described in the docs:
```scala
/**
 * Creates a Sink which when materialized will return an [[java.io.InputStream]]
 * which it is possible to read the values produced by the stream this Sink is
 * attached to.
 *
 * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]]
 * or [[#inputStream(java.time.Duration)]] to explicitly configure the timeout.
 *
 * This Sink is intended for inter-operation with legacy APIs since it is
 * inherently blocking.
 *
 * You can configure the internal buffer size by using [[pekko.stream.ActorAttributes]].
 *
 * The [[InputStream]] will be closed when the stream flowing into this [[Sink]]
 * completes, and closing the [[InputStream]] will cancel this [[Sink]].
 */
def asInputStream(): Sink[ByteString, InputStream] =
  new Sink(scaladsl.StreamConverters.asInputStream())
```
Pekko Streams is meant to handle the resource cleanup of the underlying stream
and so you shouldn't have to call `.close()` like this manually.
The thing is, I have tried various different implementations of this (i.e.
using `preMaterialize`, a custom graph with `GraphDSL` + `Broadcast`,
`Flow.fromSinkAndSource`, `fromMaterializer`, etc.) and nothing seems to solve
the underlying issue. Either you have a deadlock (if you don't call
`inputStream.close()`) or, if you do call `inputStream.close()`, you get the
`SubscriptionWithCancelException$NoMoreElementsNeeded` exception (note that I
haven't been able to reproduce this exception with a trivial example locally,
but it occurs in production when `computeMediaTypeAndUpload` is composed as
part of a bigger stream).
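For reference, the `preMaterialize` variant looked roughly like this
(reconstructed sketch, not the exact production code; it reuses `source`,
`uploadSink`, `system`, and a `blockingExecutor` stand-in from above) and it
runs into the same trade-off:
```kotlin
// Pre-materialize the InputStream sink so the broadcast can feed it.
val pair = StreamConverters.asInputStream().preMaterialize(system)
val inputStream: InputStream = pair.first()
val preMaterializedSink: Sink<ByteString, NotUsed> = pair.second()

// Fan out: one copy to the pre-materialized InputStream sink, one copy
// to the upload sink.
val uploadMat = source.alsoTo(preMaterializedSink).runWith(uploadSink, system)

// The blocking detect still has to run on its own thread, and it still
// either deadlocks (no close) or triggers NoMoreElementsNeeded (close).
val mediaType = CompletableFuture.supplyAsync(
    { inputStream.use { Tika().detect(it) } },
    blockingExecutor
)
```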
Of course, one way to "solve" the problem is to just do:
```kotlin
source.alsoToMat(uploadSink, Keep.right())
    .toMat(
        Sink.fold(ByteString.emptyByteString()) { a, b -> a.concat(b) },
        Keep.both()
    )
    .run(system)
```
But this basically kills the streaming flow into Apache Tika, as you are just
evaluating the entire stream into a single large `ByteString`, whereas my aim
is to make sure that both flows in the `Broadcast` are streaming-based.
GitHub link: https://github.com/apache/pekko/discussions/1807