Sorry for joining the discussion so late. I agree that we could add some more syntactic sugar for handling failure cases. Looking at the existing interfaces, it should be fairly easy to create an abstract class AsyncFunctionWithRetry that extends AsyncFunction and encapsulates the retry logic for asynchronous operations, so I don't think it is strictly necessary to change the AsyncWaitOperator to add this functionality.
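A minimal sketch of the retry logic such an AsyncFunctionWithRetry could encapsulate. To keep it self-contained it uses only `java.util.concurrent` instead of Flink classes. `RetryingAsyncCall`, `attempt` and `invokeWithRetry` are hypothetical names, and doubling the pause after every failure is an assumed backoff policy, not existing Flink behaviour.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Flink API: subclasses implement a single attempt,
// and the base class re-schedules failed attempts with exponential backoff.
abstract class RetryingAsyncCall<IN, OUT> {
    private final int maxAttempts;          // total attempts, including the first one
    private final long initialDelayMillis;  // pause before the second attempt
    private final ScheduledExecutorService scheduler;

    RetryingAsyncCall(int maxAttempts, long initialDelayMillis, ScheduledExecutorService scheduler) {
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
        this.scheduler = scheduler;
    }

    /** A single asynchronous attempt, e.g. one HTTP request. */
    protected abstract CompletableFuture<OUT> attempt(IN input);

    /** Completes with the first successful result, or with the error of the last attempt. */
    CompletableFuture<OUT> invokeWithRetry(IN input) {
        CompletableFuture<OUT> result = new CompletableFuture<>();
        tryAttempt(input, 1, result);
        return result;
    }

    private void tryAttempt(IN input, int attemptNo, CompletableFuture<OUT> result) {
        CompletableFuture<OUT> future;
        try {
            future = attempt(input);
        } catch (RuntimeException e) {
            // Treat a synchronously thrown exception like a failed attempt.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error);
            } else {
                // Pauses double after each failure: d, 2d, 4d, ...
                long delay = initialDelayMillis << (attemptNo - 1);
                scheduler.schedule(() -> tryAttempt(input, attemptNo + 1, result),
                        delay, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

Inside an actual `RichAsyncFunction`, `asyncInvoke(input, resultFuture)` could call `invokeWithRetry(input)` and forward the outcome with `resultFuture.complete(Collections.singleton(value))` or `resultFuture.completeExceptionally(error)`. The scheduler would be created in `open()` and shut down in `close()`, and the timeout configured on the async operator still bounds the whole sequence of attempts and pauses.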
Cheers,
Till

On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <walter...@gmail.com> wrote:

> Thanks for raising the concern @shuyi and for the explanation @konstantin.
>
> Upon glancing at the Flink documentation, it seems that users have full control over the timeout behavior [1]. But unlike the AsyncWaitOperator, user code cannot easily access the internal state of the operator to, for example, put the message back into the async buffer with a retry tag. Thus, I also think that providing a set of common timeout-handling strategies would be a good idea for Flink users, and this could be a very useful feature.
>
> Regarding the questions and concerns:
>
> 1. Should the "retry counter" be reset or continue where it left off?
>    - This is definitely a good point, as this counter might need to go into the operator state if we decided to carry over the retry counter. Functionality-wise, I think it should be reset, because after a restart it no longer represents the same transient state as at the time of the failure.
>
> 2. When should AsyncDataStream.orderedWait() skip a record?
>    - I assume this should be configurable by the user. For example, we could have additional properties for each strategy described by @shuyi, such as a combination of:
>      - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
>
> I've also created a JIRA ticket [2] for the discussion; please feel free to share your thoughts and comments.
>
> --
> Rong
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] https://issues.apache.org/jira/browse/FLINK-11909
>
> On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <konstan...@ververica.com> wrote:
>
>> Hi Shuyi,
>>
>> I am not sure. You could handle retries in the user code within org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke without using a DLQ, as described in my original answer to William.
>> On the other hand, I agree that it could be easier for the user, and it is indeed a common scenario.
>>
>> Two follow-up questions come to mind:
>>
>> - When a Flink job fails and restarts, would you expect the "retry counter" to be reset or to continue where it left off?
>> - With AsyncDataStream.orderedWait(), when would you expect a record to be skipped? After the final timeout, or after the first timeout?
>>
>> Would you like to create a JIRA ticket [1] for this improvement with answers to the questions above? Then we can continue the discussion there.
>>
>> Best,
>>
>> Konstantin
>>
>> [1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>>
>> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:
>>
>>> Hi Konstantin,
>>>
>>> (cc Till since he owns the code)
>>>
>>> For async I/O, I/O failure and retry is a common and expected pattern. In most use cases, users will need to deal with I/O failures and retries. Therefore, for a better developer experience, I think it's better to address the problem in Flink rather than have users implement custom logic in their own code. We have a similar problem in many of our use cases. To enable backoff and retry, we need to put the failed message into a DLQ (another Kafka topic) and re-ingest it from the DLQ topic to retry, which is manual, cumbersome, and requires setting up an extra Kafka topic.
>>>
>>> Can we add multiple strategies to handle async I/O failures in the AsyncWaitOperator? I propose the following strategies:
>>>
>>> - FAIL_OPERATOR (default & current behavior)
>>> - FIX_INTERVAL_RETRY (retry with a configurable fixed interval up to N times)
>>> - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>>
>>> What do you guys think? Thanks a lot.
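A sketch of how the proposed strategies and Rong's suggested (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY) combination could be modelled as a plain configuration object. This is not an existing Flink API: `RetryConfig`, `RetryFailurePolicy` (with the hypothetical values FAIL_JOB and SKIP_RECORD), `delayBeforeRetry` and `skipAfterFinalFailure` are illustrative names, and reading EXP_BACKOFF_RETRY as "double the interval after each retry" is an assumption.

```java
import java.util.OptionalLong;

// Strategy names as proposed in the thread; the delay formulas are one possible reading.
enum RetryStrategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

// Hypothetical: what happens to a record once no further retry is scheduled.
enum RetryFailurePolicy { FAIL_JOB, SKIP_RECORD }

final class RetryConfig {
    final RetryStrategy strategy;
    final int maxRetryCount;     // N in "up to N times"
    final long intervalMillis;   // fixed interval, or the first backoff delay
    final RetryFailurePolicy failurePolicy;

    RetryConfig(RetryStrategy strategy, int maxRetryCount, long intervalMillis,
                RetryFailurePolicy failurePolicy) {
        this.strategy = strategy;
        this.maxRetryCount = maxRetryCount;
        this.intervalMillis = intervalMillis;
        this.failurePolicy = failurePolicy;
    }

    /** Delay before retry number {@code retry} (1-based); empty means "stop retrying". */
    OptionalLong delayBeforeRetry(int retry) {
        // FAIL_OPERATOR (the current behaviour) never schedules a retry.
        if (strategy == RetryStrategy.FAIL_OPERATOR || retry > maxRetryCount) {
            return OptionalLong.empty();
        }
        switch (strategy) {
            case FIX_INTERVAL_RETRY:
                return OptionalLong.of(intervalMillis);
            case EXP_BACKOFF_RETRY:
                return OptionalLong.of(intervalMillis << (retry - 1));
            default:
                throw new IllegalStateException("unexpected strategy " + strategy);
        }
    }

    /** A record is only skipped after its final retry failed, and never under FAIL_OPERATOR. */
    boolean skipAfterFinalFailure() {
        return strategy != RetryStrategy.FAIL_OPERATOR
                && failurePolicy == RetryFailurePolicy.SKIP_RECORD;
    }
}
```

With these semantics, a record would be skipped only after the last retry has failed, which is one possible answer to the question of when AsyncDataStream.orderedWait() should skip a record.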
>>>
>>> Shuyi
>>>
>>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <konstan...@ververica.com> wrote:
>>>
>>>> Hi William,
>>>>
>>>> The AsyncOperator does not have such a setting. It is "merely" a wrapper around an asynchronous call, which provides integration with Flink's state & time management.
>>>>
>>>> I think the way to go would be to implement the exponential back-off in the user code and set the timeout of the AsyncOperator to the sum of the timeouts used in the user code (e.g. 2s + 4s + 8s + 16s).
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>>>
>>>>> Hi,
>>>>> Is there a way to specify an exponential backoff strategy for when async function calls fail?
>>>>>
>>>>> I have an async function that makes web requests to a rate-limited API. Can you handle that with settings on the async function call?
>>>>>
>>>>> Thanks,
>>>>> William
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf | Solutions Architect
>>>>
>>>> +49 160 91394525
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Data Artisans GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
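The timeout arithmetic from Konstantin's first reply (per-attempt timeouts of 2s + 4s + 8s + 16s in the user code) can be made concrete with a small helper. `TimeoutBudget` and `totalForDoublingTimeouts` are hypothetical names, and the sketch assumes each attempt gets twice the timeout of the previous one.

```java
import java.time.Duration;

// Hypothetical helper: total operator timeout needed to cover all attempts.
final class TimeoutBudget {
    private TimeoutBudget() {}

    /**
     * Sum of per-attempt timeouts when the first attempt gets {@code first} and every
     * following attempt gets twice the previous one, i.e. first * (2^attempts - 1).
     */
    static Duration totalForDoublingTimeouts(Duration first, int attempts) {
        Duration total = Duration.ZERO;
        Duration current = first;
        for (int i = 0; i < attempts; i++) {
            total = total.plus(current);
            current = current.multipliedBy(2);
        }
        return total;
    }
}
```

For the example in the thread, `totalForDoublingTimeouts(Duration.ofSeconds(2), 4)` gives 30 seconds, which would then be passed as the timeout argument of `AsyncDataStream.orderedWait(...)` or `unorderedWait(...)`. If the user code also pauses between attempts, those pauses have to be added to the budget as well, because the operator timeout covers everything until the result future completes.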