Thanks Gen Luo!

I agree with you in preferring the simpler design.

I’d like to share my thoughts on this choice: whether or not we store the
retry state only affects the recovery logic, not the per-record processing, so
I'll just compare the two:
1. w/ retry state:  simple recovery, but loses precision
2. w/o retry state: one more state and a little complexity, but precise for
users
From the user's perspective I prefer the second one; the additional complexity
is manageable.

One detail that is not mentioned in the FLIP: before each next attempt we will
check whether any time is left (i.e., give up once now() - startTime > timeout),
so the real total number of attempts will always be less than or equal to
maxAttempts and the total time spent will be <= timeout (one special case is
when job failover takes too long).
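
A minimal sketch of that check, only to illustrate the idea. The class and
method names are made up for this mail and are not taken from the FLIP or the
PoC, and I'm assuming all timestamps are in milliseconds:

```java
/** Hypothetical helper for illustration; not part of Flink, the FLIP, or the PoC. */
final class RetryBudget {
    private final int maxAttempts;
    private final long timeoutMillis;

    RetryBudget(int maxAttempts, long timeoutMillis) {
        this.maxAttempts = maxAttempts;
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Decides whether one more attempt may be started.
     *
     * @param currentAttempts attempts already made for this element
     * @param startTimeMillis time at which the element was first tried
     * @param nowMillis current time
     */
    boolean canRetry(int currentAttempts, long startTimeMillis, long nowMillis) {
        if (currentAttempts >= maxAttempts) {
            return false; // attempt budget is used up
        }
        // No time is left once now - startTime > timeout.
        return !(nowMillis - startTimeMillis > timeoutMillis);
    }
}
```

With max-attempts 5 and a 300s timeout, an element that has made 2 attempts
and started 120s ago may retry again, while an element that started more than
300s ago may not, however few attempts it has made.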

For the api, I've updated the FLIP[1]

[1]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
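
To illustrate why passing the attempt count helps, here is a rough sketch
built around the signature proposed earlier in this thread,
long getBackoffTimeMillis(int currentAttempts). The interface name, the factory
methods and the capping behaviour are assumptions for this example, not the
FLIP's final API:

```java
/** Hypothetical interface; only the method signature comes from this thread. */
interface AttemptAwareBackoff {
    long getBackoffTimeMillis(int currentAttempts);
}

/** Hypothetical factories showing two strategies behind one stateless interface. */
final class BackoffSketch {
    private BackoffSketch() {}

    /** Always waits the same delay, regardless of the attempt count. */
    static AttemptAwareBackoff fixedDelay(long delayMillis) {
        return currentAttempts -> delayMillis;
    }

    /**
     * Multiplies the delay after every attempt, capped at maxMillis.
     * Attempt 1 waits initialMillis, attempt 2 waits initialMillis * multiplier, etc.
     */
    static AttemptAwareBackoff exponential(long initialMillis, long maxMillis, double multiplier) {
        return currentAttempts -> {
            int exponent = Math.max(0, currentAttempts - 1);
            double delay = initialMillis * Math.pow(multiplier, exponent);
            return (long) Math.min(delay, (double) maxMillis);
        };
    }
}
```

Because the attempt count is an argument, one strategy instance can serve
every element and every attempt without creating a new object per attempt.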

Best,
Lincoln Lee


On Mon, May 23, 2022 at 16:54, Gen Luo <luogen...@gmail.com> wrote:

> Hi Lincoln,
>
> Thanks for the quick reply.
>
>
>
> 1. I understand that when restarting a job from a savepoint, the retry state
> can ensure the total retry attempts and delay are as expected. However, when
> a failover happens while a job is running, the remaining attempts recorded in
> the state are actually redone, and of course the total attempts are more
> than expected. The delay is indeed one of the concerns, but I'm wondering
> whether the retry state kept here is really important to users or not. In
> my opinion its benefit is limited but it makes the change much more
> complex. I would prefer a simpler solution, in which the retry state is
> still possible to add if the need really arises in the future, but I
> respect your decision.
>
>
>
> 2. I think adding a currentAttempts parameter to the method is good enough.
>
> On Mon, May 23, 2022 at 14:52, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi Gen Luo,
> >     Thanks a lot for your feedback!
> >
> > 1. About the retry state:
> > I considered dropping the retry state, which really simplifies state changes
> > and avoids compatibility handling. The only reason I changed my mind was
> > that it might be lossy to the user. Elements that have been tried several
> > times but have not exhausted their retry opportunities will reset the retry
> > state after a job failover-restart and start the retry process again (if the
> > retry condition still holds), which may cause a greater delay for the
> > retried elements: they are actually retried more times and for longer than
> > expected. (The PoC may also hit a special case when recovering: if the
> > remaining timeout is already exhausted when it is recalculated, the element
> > will execute immediately but will still have to register a timeout timer
> > for the async operation, here using an extra backoffTimeMillis.)
> > For example, '60s fixed-delay retry if empty result, max-attempts: 5,
> > timeout 300s':
> > When checkpointing, some data has been retried 2 times. Suppose the job
> > is then restarted and the restart takes 2min to succeed. If we drop the
> > retry state, in the worst case users will wait an extra 240s
> > (60s * 2 + 2min) for the retry to finish.
> >
> > As I understand it (please correct me if I missed something), if a job is
> > resumed from a previous state and the retry strategy has changed, the
> > elements that need to be recovered from the retry state just need the new
> > strategy to take over the current attempts and the time that has been used,
> > or give up retrying if no retry strategy is set.
> > > and can be more compatible when the user restarts a job with a changed
> > > retry strategy.
> >
> > 2. About the interface, do you think it would be helpful to add
> > currentAttempts to getBackoffTimeMillis()? E.g., long
> > getBackoffTimeMillis(int currentAttempts)
> > The existing RetryStrategy and RestartBackoffTimeStrategy were on my
> > candidate list but are not an exact match, and I want to avoid creating new
> > instances for every attempt, as in RetryStrategy.
> >
> > WDYT?
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, May 23, 2022 at 11:37, Gen Luo <luogen...@gmail.com> wrote:
> >
> > > Thanks, Lincoln, for the proposal!
> > >
> > > The FLIP looks good to me. I'm in favor of the timer-based implementation,
> > > and I'd like to share some thoughts.
> > >
> > > I'm wondering whether we have to store the retry status in the state. I
> > > suppose the retrying requests can just be submitted as first attempts when
> > > the job restores from a checkpoint, since in fact the side effects of the
> > > retries cannot be rolled back by the restore. This makes the state
> > > simpler, makes state migration unnecessary, and can be more compatible
> > > when the user restarts a job with a changed retry strategy.
> > >
> > > Besides, I find it hard to implement a flexible backoff strategy with the
> > > current AsyncRetryStrategy interface, for example an
> > > ExponentialBackoffRetryStrategy. Maybe we can add a parameter for the
> > > attempt, or just use org.apache.flink.util.concurrent.RetryStrategy
> > > to take the place of the retry strategy part in AsyncRetryStrategy?
> > >
> > > On Fri, May 20, 2022 at 14:24, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > >    By comparing the two internal implementations of delayed retries, we
> > > > prefer the timer-based solution, which obtains precise delay control
> > > > through simple logic and only needs to pay (what we consider to be
> > > > acceptable) timer instance cost for the retry element.  The FLIP[1] doc
> > > > has been updated.
> > > >
> > > > [1]:
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > > >
> > > > On Mon, May 16, 2022 at 15:09, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > Good question!
> > > > >
> > > > > The delayQueue is very similar to incompleteElements in
> > > > > UnorderedStreamElementQueue: it only records references to in-flight
> > > > > retry elements. Its core value is to allow a fast scan when force
> > > > > flushing during endInput and to require less refactoring of the
> > > > > existing logic.
> > > > >
> > > > > Users needn't configure a new capacity for the delayQueue; they can
> > > > > just turn the original one up (if needed).
> > > > > Storing the input data and the retry state separately is mainly to
> > > > > implement backwards compatibility. In the first version of the PoC, I
> > > > > used a single combined state in order to reduce state costs, but it was
> > > > > hard to keep compatibility, so I changed it into two after Yun Gao
> > > > > raised a concern about compatibility.
> > > > >
> > > > > Best,
> > > > > Lincoln Lee
> > > > >
> > > > >
> > > > > On Mon, May 16, 2022 at 14:48, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > >
> > > > >> Thanks  Lincoln for your reply.
> > > > >>
> > > > > >> I'm a little confused about the relationship between Ordered/Unordered
> > > > > >> Queue and DelayQueue. Why do we need to have a DelayQueue?
> > > > > >> Can we remove the DelayQueue and put the state of the retry in the
> > > > > >> StreamRecordQueueEntry (it seems like it's already in the FLIP)?
> > > > > >> The advantages of doing this are:
> > > > > >> 1. half as much data is stored in state
> > > > > >> 2. the concept is unified: the user only needs to configure one queue
> > > > > >> capacity
> > > > >>
> > > > >> Best,
> > > > >> Jingsong
> > > > >>
> > > > > >> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > >>
> > > > > >> > Hi Jingsong,
> > > > > >> > Thanks for your feedback! Let me try to answer the two questions:
> > > > > >> >
> > > > > >> > For q1: Motivation
> > > > > >> > Yes, users can implement retries themselves based on the external
> > > > > >> > async client, but this requires each user to do similar things, and
> > > > > >> > if we can support retries uniformly, user code would become much
> > > > > >> > simpler.
> > > > >> >
> > > > > >> > > The real external call should happen in the asynchronous thread.
> > > > > >> > > My question is: If the user makes a retry in this asynchronous
> > > > > >> > > thread by themselves, is there a difference between this and the
> > > > > >> > > current FLIP's?
> > > > > >> >
> > > > > >> >
> > > > >> >
> > > > >> > For q2: Block Main Thread
> > > > > >> > You're right, the queue data will be stored in the ListState, which
> > > > > >> > is an OperatorState. In fact, the theoretical upper limit for
> > > > > >> > ListState storage is Integer.MAX_VALUE, but we can't make the queue
> > > > > >> > capacity too big in production because the risk of OOM increases as
> > > > > >> > the queue capacity grows; increasing the task parallelism may be a
> > > > > >> > more viable way when a single task encounters too many retry items.
> > > > > >> > We recommend estimating a proper queue capacity based on a formula
> > > > > >> > like 'inputRate * retryRate * avgRetryDuration', also taking into
> > > > > >> > account the actual checkpoint duration at runtime.
> > > > >> >
> > > > > >> > > If I understand correctly, the retry queue will be put into
> > > > > >> > > ListState, this state is OperatorState? As far as I know,
> > > > > >> > > OperatorState does not have the ability to store a lot of data.
> > > > > >> > > So after we need to retry more data, we should need to block the
> > > > > >> > > main thread? What is the maximum size of the default retry queue?
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > Best,
> > > > >> > Lincoln Lee
> > > > >> >
> > > > >> >
> > > > > >> > On Mon, May 16, 2022 at 10:31, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > >> >
> > > > > >> > > Thanks, Lincoln, for the proposal.
> > > > >> > >
> > > > >> > > ## Motivation:
> > > > >> > >
> > > > > >> > > > asyncInvoke and callback functions are executed synchronously by
> > > > > >> > > > the main thread, which is not suitable adding long time blocking
> > > > > >> > > > operations, and introducing additional thread will bring extra
> > > > > >> > > > complexity for users
> > > > > >> > >
> > > > > >> > > According to the documentation of AsyncFunction:
> > > > > >> > >
> > > > > >> > > > For each #asyncInvoke, an async io operation can be triggered,
> > > > > >> > > > and once it has been done, the result can be collected by calling
> > > > > >> > > > {@link ResultFuture#complete}. For each async operation, its
> > > > > >> > > > context is stored in the operator immediately after invoking
> > > > > >> > > > #asyncInvoke, avoiding blocking for each stream input as long as
> > > > > >> > > > the internal buffer is not full.
> > > > > >> > >
> > > > > >> > > The real external call should happen in the asynchronous thread.
> > > > > >> > >
> > > > > >> > > My question is: If the user makes a retry in this asynchronous
> > > > > >> > > thread by themselves, is there a difference between this and the
> > > > > >> > > current FLIP's?
> > > > >> > >
> > > > >> > > ## Block Main Thread
> > > > >> > >
> > > > > >> > > If I understand correctly, the retry queue will be put into
> > > > > >> > > ListState, this state is OperatorState? As far as I know,
> > > > > >> > > OperatorState does not have the ability to store a lot of data.
> > > > > >> > > So after we need to retry more data, we should need to block the
> > > > > >> > > main thread? What is the maximum size of the default retry queue?
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Jingsong
> > > > >> > >
> > > > > >> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > > Dear Flink developers,
> > > > >> > > >
> > > > > >> > > > I would like to open a discussion on FLIP 232 [1], for an
> > > > > >> > > > extension of AsyncWaitOperator to support retry for user's
> > > > > >> > > > asyncFunction.
> > > > > >> > > >
> > > > > >> > > > To do so, a new user interface will be added to define the
> > > > > >> > > > trigger condition for retry and when to retry. Internally, a
> > > > > >> > > > delayed retry mechanism will be introduced.
> > > > > >> > > >
> > > > > >> > > > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for offline
> > > > > >> > > > discussions and valuable comments.
> > > > > >> > > > The new feature is backwards compatible: it can recover from
> > > > > >> > > > state which was generated by prior Flink versions, and if no
> > > > > >> > > > retry strategy is enabled the behavior is as before.
> > > > > >> > > >
> > > > > >> > > > [1]
> > > > > >> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > >> > > > [2] based on timer trigger
> > > > > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > > > >> > > > [3] based on DelayQueue with pull fashion
> > > > > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > Best,
> > > > >> > > > Lincoln Lee
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
