Hi, I've got a Kinesis consumer which reacts to each record by doing some async work using an implementation of RichAsyncFunction. I'm adding a retry strategy. After x failed attempts I want this to time out and give up by returning no data (i.e. not be treated as a failure).
Here is a cut-down version of my code, which works as expected (in Kotlin, I hope that's OK - can supply Java translation if required):

    val targetStream = AsyncDataStream
        .unorderedWaitWithRetry(
            inputStream,
            object : RichAsyncFunction<String, String>() {
                override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
                    println("Received input: $input")
                    resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
                }

                override fun timeout(input: String, resultFuture: ResultFuture<String>) {
                    println("Timeout")
                    resultFuture.complete(listOf())
                }
            },
            4,
            TimeUnit.SECONDS,
            100,
            AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                .build()
        )

This will time out after 4 seconds, and the retry strategy is set to retry every two seconds. If I run that I get the output I expect, namely:

    Received input: foo
    Received input: foo
    Timeout

Importantly, I see that asyncInvoke is only called twice, because by the time the third invocation is due to occur, the timeout has already kicked in and marked this record as handled.

However, the above is clearly unrealistic, as it calls resultFuture.completeExceptionally immediately rather than asynchronously after some work has taken place. So now I replace the asyncInvoke implementation above with the following:

    override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
        println("Received input: $input")
        CompletableFuture.supplyAsync {
            Thread.sleep(500)
            resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
        }
    }

Now I get output which I don't expect: it shows that after the timeout, asyncInvoke continues to be called a few more times.

That seems wrong to me: shouldn't it stop being called, because timeout has already been invoked and it called resultFuture.complete()?

I might well just be misunderstanding something here.

Thanks in advance,
Yoni.