Hi everyone,

   After comparing the two internal implementations of delayed retry, we
prefer the timer-based solution: it gives precise delay control with
simple logic, and its only extra cost is one timer instance per retry
element, which we consider acceptable. The FLIP[1] doc has been updated.
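The timer-based trade-off can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Flink's operator code: a `PriorityQueue` stands in for a processing-time timer service, the illustrative `scheduleRetry` registers exactly one timer per failed element at now + backoff, and `advanceTo` drives a simulated clock so that firing order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of timer-based delayed retry (not Flink code).
// Each element that needs a retry registers exactly one timer at now + backoff;
// when the simulated clock reaches that time, the element is retried.
final class TimerRetrySketch {

    /** One timer per retrying element: this is the per-element cost. */
    static final class RetryTimer {
        final long fireAtMillis;
        final String element;
        final int attempt;

        RetryTimer(long fireAtMillis, String element, int attempt) {
            this.fireAtMillis = fireAtMillis;
            this.element = element;
            this.attempt = attempt;
        }
    }

    // Timers ordered by firing time, like a processing-time timer service.
    private final PriorityQueue<RetryTimer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.fireAtMillis, b.fireAtMillis));
    private final List<String> retried = new ArrayList<>();
    private long nowMillis = 0L;

    /** Registers one timer for an element whose attempt failed. */
    void scheduleRetry(String element, int attempt, long backoffMillis) {
        timers.add(new RetryTimer(nowMillis + backoffMillis, element, attempt));
    }

    /** Advances the simulated clock and fires every timer that is due. */
    void advanceTo(long newNowMillis) {
        nowMillis = newNowMillis;
        while (!timers.isEmpty() && timers.peek().fireAtMillis <= nowMillis) {
            RetryTimer timer = timers.poll();
            // A real operator would re-invoke the user's async function here.
            retried.add(timer.element + "#" + timer.attempt);
        }
    }

    List<String> retried() {
        return retried;
    }

    int pendingTimers() {
        return timers.size();
    }
}
```

Because each timer fires at its own timestamp, the delay is exact for every element; the price is one timer object per element waiting for a retry.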

[1]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee


On Mon, May 16, 2022 at 15:09, Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jingsong,
>
> Good question!
>
> The delayQueue is very similar to incompleteElements in
> UnorderedStreamElementQueue: it only holds references to the in-flight
> retry elements. Its main value is that it allows a fast scan when
> force-flushing during endInput, and it requires less refactoring of the
> existing logic.
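A minimal sketch of such a reference-only structure, assuming plain Java collections in place of the PoC's actual classes (`InFlightRetries`, `RetryEntry`, and `drainForFlush` are hypothetical names). It keeps no copy of the data, only references, so that the endInput force flush is a single scan.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: tracks references to in-flight retry entries, similar in
// spirit to incompleteElements in UnorderedStreamElementQueue.
final class InFlightRetries {

    // No equals/hashCode override: identity semantics, only references are tracked.
    static final class RetryEntry {
        final String input;
        final int attempt;

        RetryEntry(String input, int attempt) {
            this.input = input;
            this.attempt = attempt;
        }
    }

    // Insertion-ordered so the flush visits entries in the order they were added.
    private final Set<RetryEntry> inFlight = new LinkedHashSet<>();

    /** Called when an element is scheduled for a delayed retry. */
    void register(RetryEntry entry) {
        inFlight.add(entry);
    }

    /** Called when the delayed retry has fired normally. */
    void unregister(RetryEntry entry) {
        inFlight.remove(entry);
    }

    /** endInput: hand back every pending entry so it can be retried immediately. */
    List<RetryEntry> drainForFlush() {
        List<RetryEntry> pending = new ArrayList<>(inFlight);
        inFlight.clear();
        return pending;
    }

    int size() {
        return inFlight.size();
    }
}
```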
>
> Users don't need to configure a separate capacity for the delayQueue;
> they can simply increase the original capacity (if needed).
> Storing the input data and the retry state separately is mainly for
> backward compatibility. In the first version of the PoC, I used a single
> combined state to reduce state costs, but it was hard to keep compatible,
> so I split it into two after Yun Gao raised concerns about compatibility.
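Why two states help with compatibility can be sketched as follows. This assumes (it is not stated in the thread) that an older snapshot contains only the input list, that the retry list is aligned with it by position, and that a missing retry entry means "no attempt made yet"; plain `List`s stand in for Flink's ListState and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: restore from a separate input state and retry state.
// An old snapshot has only the input state, so retryState may be null or short.
final class SplitStateRestore {

    static final class Restored {
        final String input;
        final int attempts;

        Restored(String input, int attempts) {
            this.input = input;
            this.attempts = attempts;
        }
    }

    static List<Restored> restore(List<String> inputState, List<Integer> retryState) {
        List<Restored> restored = new ArrayList<>(inputState.size());
        for (int i = 0; i < inputState.size(); i++) {
            // Assumption: entries are aligned by index; no retry entry means 0 attempts.
            int attempts = (retryState != null && i < retryState.size()) ? retryState.get(i) : 0;
            restored.add(new Restored(inputState.get(i), attempts));
        }
        return restored;
    }
}
```

A single combined state would change the serialized format of the input elements themselves, which is what makes restoring older snapshots harder.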
>
> Best,
> Lincoln Lee
>
>
> On Mon, May 16, 2022 at 14:48, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Thanks, Lincoln, for your reply.
>>
>> I'm a little confused about the relationship between the Ordered/Unordered
>> queues and the DelayQueue. Why do we need a DelayQueue?
>> Can we remove the DelayQueue and put the retry state in the
>> StreamRecordQueueEntry (which seems to be what the FLIP already does)?
>> The advantages of doing this are:
>> 1. only half as much data is stored in state
>> 2. the concept is unified: the user only needs to configure one queue
>> capacity
>>
>> Best,
>> Jingsong
>>
>> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com>
>> wrote:
>>
>> > Hi Jingsong,
>> > Thanks for your feedback! Let me try to answer the two questions:
>> >
>> > For q1: Motivation
>> > Yes, users can implement retries themselves based on the external async
>> > client, but this requires every user to do similar work; if we support
>> > retries uniformly, user code becomes much simpler.
>> >
>> > > The real external call should happen in the asynchronous thread.
>> > > My question is: if users implement retries themselves in this
>> > > asynchronous thread, what is the difference from the current FLIP?
>> >
>> >
>> > For q2: Block Main Thread
>> > You're right, the queue data will be stored in ListState, which is
>> > OperatorState. In theory, the upper limit for ListState storage is
>> > Integer.MAX_VALUE, but in production we can't make the queue capacity
>> > too large, because the risk of OOM grows with the queue capacity.
>> > When a single task has too many retry elements, increasing the task
>> > parallelism may be a more viable option.
>> > We recommend estimating the queue capacity with a formula like
>> > 'inputRate * retryRate * avgRetryDuration', and also taking the actual
>> > checkpoint duration at runtime into account.
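The sizing formula is an application of Little's law: the average number of elements waiting for a retry equals the rate at which they enter retry (inputRate * retryRate) times the average time they spend there. A minimal sketch, assuming the rates are measured per subtask; `RetryCapacityEstimate.estimate` is a hypothetical helper, not a Flink API.

```java
// Back-of-the-envelope queue sizing: capacity ~= inputRate * retryRate * avgRetryDuration.
final class RetryCapacityEstimate {

    /**
     * @param inputRatePerSec     records per second entering one subtask (assumed per subtask)
     * @param retryRate           fraction of records that need a retry, in [0, 1]
     * @param avgRetryDurationSec average time a record spends waiting for and doing retries
     */
    static int estimate(double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        if (inputRatePerSec < 0 || retryRate < 0 || retryRate > 1 || avgRetryDurationSec < 0) {
            throw new IllegalArgumentException("rates and durations must be non-negative; retryRate <= 1");
        }
        // Round up: a fractional element still needs a slot.
        return (int) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }
}
```

For example, 1000 records/s with 25% of them retried for an average of 2 s gives `estimate(1000, 0.25, 2.0) == 500`. Since elements keep accumulating while a checkpoint is in progress, some margin above this figure, based on the observed checkpoint duration, is advisable.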
>> >
>> > > If I understand correctly, the retry queue will be put into ListState,
>> > > and this state is OperatorState? As far as I know, OperatorState is not
>> > > able to store a lot of data.
>> > > So when more data needs to be retried, will we need to block the main
>> > > thread? What is the maximum size of the default retry queue?
>> >
>> >
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > On Mon, May 16, 2022 at 10:31, Jingsong Li <jingsongl...@gmail.com> wrote:
>> >
>> > > Thanks, Lincoln, for the proposal.
>> > >
>> > > ## Motivation:
>> > >
>> > > > asyncInvoke and callback functions are executed synchronously by the
>> > > > main thread, which is not suitable for adding long-blocking operations,
>> > > > and introducing additional threads brings extra complexity for users
>> > >
>> > > According to the documentation of AsyncFunction:
>> > >
>> > > > For each #asyncInvoke, an async io operation can be triggered, and once
>> > > > it has been done, the result can be collected by calling {@link
>> > > > ResultFuture#complete}. For each async operation, its context is stored
>> > > > in the operator immediately after invoking #asyncInvoke, avoiding
>> > > > blocking for each stream input as long as the internal buffer is not
>> > > > full.
>> > >
>> > > The real external call should happen in the asynchronous thread.
>> > >
>> > > My question is: if users implement retries themselves in this
>> > > asynchronous thread, what is the difference from the current FLIP?
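To make the "retry it yourself in the async thread" alternative concrete, here is a minimal plain-Java sketch. It uses only `java.util.concurrent.CompletableFuture` (`delayedExecutor` requires Java 9+) and no Flink API; `callWithRetry` and its parameters are hypothetical, and it assumes the client reports failures through the returned future rather than by throwing synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical user-side retry around an async client call (no Flink involved).
final class UserLevelRetry {

    /** Completes with the first successful result, or the last error after maxAttempts. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> asyncCall, int maxAttempts, long backoffMillis) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(asyncCall, 1, maxAttempts, backoffMillis, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> asyncCall,
            int attemptNo,
            int maxAttempts,
            long backoffMillis,
            CompletableFuture<T> result) {
        asyncCall.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error);
            } else {
                // Schedule the next attempt after a fixed backoff on the default async executor.
                CompletableFuture.delayedExecutor(backoffMillis, TimeUnit.MILLISECONDS)
                        .execute(() -> attempt(asyncCall, attemptNo + 1, maxAttempts, backoffMillis, result));
            }
        });
    }
}
```

In an AsyncFunction, the returned future would then complete the ResultFuture. Each user would have to write and maintain similar code for their own client, which is the duplication a built-in retry mechanism would remove.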
>> > >
>> > > ## Block Main Thread
>> > >
>> > > If I understand correctly, the retry queue will be put into ListState,
>> > > and this state is OperatorState? As far as I know, OperatorState is not
>> > > able to store a lot of data.
>> > > So when more data needs to be retried, will we need to block the main
>> > > thread? What is the maximum size of the default retry queue?
>> > >
>> > > Best,
>> > > Jingsong
>> > >
>> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
>> > > wrote:
>> > >
>> > > > Dear Flink developers,
>> > > >
>> > > > I would like to open a discussion on FLIP-232 [1], an extension of
>> > > > AsyncWaitOperator to support retries for the user's AsyncFunction.
>> > > >
>> > > > To do so, a new user interface will be added to define the trigger
>> > > > condition for retries and when to retry. Internally, a delayed retry
>> > > > mechanism will be introduced.
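One possible shape for such a user interface, as an illustrative sketch only: the names below are hypothetical and are not taken from the FLIP. The trigger condition is expressed as result and exception predicates, and "when to retry" as an attempt limit plus a backoff.

```java
import java.util.function.Predicate;

// Illustrative retry strategy: trigger condition (predicates) plus timing (limit, backoff).
interface RetryStrategySketch<OUT> {
    /** currentAttempts = attempts already made for this element. */
    boolean canRetry(int currentAttempts);

    long backoffMillis(int currentAttempts);

    Predicate<OUT> resultPredicate();

    Predicate<Throwable> exceptionPredicate();

    /** Retry only while attempts remain and the outcome matches a trigger. */
    default boolean shouldRetry(int currentAttempts, OUT result, Throwable error) {
        if (!canRetry(currentAttempts)) {
            return false;
        }
        return error != null ? exceptionPredicate().test(error) : resultPredicate().test(result);
    }
}

/** Fixed-delay variant: at most maxAttempts attempts in total, constant backoff. */
final class FixedDelayRetrySketch<OUT> implements RetryStrategySketch<OUT> {
    private final int maxAttempts;
    private final long delayMillis;
    private final Predicate<OUT> onResult;
    private final Predicate<Throwable> onError;

    FixedDelayRetrySketch(
            int maxAttempts, long delayMillis, Predicate<OUT> onResult, Predicate<Throwable> onError) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
        this.onResult = onResult;
        this.onError = onError;
    }

    @Override
    public boolean canRetry(int currentAttempts) {
        return currentAttempts < maxAttempts;
    }

    @Override
    public long backoffMillis(int currentAttempts) {
        return delayMillis;
    }

    @Override
    public Predicate<OUT> resultPredicate() {
        return onResult;
    }

    @Override
    public Predicate<Throwable> exceptionPredicate() {
        return onError;
    }
}
```

For instance, `new FixedDelayRetrySketch<String>(3, 100L, String::isEmpty, e -> e instanceof java.io.IOException)` would retry empty results and I/O errors every 100 ms, up to three attempts in total.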
>> > > >
>> > > > There is a PoC for this FLIP [2][3]; thanks to Yun Gao for the offline
>> > > > discussions and valuable comments.
>> > > > The new feature is backward compatible: it can recover from state
>> > > > generated by prior Flink versions, and if no retry strategy is enabled,
>> > > > the behavior is the same as before.
>> > > >
>> > > > [1]
>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
>> > > > [2] based on timer trigger
>> > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
>> > > > [3] based on DelayQueue with pull fashion
>> > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
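For comparison with the timer-based PoC, the pull-style alternative can be sketched with `java.util.concurrent.DelayQueue` as a stand-in; the PoC's own data structure may differ, and `DelayedRetry`, `PullRetryLoop`, and `pollReady` are hypothetical names. Retry elements are enqueued with a delay, and the operator has to poll periodically, receiving only elements whose delay has expired.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical pull-style delayed retry built on java.util.concurrent.DelayQueue.
final class DelayedRetry implements Delayed {
    final String element;
    private final long readyAtNanos;

    DelayedRetry(String element, long delayMillis) {
        this.element = element;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; <= 0 means the element is ready to be retried.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class PullRetryLoop {
    private final DelayQueue<DelayedRetry> queue = new DelayQueue<>();

    void schedule(String element, long delayMillis) {
        queue.put(new DelayedRetry(element, delayMillis));
    }

    /** Non-blocking pull: an element whose delay has expired, or null if none is ready. */
    String pollReady() {
        DelayedRetry ready = queue.poll();
        return ready == null ? null : ready.element;
    }
}
```

With this design, delay precision depends on how often the operator polls, whereas a timer fires at each element's own timestamp.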
>> > > >
>> > > >
>> > > > Best,
>> > > > Lincoln Lee
>> > > >
>> > >
>> >
>>
>
