Hi,

Is this case running locally, like an IT case, or is it a streaming job running on a cluster? If it's the former, one thing I can think of is that a local test using a bounded data source (with only a few test records) will end input very quickly and then trigger the endOfInput logic of AsyncWaitOperator. That logic finishes all in-flight delayed retry items immediately: asyncInvoke is called once more as the last attempt before the operator exits, and that attempt's outcome is taken as the final result, regardless of whether the element has timed out or not. This may be one more attempt than you would see when the job does not end, as in normal running.

For a long-running job, the retry starts from scratch when the job recovers from a restart (regardless of how many times the element has been retried before), which may also result in more attempts and a longer retry time for those elements. If you can provide more information about the test, maybe we can further clarify what the problem is.
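To gather the kind of information asked for above (how many times asyncInvoke actually runs for each record, and when relative to the 4-second timeout), one option is to log an attempt number and the elapsed time since the first attempt. This is a minimal sketch of a hypothetical helper, `AttemptTracker`, written for illustration only; it is not a Flink API. The clock is injectable so the helper can be exercised without real waiting.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical diagnostic helper (not part of Flink): counts how many times
// asyncInvoke has been called for each input value and how long after the
// first call each attempt happened.
class AttemptTracker(private val clock: () -> Long = System::currentTimeMillis) {
    // First-seen timestamp and number of attempts so far for one input value.
    private data class Entry(val firstAtMs: Long, val attempts: Int)

    // Thread-safe map, in case attempts are recorded from different threads.
    private val entries = ConcurrentHashMap<String, Entry>()

    // Call at the top of asyncInvoke. Returns a log line in the form
    // "Received input: foo (attempt 2, +2000 ms)".
    fun record(input: String): String {
        val now = clock()
        val entry = entries.compute(input) { _, old ->
            if (old == null) Entry(now, 1) else old.copy(attempts = old.attempts + 1)
        }!!
        return "Received input: $input (attempt ${entry.attempts}, +${now - entry.firstAtMs} ms)"
    }

    // Number of attempts recorded so far for the given input (0 if never seen).
    fun attempts(input: String): Int = entries[input]?.attempts ?: 0
}
```

Inside asyncInvoke one could call `println(tracker.record(input))` in place of the existing println, creating the tracker in `open()` so it is not shipped with the serialized function. Attempts are keyed by the input string, so two separate records with the same value would share a counter, and entries are never removed, so this only suits a short test with a handful of records. The map lives in one operator instance's memory and is not checkpointed, so after a job restart the counts start again from 1, which matches the point above that retries also start from scratch after recovery.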
Best,
Lincoln Lee

Yoni Gibbs <yonigi...@hotmail.com> wrote on Tue, Dec 13, 2022 at 23:46:

> Hi,
>
> I've got a Kinesis consumer which reacts to each record by doing some
> async work using an implementation of RichAsyncFunction. I'm adding a
> retry strategy. After x failed attempts I want this to time out and give up
> by returning no data (i.e. not be treated as a failure).
>
> Here is a cut-down version of my code, which works as expected (in Kotlin,
> I hope that's OK - can supply Java translation if required):
>
> val targetStream = AsyncDataStream
>     .unorderedWaitWithRetry(
>         inputStream,
>         object : RichAsyncFunction<String, String>() {
>             override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>                 println("Received input: $input")
>                 resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>             }
>
>             override fun timeout(input: String, resultFuture: ResultFuture<String>) {
>                 println("Timeout")
>                 resultFuture.complete(listOf())
>             }
>         },
>         4,
>         TimeUnit.SECONDS,
>         100,
>         AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
>             .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>             .build()
>     )
>
> This will time out after 4 seconds, and the retry strategy is set to retry
> every two seconds. If I run that I get the output I expect, namely:
>
> Received input: foo
> Received input: foo
> Timeout
>
> Importantly, I see that asyncInvoke is only called twice, because by the
> time the third invocation is due to occur, the timeout has already kicked
> in and marked this record as handled.
>
> However, the above is clearly unrealistic as it calls
> resultFuture.completeExceptionally immediately rather than asynchronously
> after some work has taken place.
> So now I replace the asyncInvoke implementation above with the following:
>
> override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>     println("Received input: $input")
>     CompletableFuture.supplyAsync {
>         Thread.sleep(500)
>         resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>     }
> }
>
> Now I get output which I don't expect, which shows that after the timeout,
> asyncInvoke continues to be called a few more times.
>
> That seems wrong to me: shouldn't it stop being called because timeout has
> already been invoked and it called resultFuture.complete()?
>
> I might well just be misunderstanding something here.
>
> Thanks in advance,
>
> Yoni.