Hi Gen Luo,
    Thanks a lot for your feedback!

1. About the retry state:
I considered dropping the retry state which really simplifies state changes
and avoids compatibility handling. The only reason I changed my mind was
that it might be lossy to the user. Elements that has been tried several
times but not exhausted its retry opportunities will reset the retry state
after a job failover-restart and start the retry process again (if the
retry condition persists true), which may cause a greater delay for the
retried elements, actually retrying more times and for longer than
expected. (Although in the PoC may also have a special case when
recovering: if the remaining timeout is exhausted for the recalculation, it
will execute immediately but will have to register a timeout timer for the
async, here using an extra backoffTimeMillis)
For example, '60s fixed-delay retry if empty result, max-attempts: 5,
timeout 300s'
When checkpointing, some data has been retry 2 times, then suppose the job
is restarted and it takes 2min when the restart succeeds, if we drop the
retry state, the worst case will take more 240s(60s * 2 + 2min) delay for
users to finish retry.

For my understanding(please correct me if I missed something), if a job is
resumed from a previous state and the retry strategy is changed, the
elements that need to be recovered in the retry state just needs the new
strategy to take over the current attempts and time that has been used,  or
give up retry if no retry strategy was set.
> and can be more compatible when the user restart a job with a changed
retry strategy.

2.  About the interface, do you think it would be helpful if add the
currentAttempts into getBackoffTimeMillis()? e.g.,  long
getBackoffTimeMillis(int currentAttempts)
The existing RetryStrategy and RestartBackoffTimeStrategy were in my
candidate list but not exactly match, and I want to avoid creating the new
instances for every attempt in RetryStrategy.

WDYT?

Best,
Lincoln Lee


Gen Luo <luogen...@gmail.com> 于2022年5月23日周一 11:37写道:

> Thank Lincoln for the proposal!
>
> The FLIP looks good to me. I'm in favor of the timer based implementation,
> and I'd like to share some thoughts.
>
> I'm thinking if we have to store the retry status in the state. I suppose
> the retrying requests can just submit as the first attempt when the job
> restores from a checkpoint, since in fact the side effect of the retries
> can not draw back by the restoring. This makes the state simpler and makes
> it unnecessary to do the state migration, and can be more compatible when
> the user restart a job with a changed retry strategy.
>
> Besides, I find it hard to implement a flexible backoff strategy with the
> current AsyncRetryStrategy interface, for example an
> ExponentialBackoffRetryStrategy. Maybe we can add a parameter of the
> attempt or just use the org.apache.flink.util.concurrent.RetryStrategy to
> take the place of the retry strategy part in the AsyncRetryStrategy?
>
> Lincoln Lee <lincoln.8...@gmail.com> 于 2022年5月20日周五 14:24写道:
>
> > Hi everyone,
> >
> >    By comparing the two internal implementations of delayed retries, we
> > prefer the timer-based solution, which obtains precise delay control
> > through simple logic and only needs to pay (what we consider to be
> > acceptable) timer instance cost for the retry element.  The FLIP[1] doc
> has
> > been updated.
> >
> > [1]:
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Lincoln Lee <lincoln.8...@gmail.com> 于2022年5月16日周一 15:09写道:
> >
> > > Hi Jinsong,
> > >
> > > Good question!
> > >
> > > The delayQueue is very similar to incompleteElements in
> > > UnorderedStreamElementQueue, it only records the references of
> in-flight
> > > retry elements, the core value is for the ease of a fast scan when
> force
> > > flush during endInput and less refactor for existing logic.
> > >
> > > Users needn't configure a new capacity for the delayQueue, just turn
> the
> > > original one up (if needed).
> > > And separately store the input data and retry state is mainly to
> > implement
> > > backwards compatibility. The first version of Poc, I used a single
> > combined
> > > state in order to reduce state costs, but hard to keep compatibility,
> and
> > > changed  into two via Yun Gao's concern about the compatibility.
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Jingsong Li <jingsongl...@gmail.com> 于2022年5月16日周一 14:48写道:
> > >
> > >> Thanks  Lincoln for your reply.
> > >>
> > >> I'm a little confused about the relationship between Ordered/Unordered
> > >> Queue and DelayQueue. Why do we need to have a DelayQueue?
> > >> Can we remove the DelayQueue and put the state of the retry in the
> > >> StreamRecordQueueEntry (seems like it's already in the FLIP)
> > >> The advantages of doing this are:
> > >> 1. twice less data is stored in state
> > >> 2. the concept is unified, the user only needs to configure one queue
> > >> capacity
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Jinsong,
> > >> > Thanks for your feedback! Let me try to answer the two questions:
> > >> >
> > >> > For q1: Motivation
> > >> > Yes, users can implement retries themselves based on the external
> > async
> > >> > client, but this requires each user to do similar things, and if we
> > can
> > >> > support retries uniformly, user code would become much simpler.
> > >> >
> > >> > > The real external call should happen in the asynchronous thread.
> > >> > My question is: If the user makes a retry in this asynchronous
> thread
> > by
> > >> > themselves, is there a difference between this and the current
> FLIP's?
> > >> >
> > >> >
> > >> > For q2: Block Main Thread
> > >> > You're right, the queue data will be stored in the ListState which
> is
> > an
> > >> > OperateState, though in fact, for ListState storage, the theoretical
> > >> upper
> > >> > limit is Integer.MAX_VALUE, but we can't increase the queue capacity
> > too
> > >> > big in production because the risk of OOM increases when the queue
> > >> capacity
> > >> > grows, and increases the task parallelism maybe a more viable way
> when
> > >> > encounter too many retry items for a single task.
> > >> > We recommend using a proper estimate of queue capacity based on the
> > >> formula
> > >> > like this: 'inputRate * retryRate * avgRetryDuration', and also the
> > >> actual
> > >> > checkpoint duration in runtime.
> > >> >
> > >> > > If I understand correctly, the retry queue will be put into
> > ListState,
> > >> > this
> > >> > state is OperatorState? As far as I know, OperatorState does not
> have
> > >> the
> > >> > ability to store a lot of data.
> > >> > So after we need to retry more data, we should need to block the
> main
> > >> > thread? What is the maximum size of the default retry queue?
> > >> >
> > >> >
> > >> >
> > >> > Best,
> > >> > Lincoln Lee
> > >> >
> > >> >
> > >> > Jingsong Li <jingsongl...@gmail.com> 于2022年5月16日周一 10:31写道:
> > >> >
> > >> > > Thank Lincoln for the proposal.
> > >> > >
> > >> > > ## Motivation:
> > >> > >
> > >> > > > asyncInvoke and callback functions are executed synchronously by
> > the
> > >> > main
> > >> > > thread, which is not suitable adding long time blocking
> operations,
> > >> and
> > >> > > introducing additional thread will bring extra complexity for
> users
> > >> > >
> > >> > > According to the documentation of AsyncFunction:
> > >> > >
> > >> > > > For each #asyncInvoke, an async io operation can be triggered,
> and
> > >> once
> > >> > > it has been done, the result can be collected by calling {@link
> > >> > > ResultFuture#complete}. For each async operation, its context is
> > >> stored
> > >> > in
> > >> > > the operator immediately after invoking #asyncInvoke, avoiding
> > >> blocking
> > >> > for
> > >> > > each stream input as long as the internal buffer is not full.
> > >> > >
> > >> > > The real external call should happen in the asynchronous thread.
> > >> > >
> > >> > > My question is: If the user makes a retry in this asynchronous
> > thread
> > >> by
> > >> > > themselves, is there a difference between this and the current
> > FLIP's?
> > >> > >
> > >> > > ## Block Main Thread
> > >> > >
> > >> > > If I understand correctly, the retry queue will be put into
> > ListState,
> > >> > this
> > >> > > state is OperatorState? As far as I know, OperatorState does not
> > have
> > >> the
> > >> > > ability to store a lot of data.
> > >> > > So after we need to retry more data, we should need to block the
> > main
> > >> > > thread? What is the maximum size of the default retry queue?
> > >> > >
> > >> > > Best,
> > >> > > Jingsong
> > >> > >
> > >> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <
> lincoln.8...@gmail.com
> > >
> > >> > > wrote:
> > >> > >
> > >> > > > Dear Flink developers,
> > >> > > >
> > >> > > > I would like to open a discussion on FLIP 232 [1],  for an
> > >> extension of
> > >> > > > AsyncWaitOperator to support retry for user's asyncFunction.
> > >> > > >
> > >> > > > To do so, new user interface will added to define the trigger
> > >> condition
> > >> > > for
> > >> > > > retry and when should retry. Internally, a delayed retry
> mechanism
> > >> will
> > >> > > be
> > >> > > > introduced.
> > >> > > >
> > >> > > > There's PoC for this FLIP [2][3], thanks Yun Gao for offline
> > >> > discussions
> > >> > > > and valuable comments.
> > >> > > > The new feature is backwards compatible that can recover from
> > state
> > >> > which
> > >> > > > was generated by prior flink versions, and if no retry strategy
> > >> enabled
> > >> > > the
> > >> > > > behavior is as before.
> > >> > > >
> > >> > > > [1]
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > >> > > > [2] based on timer trigger
> > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > >> > > > [3] based on DelayQueue with pull fashion
> > >> > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > >> > > >
> > >> > > >
> > >> > > > Best,
> > >> > > > Lincoln Lee
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Reply via email to