Hi Yoni,

Sorry for the late response! I checked the issue and it is indeed a bug. I
have created a ticket (https://issues.apache.org/jira/browse/FLINK-30477) and
opened a PR to fix it. The reproducing cases were added as
AsyncWaitOperatorTest#testProcessingTimeWithTimeoutFunctionUnorderedWithRetry
and testProcessingTimeWithTimeoutFunctionOrderedWithRetry. You can wait for
the fix to be merged, or try the patch if you need it urgently.
Thanks again for reporting this!

Best,
Lincoln Lee


On Thu, Dec 15, 2022 at 18:10, Yoni Gibbs <yonigi...@hotmail.com> wrote:

> Hi Lincoln,
>
> Thanks very much for the reply! The issue seems to occur both in local
> development in the IDE, and when running in a Flink cluster. Below is the
> full Java code to replicate the issue. I generated an empty project
> following the instructions at
> https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/datastream/,
> namely I ran this:
>
> $ mvn archetype:generate \
>     -DarchetypeGroupId=org.apache.flink \
>     -DarchetypeArtifactId=flink-walkthrough-datastream-java \
>     -DarchetypeVersion=1.16.0 \
>     -DgroupId=timeoutretry \
>     -DartifactId=timeoutretry \
>     -Dversion=0.1 \
>     -Dpackage=timeoutretry \
>     -DinteractiveMode=false
>
> I then deleted the two generated files and created one called
> TimeoutRetry.java, with the code below. Here I've created a dummy source
> that simply emits one value, for simplicity. (Note that I first came
> across the issue when working with a Kinesis source, but in order to rule
> Kinesis out of the equation I created the dummy source instead.) Then I
> added an async function which I've hard-coded to wait 500ms then fail.
>
> package timeoutretry;
>
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
> import org.apache.flink.streaming.util.retryable.RetryPredicates;
>
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
>
> public class TimeoutRetry {
>     private static void log(String message) {
>         System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_TIME) + " :: " + message);
>     }
>
>     public static class SingleValueSource extends RichSourceFunction<String> {
>         private volatile boolean cancelled = false;
>         private volatile boolean alreadySentValue = false;
>
>         @Override
>         public void run(SourceContext<String> ctx) {
>             while (!cancelled) {
>                 synchronized (ctx.getCheckpointLock()) {
>                     if (!alreadySentValue) {
>                         ctx.collect("foo");
>                         alreadySentValue = true;
>                     }
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             cancelled = true;
>         }
>     }
>
>     public static class ExampleRichAsyncFunction extends RichAsyncFunction<String, String> {
>
>         @Override
>         public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>             log("Received " + input);
>
>             // resultFuture.completeExceptionally(new Exception("Dummy error"));
>             // The line above gets expected output:
>             // 09:39:50.668851 :: Received foo
>             // 09:39:52.671624 :: Received foo
>             // 09:39:54.671417 :: Timed out handling foo
>
>             CompletableFuture.runAsync(() -> {
>                 try {
>                     Thread.sleep(500);
>                 } catch (InterruptedException e) {
>                     throw new RuntimeException(e);
>                 }
>                 resultFuture.completeExceptionally(new Exception("Dummy error"));
>             });
>             // The block above gets unexpected output:
>             // 09:57:01.574928 :: Received foo
>             // 09:57:04.084659 :: Received foo
>             // 09:57:05.581016 :: Timed out handling foo
>             // 09:57:06.590309 :: Received foo
>             // 09:57:09.099132 :: Received foo
>             // 09:57:11.605754 :: Received foo
>             // 09:57:14.114028 :: Received foo
>             // This will keep going for as long as the number of maxAttempts set for the
>             // AsyncRetryStrategies.FixedDelayRetryStrategyBuilder.
>         }
>
>         @Override
>         public void timeout(String input, ResultFuture<String> resultFuture) {
>             log("Timed out handling " + input);
>             resultFuture.complete(Collections.emptyList());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<String> source = env
>                 .addSource(new SingleValueSource())
>                 .name("single-value");
>
>         AsyncDataStream
>                 .unorderedWaitWithRetry(
>                         source,
>                         new ExampleRichAsyncFunction(),
>                         4,
>                         TimeUnit.SECONDS,
>                         100,
>                         new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(10, 2000)
>                                 .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>                                 .build()
>                 )
>                 .print();
>
>         env.execute("Timeout Retry Example");
>     }
> }
>
> I hope that gives you the information you need? Let me know if there's
> anything else I can do to help troubleshoot. I'm still pretty new to Flink,
> so I might well just have misunderstood something or set something up
> incorrectly.
>
> Thanks in advance,
>
> Yoni.
> ------------------------------
> *From:* Lincoln Lee <lincoln.8...@gmail.com>
> *Sent:* 14 December 2022 13:51
> *To:* Yoni Gibbs <yonigi...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: AsyncDataStream: Retries keep executing after timeout
>
> Hi,
> Is this case running as an ITCase locally, or as a streaming job on a
> cluster? If it's the former, one thing I can think of is that local testing
> with a bounded data source (with only a few test records) reaches end of
> input very quickly and then triggers the endOfInput logic of
> AsyncWaitOperator. That logic finishes all in-flight delayed retry items
> immediately (asyncInvoke is called as the last attempt before the operator
> exits, and its result is used as the final result, regardless of whether it
> has timed out or not). This may produce one more attempt than when the job
> keeps running normally and does not end.
> For a long-running job, the retry starts from scratch when the job recovers
> from a restart (regardless of how many times it has been retried before).
> This may also result in more attempts and a longer retry time for the
> affected elements.
> If you can provide more information about the test, maybe we can further
> clarify what the problem is.
>
> Best,
> Lincoln Lee
>
>
> On Tue, Dec 13, 2022 at 23:46, Yoni Gibbs <yonigi...@hotmail.com> wrote:
>
> Hi,
>
> I've got a Kinesis consumer which reacts to each record by doing some
> async work using an implementation of RichAsyncFunction. I'm adding a
> retry strategy. After x failed attempts I want this to time out and give up
> by returning no data (i.e. not be treated as a failure).
>
> Here is a cut-down version of my code, which works as expected (in Kotlin,
> I hope that's OK - can supply Java translation if required):
>
> val targetStream = AsyncDataStream
>     .unorderedWaitWithRetry(
>         inputStream,
>         object : RichAsyncFunction<String, String>() {
>             override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>                 println("Received input: $input")
>                 resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>             }
>
>             override fun timeout(input: String, resultFuture: ResultFuture<String>) {
>                 println("Timeout")
>                 resultFuture.complete(listOf())
>             }
>         },
>         4,
>         TimeUnit.SECONDS,
>         100,
>         AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
>             .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>             .build()
>     )
>
> This will time out after 4 seconds, and the retry strategy is set to retry
> every two seconds. If I run that I get the output I expect, namely:
>
> Received input: foo
> Received input: foo
> Timeout
>
> Importantly, I see that asyncInvoke is only called twice, because by the
> time the third invocation is due to occur, the timeout has already kicked
> in and marked this record as handled.
>
> However the above is clearly unrealistic as it calls
> resultFuture.completeExceptionally immediately rather than asynchronously
> after some work has taken place. So now I replace the asyncInvoke
> implementation above with the following:
>
> override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>     println("Received input: $input")
>     CompletableFuture.supplyAsync {
>         Thread.sleep(500)
>         resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>     }
> }
>
> Now I get output which I don't expect, which shows that after the timeout,
> asyncInvoke continues to be called a few more times.
>
> That seems wrong to me: shouldn't it stop being called because timeout has
> already been invoked and it called resultFuture.complete()?
>
> I might well just be misunderstanding something here.
>
> Thanks in advance,
>
> Yoni.
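
As a rough aid for reading the timestamps quoted in this thread, the schedule Yoni expected can be worked out with a few lines of plain Java. This is only a sketch of the arithmetic and not Flink's implementation: the class RetryTimeline, the method attemptStarts, and the attemptCap parameter are hypothetical names, and the model assumes that each retry starts a fixed delay after the previous attempt fails and that no attempt starts once the timeout has been reached.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTimeline {

    /**
     * Start times (ms since the record arrived) of every attempt that begins
     * before the timeout, in a simplified model: each attempt fails after
     * workMs, and the next one starts delayMs after that failure.
     * attemptCap is a hypothetical cap on total attempts in this model; it is
     * not claimed to match how Flink counts maxAttempts.
     */
    static List<Long> attemptStarts(long timeoutMs, long workMs, long delayMs, int attemptCap) {
        List<Long> starts = new ArrayList<>();
        long t = 0;
        for (int i = 0; i < attemptCap && t < timeoutMs; i++) {
            starts.add(t);
            t += workMs + delayMs; // failure time plus the fixed retry delay
        }
        return starts;
    }

    public static void main(String[] args) {
        // Immediate failure, 2 s delay, 4 s timeout
        System.out.println(attemptStarts(4000, 0, 2000, 10));   // prints [0, 2000]
        // Failure after 500 ms of async work, same delay and timeout
        System.out.println(attemptStarts(4000, 500, 2000, 10)); // prints [0, 2500]
    }
}
```

Under these assumptions both variants allow two attempts before the 4 second timeout. That matches the two "Received" lines in the immediate-failure run and the first two lines of the delayed run (09:57:01.57 and 09:57:04.08, about 2.5 s apart). The further "Received foo" lines logged after the timeout at 09:57:05.58 are the behavior tracked in FLINK-30477.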