Thanks for the feedback @Till. Yes, I agree as well that opening up or changing the AsyncWaitOperator doesn't seem to be a necessity here. I think creating an "AsyncFunctionBase" and making the current AsyncFunction an extension of it, with some of the default behaviors Shuyi suggested, would be a good starting point. To some extent we can also provide some of the strategies discussed as default building blocks, but I am not sure this is a must once we have the "AsyncFunctionBase".
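A rough sketch of the kind of retry behavior such a base class could encapsulate, written in plain Java rather than against the Flink API. Everything here is an assumption for illustration: the class RetrySketch, the methods delayMillis and callWithRetry, and their parameters do not exist in Flink; only the three strategy names are taken from Shuyi's proposal quoted below.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these names are part of Flink. */
final class RetrySketch {

    /** Strategy names as proposed in this thread. */
    enum RetryStrategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

    /** Delay before the given retry (1-based), in milliseconds. */
    static long delayMillis(RetryStrategy strategy, long baseMillis, int retryNumber) {
        switch (strategy) {
            case FIX_INTERVAL_RETRY:
                return baseMillis;
            case EXP_BACKOFF_RETRY:
                // base, 2*base, 4*base, ... (no overflow guard in this sketch)
                return baseMillis << (retryNumber - 1);
            default:
                throw new IllegalArgumentException("FAIL_OPERATOR does not retry");
        }
    }

    /** Runs the async call and retries it on failure, up to maxRetries times. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call, RetryStrategy strategy,
            long baseMillis, int maxRetries, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, strategy, baseMillis, maxRetries, 0, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, RetryStrategy strategy,
            long baseMillis, int maxRetries, int retriesSoFar,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like an asynchronous one.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (strategy == RetryStrategy.FAIL_OPERATOR || retriesSoFar >= maxRetries) {
                // Retries are disabled or exhausted: surface the last failure.
                result.completeExceptionally(error);
            } else {
                long delay = delayMillis(strategy, baseMillis, retriesSoFar + 1);
                scheduler.schedule(
                        () -> attempt(call, strategy, baseMillis, maxRetries,
                                retriesSoFar + 1, scheduler, result),
                        delay, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky call: fails twice, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new RuntimeException("rate limited"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        String value = callWithRetry(
                flaky, RetryStrategy.EXP_BACKOFF_RETRY, 10L, 3, scheduler).join();
        System.out.println(value + " after " + calls.get() + " calls");
        scheduler.shutdown();
    }
}
```

In an actual AsyncFunction the returned future would presumably be bridged to the ResultFuture passed to asyncInvoke; that wiring is left out here, as is any handling of the retry counter across job restarts.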
I will try to create a POC for the change, gather some feedback, and see whether the abstract class offers too much or too little flexibility.

Best,
Rong

On Tue, Mar 19, 2019 at 10:32 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Sorry for joining the discussion so late. I agree that we could add some
> more syntactic sugar for handling failure cases. Looking at the existing
> interfaces, I think it should be fairly easy to create an abstract class
> AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the
> retry logic for asynchronous operations. I think it is not strictly
> necessary to change the AsyncWaitOperator to add this functionality.
>
> Cheers,
> Till
>
> On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <walter...@gmail.com> wrote:
>
>> Thanks for raising the concern @shuyi and the explanation @konstantin.
>>
>> Glancing at the Flink documentation, it seems that users have full control
>> over the timeout behavior [1]. But unlike AsyncWaitOperator, it is not
>> straightforward to access the internal state of the operator to, for
>> example, put the message back into the async buffer with a retry tag. Thus, I
>> also think that providing a set of common timeout-handling strategies is a
>> good idea for Flink users, and this could be a very useful feature.
>>
>> Regarding the questions and concerns:
>> 1. Should the "retry counter" be reset or continue where it left off?
>> - This is definitely a good point, as this counter might need to go into
>> the operator state if we decide to carry over the retry counter.
>> Functionality-wise I think it should be reset, because after a restart it
>> no longer represents the same transient state as at the time of failure.
>>
>> 2. When should AsyncStream.orderedWait() skip a record?
>> - I assume this should be configurable by the user. For example, we can
>> have additional properties for each strategy described by @shuyi, such as a
>> combination of:
>> - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
>>
>> I've also created a JIRA ticket [2] for the discussion, please feel free
>> to share your thoughts and comments.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
>> [2] https://issues.apache.org/jira/browse/FLINK-11909
>>
>> On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <konstan...@ververica.com> wrote:
>>
>>> Hi Shuyi,
>>>
>>> I am not sure. You could handle retries in the user code within
>>> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
>>> without using a DLQ, as described in my original answer to William. On the
>>> other hand, I agree that it could be easier for the user and it is indeed a
>>> common scenario.
>>>
>>> Two follow-up questions come to mind:
>>>
>>> - When a Flink job fails and restarts, would you expect the "retry
>>> counter" to be reset or to continue where it left off?
>>> - With AsyncStream.orderedWait(), when would you expect a record to
>>> be skipped? After the final timeout, after the first timeout?
>>>
>>> Would you like to create a JIRA ticket [1] for this improvement with
>>> answers to the questions above, so we can continue to discuss it there?
>>>
>>> Best,
>>>
>>> Konstantin
>>>
>>> [1]
>>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>>>
>>> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:
>>>
>>>> Hi Konstantin,
>>>>
>>>> (cc Till since he owns the code)
>>>>
>>>> For async-IO, IO failure and retry is a common & expected pattern. In
>>>> most of the use cases, users will need to deal with IO failure and retry.
>>>> Therefore, I think it's better to address the problem in Flink rather than
>>>> have users implement custom logic in user code, for a better dev
>>>> experience. We have a similar problem in many of our use cases. To enable
>>>> backoff and retry, we need to put the failed message into a DLQ (another
>>>> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
>>>> is manual/cumbersome and requires setting up an extra Kafka topic.
>>>>
>>>> Can we add multiple strategies to handle async IO failure in the
>>>> AsyncWaitOperator? I propose the following strategies:
>>>>
>>>> - FAIL_OPERATOR (default & current behavior)
>>>> - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to
>>>> N times)
>>>> - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>>>
>>>> What do you guys think? Thanks a lot.
>>>>
>>>> Shuyi
>>>>
>>>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <konstan...@ververica.com> wrote:
>>>>
>>>>> Hi William,
>>>>>
>>>>> the AsyncOperator does not have such a setting. It is "merely" a
>>>>> wrapper around an asynchronous call, which provides integration with
>>>>> Flink's state & time management.
>>>>>
>>>>> I think the way to go would be to do the exponential back-off in the
>>>>> user code and set the timeout of the AsyncOperator to the sum of the
>>>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Is there a way to specify an exponential backoff strategy for when
>>>>>> async function calls fail?
>>>>>>
>>>>>> I have an async function that does web requests to a rate-limited
>>>>>> API. Can you handle that with settings on the async function call?
>>>>>>
>>>>>> Thanks,
>>>>>> William
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Konstantin Knauf | Solutions Architect
>>>>>
>>>>> +49 160 91394525
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Data Artisans GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
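
For reference, Konstantin's suggestion in the oldest reply of this thread (set the AsyncOperator timeout to the sum of the user-code timeouts, e.g. 2s + 4s + 8s + 16s) can be worked through as below. This is only a minimal sketch: the class TimeoutBudget and its method total are hypothetical helper names, and it assumes each attempt's timeout is double the previous one.

```java
import java.time.Duration;

/** Hypothetical helper; not part of Flink. */
final class TimeoutBudget {

    /**
     * Sums the per-attempt timeouts when each attempt waits twice as long
     * as the previous one, starting from {@code first}.
     */
    static Duration total(Duration first, int attempts) {
        Duration sum = Duration.ZERO;
        Duration current = first;
        for (int i = 0; i < attempts; i++) {
            sum = sum.plus(current);
            current = current.multipliedBy(2);
        }
        return sum;
    }

    public static void main(String[] args) {
        // 2s + 4s + 8s + 16s = 30s
        Duration operatorTimeout = total(Duration.ofSeconds(2), 4);
        System.out.println(operatorTimeout.getSeconds() + "s");
    }
}
```

With the numbers from the example, the operator timeout would have to be at least 30 seconds; in practice some slack on top of that is probably needed to cover scheduling and the calls themselves.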