Hi Yoni,

Sorry for the late response! I checked the issue and it is indeed a bug. I
have created a ticket (https://issues.apache.org/jira/browse/FLINK-30477)
and opened a PR to fix it. The reproducing cases were added
in AsyncWaitOperatorTest#testProcessingTimeWithTimeoutFunctionUnorderedWithRetry
and testProcessingTimeWithTimeoutFunctionOrderedWithRetry. You can wait for
the fix to be merged, or try the patch if you need it urgently.
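
As a sanity check on the expected behaviour, here is a minimal sketch in
plain Java (no Flink involved; the class and method names are made up for
illustration). It assumes a simplified model: every attempt fails after a
fixed latency, the next attempt starts one retry delay after that failure,
and no new attempt starts once the timeout has been reached. With the
values from your reproduction (4 s timeout, 2 s fixed delay) it gives two
attempts both for the immediate failure and for the 500 ms asynchronous
failure.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTimeline {

    /**
     * Returns the start times (ms after the first call) of the attempts that
     * begin before the timeout. Simplified model, not Flink's implementation:
     * each attempt fails after attemptLatencyMs, and the next attempt starts
     * retryDelayMs after that failure. An attempt due exactly at the timeout
     * is assumed not to run.
     */
    static List<Long> attemptStarts(long timeoutMs, long retryDelayMs,
                                    long attemptLatencyMs, int maxAttempts) {
        List<Long> starts = new ArrayList<>();
        long t = 0;
        while (starts.size() < maxAttempts && t < timeoutMs) {
            starts.add(t);
            t += attemptLatencyMs + retryDelayMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Immediate failure: attempts at 0 ms and 2000 ms, then the timeout at 4000 ms.
        System.out.println(attemptStarts(4000, 2000, 0, 10));   // prints [0, 2000]
        // Failure after 500 ms: attempts at 0 ms and 2500 ms, then the timeout at 4000 ms.
        System.out.println(attemptStarts(4000, 2000, 500, 10)); // prints [0, 2500]
    }
}
```

Any "Received foo" line logged after "Timed out handling foo" is an attempt
that this model does not predict, which is the behaviour the ticket covers.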

Thanks again for reporting this!

Best,
Lincoln Lee


On Thu, Dec 15, 2022 at 18:10, Yoni Gibbs <yonigi...@hotmail.com> wrote:

> Hi Lincoln,
>
> Thanks very much for the reply! The issue seems to occur both in local
> development in the IDE, and when running in a Flink cluster. Below is the
> full Java code to replicate the issue. I generated an empty project
> following the instructions at
> https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/datastream/,
> namely I ran this:
>
> $ mvn archetype:generate \
>     -DarchetypeGroupId=org.apache.flink \
>     -DarchetypeArtifactId=flink-walkthrough-datastream-java \
>     -DarchetypeVersion=1.16.0 \
>     -DgroupId=timeoutretry \
>     -DartifactId=timeoutretry \
>     -Dversion=0.1 \
>     -Dpackage=timeoutretry \
>     -DinteractiveMode=false
>
>
> I then deleted the two generated files and created one called
> TimeoutRetry.java, with the code below. Here I've created a dummy source
> that simply emits one value, for simplicity. (Note that I first came
> across the issue when working with a Kinesis source, but in order to rule
> Kinesis out of the equation I created the dummy source instead.) Then I
> added an async function which I've hard-coded to wait 500ms then fail.
>
> package timeoutretry;
>
> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.async.ResultFuture;
> import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
> import org.apache.flink.streaming.util.retryable.RetryPredicates;
>
> import java.time.LocalDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Collections;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
>
> public class TimeoutRetry {
>     private static void log(String message) {
>         System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_TIME) + " :: " + message);
>     }
>
>     public static class SingleValueSource extends RichSourceFunction<String> {
>         private volatile boolean cancelled = false;
>         private volatile boolean alreadySentValue = false;
>
>         @Override
>         public void run(SourceContext<String> ctx) {
>             while (!cancelled) {
>                 synchronized (ctx.getCheckpointLock()) {
>                     if (!alreadySentValue) {
>                         ctx.collect("foo");
>                         alreadySentValue = true;
>                     }
>                 }
>             }
>         }
>
>         @Override
>         public void cancel() {
>             cancelled = true;
>         }
>     }
>
>     public static class ExampleRichAsyncFunction extends RichAsyncFunction<String, String> {
>
>         @Override
>         public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
>             log("Received " + input);
>
>             // resultFuture.completeExceptionally(new Exception("Dummy error"));
>             // The line above gets expected output:
>             //   09:39:50.668851 :: Received foo
>             //   09:39:52.671624 :: Received foo
>             //   09:39:54.671417 :: Timed out handling foo
>
>             CompletableFuture.runAsync(() -> {
>                 try {
>                     Thread.sleep(500);
>                 } catch (InterruptedException e) {
>                     throw new RuntimeException(e);
>                 }
>                 resultFuture.completeExceptionally(new Exception("Dummy error"));
>             });
>             // The block above gets unexpected output:
>             //    09:57:01.574928 :: Received foo
>             //    09:57:04.084659 :: Received foo
>             //    09:57:05.581016 :: Timed out handling foo
>             //    09:57:06.590309 :: Received foo
>             //    09:57:09.099132 :: Received foo
>             //    09:57:11.605754 :: Received foo
>             //    09:57:14.114028 :: Received foo
>             // This keeps going until the maxAttempts set on the
>             // AsyncRetryStrategies.FixedDelayRetryStrategyBuilder is reached.
>         }
>
>         @Override
>         public void timeout(String input, ResultFuture<String> resultFuture) {
>             log("Timed out handling " + input);
>             resultFuture.complete(Collections.emptyList());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<String> source = env
>                 .addSource(new SingleValueSource())
>                 .name("single-value");
>
>         AsyncDataStream
>                 .unorderedWaitWithRetry(
>                         source,
>                         new ExampleRichAsyncFunction(),
>                         4,
>                         TimeUnit.SECONDS,
>                         100,
>                         new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(10, 2000)
>                                 .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>                                 .build()
>                 )
>                 .print();
>
>         env.execute("Timeout Retry Example");
>     }
> }
>
>
> I hope that gives you the information you need. Let me know if there's
> anything else I can do to help troubleshoot. I'm still pretty new to Flink, so
> I might well just have misunderstood something or set something up
> incorrectly.
>
> Thanks in advance,
>
> Yoni.
> ------------------------------
> *From:* Lincoln Lee <lincoln.8...@gmail.com>
> *Sent:* 14 December 2022 13:51
> *To:* Yoni Gibbs <yonigi...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: AsyncDataStream: Retries keep executing after timeout
>
> Hi,
>    Is this case running locally as an integration test (IT case), or as a
> streaming job on a cluster? If it's the former, one thing I can think of is
> that local tests using a bounded data source (with only a few test records)
> reach end of input very quickly, which triggers the endOfInput logic of
> AsyncWaitOperator: it finishes all in-flight delayed retry items
> immediately (asyncInvoke is called as the last attempt before the operator
> exits, and its outcome is the final result, regardless of whether it has
> timed out or not). This may cause one more attempt than when the job keeps
> running normally and does not end.
>    For a long-running job, retries start from scratch when the job
> recovers from a restart (regardless of how many times an element has been
> retried before), which may also result in more attempts and a longer retry
> time for those elements.
>    If you can provide more information about the test, maybe we can
> further clarify what the problem is.
>
> Best,
> Lincoln Lee
>
>
> On Tue, Dec 13, 2022 at 23:46, Yoni Gibbs <yonigi...@hotmail.com> wrote:
>
> Hi,
>
> I've got a Kinesis consumer which reacts to each record by doing some
> async work using an implementation of RichAsyncFunction. I'm adding a
> retry strategy. After x failed attempts I want this to time out and give up
> by returning no data (i.e. not be treated as a failure).
>
> Here is a cut down version of my code, which works as expected (in Kotlin,
> I hope that's OK - can supply Java translation if required):
>
> val targetStream = AsyncDataStream
>     .unorderedWaitWithRetry(
>         inputStream,
>         object : RichAsyncFunction<String, String>() {
>             override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>                 println("Received input: $input")
>                 resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>             }
>
>             override fun timeout(input: String, resultFuture: ResultFuture<String>) {
>                 println("Timeout")
>                 resultFuture.complete(listOf())
>             }
>         },
>         4,
>         TimeUnit.SECONDS,
>         100,
>         AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
>             .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>             .build()
>     )
>
> This will time out after 4 seconds, and the retry strategy is set to retry
> every two seconds. If I run that I get the output I expect, namely:
>
> Received input: foo
> Received input: foo
> Timeout
>
> Importantly, I see that asyncInvoke is only called twice, because by the
> time the third invocation is due to occur, the timeout has already kicked
> in and marked this record as handled.
>
> However the above is clearly unrealistic as it calls
> resultFuture.completeExceptionally immediately rather than asynchronously
> after some work has taken place. So now I replace the asyncInvoke
> implementation above with the following:
>
> override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>     println("Received input: $input")
>     CompletableFuture.supplyAsync {
>         Thread.sleep(500)
>         resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>     }
> }
>
> Now I get output which I don't expect, which shows that after the timeout,
> asyncInvoke continues to be called a few more times.
>
> That seems wrong to me: shouldn't it stop being called because timeout has
> already been invoked and it called resultFuture.complete()?
>
> I might well just be misunderstanding something here.
>
> Thanks in advance,
>
> Yoni.
>
>
