Hi Shuyi,

I am not sure. You could handle retries in the user code within org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke without using a DLQ, as described in my original answer to William. On the other hand, I agree that built-in support could be easier for the user, and it is indeed a common scenario.
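To make the user-code approach concrete, here is a minimal sketch in plain Java of retrying an asynchronous call with exponential backoff. It is not part of Flink: the class `BackoffRetry` and its methods are hypothetical names used for illustration only, and the sketch assumes the remote call already returns a `CompletableFuture`. The `totalTimeoutMillis` helper only adds up a doubling series such as 2s + 4s + 8s + 16s, which is one way to estimate the operator timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink API): retries an async call with exponential backoff. */
final class BackoffRetry {

    private BackoffRetry() {}

    /** Sums a doubling series: initial + 2*initial + 4*initial + ... (terms entries). */
    static long totalTimeoutMillis(long initialMillis, int terms) {
        long total = 0;
        long current = initialMillis;
        for (int i = 0; i < terms; i++) {
            total += current;
            current *= 2;
        }
        return total;
    }

    /**
     * Invokes the call up to maxAttempts times. After each failure it waits before
     * the next attempt, doubling the wait every time, starting at initialDelayMillis.
     */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> call,
            int maxAttempts,
            long initialDelayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 1, maxAttempts, initialDelayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            int attemptNo,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like an asynchronous one.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                // Retries exhausted: surface the last error to the caller.
                result.completeExceptionally(error);
            } else {
                // Schedule the next attempt instead of blocking a thread.
                scheduler.schedule(
                        () -> attempt(call, attemptNo + 1, maxAttempts,
                                delayMillis * 2, scheduler, result),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

Inside asyncInvoke, one would presumably call something like `BackoffRetry.retry(() -> client.query(input), 4, 2000, scheduler)` (where `client.query` stands in for the actual web request) and forward the outcome to the ResultFuture. The timeout passed to the async operator then has to cover the backoff waits plus the time the individual calls take; `BackoffRetry.totalTimeoutMillis(2000, 4)` returns 30000, matching 2s + 4s + 8s + 16s.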
Two follow-up questions come to mind:

- When a Flink job fails and restarts, would you expect the "retry counter" to be reset or to continue where it left off?
- With AsyncDataStream.orderedWait(), when would you expect a record to be skipped? After the final timeout, or after the first timeout?

Would you like to create a JIRA ticket [1] for this improvement with answers to the questions above? We can continue the discussion there.

Best,

Konstantin

[1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues

On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:

> Hi Konstantin,
>
> (cc Till since he owns the code)
>
> For async-IO, IO failure and retry is a common & expected pattern. In most
> of the use cases, users will need to deal with IO failure and retry.
> Therefore, I think it's better to address the problem in Flink rather than
> having users implement custom logic in user code, for a better dev
> experience. We have a similar problem in many of our use cases. To enable
> backoff and retry, we need to put the failed message into a DLQ (another
> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
> is manual and cumbersome and requires setting up an extra Kafka topic.
>
> Can we add multiple strategies to handle async IO failure in the
> AsyncWaitOperator? I propose the following strategies:
>
> - FAIL_OPERATOR (default & current behavior)
> - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>
> What do you guys think? Thanks a lot.
>
> Shuyi
>
> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <konstan...@ververica.com> wrote:
>
>> Hi William,
>>
>> the AsyncOperator does not have such a setting. It is "merely" a wrapper
>> around an asynchronous call, which provides integration with Flink's state
>> & time management.
>>
>> I think the way to go would be to do the exponential back-off in the
>> user code and set the timeout of the AsyncOperator to the sum of the
>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>
>>> Hi,
>>> Is there a way to specify an exponential backoff strategy for when async
>>> function calls fail?
>>>
>>> I have an async function that does web requests to a rate-limited API.
>>> Can you handle that with settings on the async function call?
>>>
>>> Thanks,
>>> William
>>
>> --
>> Konstantin Knauf | Solutions Architect
>> +49 160 91394525
>> <https://www.ververica.com/>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Data Artisans GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen