Hi,

I've got a Kinesis consumer which reacts to each record by doing some async 
work using an implementation of RichAsyncFunction. I'm adding a retry strategy. 
After x failed attempts I want this to time out and give up by returning no 
data (i.e. not be treated as a failure).

Here is a cut down version of my code, which works as expected (in Kotlin, I 
hope that's OK - can supply Java translation if required):

val targetStream = AsyncDataStream
    .unorderedWaitWithRetry(
        inputStream,
        object : RichAsyncFunction<String, String>() {
            override fun asyncInvoke(input: String, resultFuture: 
ResultFuture<String>) {
                println("Received input: $input")
                resultFuture.completeExceptionally(Exception("Error from inside 
CompletableFuture"))
            }

            override fun timeout(input: String, resultFuture: 
ResultFuture<String>) {
                println("Timeout")
                resultFuture.complete(listOf())
            }
        },
        4,
        TimeUnit.SECONDS,
        100,
        AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
            .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
            .build()
    )

This will time out after 4 seconds, and the retry strategy is set to retry 
every two seconds. If I run that I get the output I expect, namely:

Received input: foo
Received input: foo
Timeout

Importantly, I see that asyncInvoke is only called twice, because by the time 
the third invocation is due to occur, the timeout has already kicked in and 
marked this record as handled.

However the above is clearly unrealistic as it calls 
resultFuture.completeExceptionally immediately rather than asynchronously after 
some work as taken place. So now I replace the asyncInvoke implementation above 
with the following:


override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
    println("Received input: $input")
    CompletableFuture.supplyAsync {
        Thread.sleep(500)
        resultFuture.completeExceptionally(Exception("Error from inside 
CompletableFuture"))
    }
}

Now I get output which I don't expect, which shows that after the timeout, 
asyncInvoke continues to be called a few more times.

That seems wrong to me: shouldn't it stop being called because timeout has 
already been invoked and it called resultFuture.complete()?

I might well just be misunderstanding something here.

Thanks in advance,

Yoni.

Reply via email to