Sorry for joining the discussion so late. I agree that we could add some
more syntactic sugar for handling failure cases. Looking at the existing
interfaces, I think it should be fairly easy to create an abstract class
AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the
retry logic for asynchronous operations. I think it is not strictly
necessary to change the AsyncWaitOperator to add this functionality.
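
To make the idea concrete, here is a rough, Flink-independent sketch of the
retry logic such a class could encapsulate. All names in it (RetrySketch,
retry, backoffMillis, totalBackoffMillis) are hypothetical and chosen only for
illustration; none of them is an existing Flink API, and the code uses only
java.util.concurrent. Under these assumptions, an AsyncFunctionWithRetry could
call something like retry(...) from asyncInvoke and pass the outcome on to the
result future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Flink. */
final class RetrySketch {

    /** Delay before retry number `attempt` (0-based): initial * 2^attempt. No overflow guard. */
    static long backoffMillis(long initialMillis, int attempt) {
        return initialMillis << attempt;
    }

    /** Sum of all backoff delays, i.e. a lower bound for the overall timeout. */
    static long totalBackoffMillis(long initialMillis, int maxRetries) {
        long sum = 0;
        for (int i = 0; i < maxRetries; i++) {
            sum += backoffMillis(initialMillis, i);
        }
        return sum;
    }

    /** Runs `call`, retrying failed attempts up to `maxRetries` times with exponential backoff. */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> call,
            int maxRetries,
            long initialMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 0, maxRetries, initialMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            int retriesSoFar,
            int maxRetries,
            long initialMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (Throwable t) {
            // Treat a synchronous failure like a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(t);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesSoFar >= maxRetries) {
                // Retries exhausted: surface the last error to the caller.
                result.completeExceptionally(error);
            } else {
                // Schedule the next attempt instead of blocking a thread.
                scheduler.schedule(
                        () -> attempt(call, retriesSoFar + 1, maxRetries,
                                initialMillis, scheduler, result),
                        backoffMillis(initialMillis, retriesSoFar),
                        TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky call: fails twice, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new RuntimeException("rate limited"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        String value = retry(flaky, 4, 10, scheduler).join();
        System.out.println(value + " after " + calls.get() + " calls");
        scheduler.shutdown();
    }
}
```

With an initial delay of 2s and 4 retries, totalBackoffMillis gives
2s + 4s + 8s + 16s = 30s, so the timeout configured on the operator would have
to be at least that, plus the time the calls themselves take.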

Cheers,
Till

On Wed, Mar 13, 2019 at 5:42 PM Rong Rong <walter...@gmail.com> wrote:

> Thanks for raising the concern @shuyi and the explanation @konstantin.
>
> Glancing at the Flink documentation, it seems users have full control
> over the timeout behavior [1]. But unlike in the AsyncWaitOperator, it is
> not straightforward to access the internal state of the operator to, for
> example, put the message back into the async buffer with a retry tag. Thus,
> I also think that providing a set of common timeout handling strategies is
> a good idea for Flink users, and it could be a very useful feature.
>
> Regarding the questions and concerns:
> 1. Should the "retry counter" be reset or continue where it left off?
> - This is definitely a good point, as this counter might need to go into
> the operator state if we decide to carry over the retry counter.
> Functionality-wise, I think it should be reset, because after a restart it
> no longer represents the same transient state as at the time of failure.
>
> 2. When should AsyncStream.orderedWait() skip a record?
> - I assume this should be configurable by the user. For example, we could
> have additional properties for each strategy described by @shuyi, such as
> a combination of:
>   - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
>
> I've also created a JIRA ticket [2] for the discussion. Please feel free
> to share your thoughts and comments.
>
> --
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] https://issues.apache.org/jira/browse/FLINK-11909
>
>
>
> On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <konstan...@ververica.com>
> wrote:
>
>> Hi Shuyi,
>>
>> I am not sure. You could handle retries in the user code within
>> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
>> without using a DLQ as described in my original answer to William. On the
>> other hand, I agree that it could be easier for the user and it is indeed a
>> common scenario.
>>
>> Two follow up questions come to mind:
>>
>>    - When a Flink job fails and restarts, would you expect the "retry
>>    counter" to be reset or to continue where it left off?
>>    - With AsyncStream.orderedWait() when would you expect a record to be
>>    skipped? After the final timeout, after the first timeout?
>>
>> Would you like to create a JIRA ticket [1] for this improvement with
>> answers to the questions above? We can continue the discussion there.
>>
>> Best,
>>
>> Konstantin
>>
>> [1]
>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>>
>>
>> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen <suez1...@gmail.com> wrote:
>>
>>> Hi Konstantin,
>>>
>>> (cc Till since he owns the code)
>>>
>>> For async-IO, IO failure and retry is a common & expected pattern. In
>>> most use cases, users will need to deal with IO failure and retry.
>>> Therefore, for a better dev experience, I think it's better to address
>>> the problem in Flink rather than have users implement custom logic in
>>> user code. We have a similar problem in many of our use cases. To enable
>>> backoff and retry, we need to put the failed message into a DLQ (another
>>> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
>>> is manual/cumbersome and requires setting up an extra Kafka topic.
>>>
>>> Can we add multiple strategies to handle async IO failure in the
>>> AsyncWaitOperator? I propose the following strategies:
>>>
>>>
>>>    - FAIL_OPERATOR (default & current behavior)
>>>    - FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N
>>>    times)
>>>    - EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>>
>>> What do you guys think? Thanks a lot.
>>>
>>> Shuyi
>>>
>>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf <
>>> konstan...@ververica.com> wrote:
>>>
>>>> Hi William,
>>>>
>>>> the AsyncOperator does not have such a setting. It is "merely" a
>>>> wrapper around an asynchronous call, which provides integration with
>>>> Flink's state & time management.
>>>>
>>>> I think the way to go would be to do the exponential back-off in the
>>>> user code and set the timeout of the AsyncOperator to the sum of the
>>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s = 30s).
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>>
>>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar <will...@saar.se> wrote:
>>>>
>>>>> Hi,
>>>>> Is there a way to specify an exponential backoff strategy for when
>>>>> async function calls fail?
>>>>>
>>>>> I have an async function that does web requests to a rate-limited API.
>>>>> Can you handle that with settings on the async function call?
>>>>>
>>>>> Thanks,
>>>>> William
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf | Solutions Architect
>>>>
>>>> +49 160 91394525
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Data Artisans GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>
>>>
>>
>>
>
