Thanks, Gen Luo! I agree with you that a simpler design is preferable.
I'd like to share my thoughts on this choice. Whether we store the retry state or not only affects the recovery logic, not the per-record processing, so I just compare the two options:

1. w/o retry state: simple recovery, but precision is lost
2. w/ retry state: one more state and a little more complexity, but precise for users

I prefer the second one from the user's perspective; the additional complexity is manageable.

One detail that is not mentioned in the FLIP: before each next attempt we will check whether any time is left (no further attempt once now() - startTime > timeout), so the real total number of attempts will always be less than or equal to maxAttempts, and the total time cost will be <= timeout (one special case is when a job failover takes too long).

For the API, I've updated the FLIP[1].

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee

Gen Luo <luogen...@gmail.com> wrote on Mon, May 23, 2022 at 16:54:

> Hi Lincoln,
>
> Thanks for the quick reply.
>
> 1. I understand that when restarting a job with a savepoint, the retry state can ensure the total retry attempts and delay are as expected. However, when a failover happens while a job is running, the remaining attempts recorded in the state are actually redone, and of course the total attempts are more than expected. The delay is indeed one of the concerns, but I'm wondering whether the retry state kept here is really important to users or not. In my opinion its benefit is limited but it makes the change much more complex. I would prefer a simpler solution, in which the retry state can still be added if the need really arises in the future, but I respect your decision.
>
> 2. I think adding a currentAttempts parameter to the method is good enough.
>
> Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, May 23, 2022 at 14:52:
>
> > Hi Gen Luo,
> > Thanks a lot for your feedback!
> >
> > 1. About the retry state:
> > I considered dropping the retry state, which really simplifies state changes and avoids compatibility handling.
> > The only reason I changed my mind was that it might be lossy to the user. Elements that have been tried several times but have not exhausted their retry opportunities will reset the retry state after a job failover-restart and start the retry process again (if the retry condition still holds), which may cause a greater delay for the retried elements, actually retrying more times and for longer than expected. (Although the PoC may also have a special case when recovering: if the recalculated remaining timeout is exhausted, the retry will execute immediately but will have to register a timeout timer for the async operation, here using an extra backoffTimeMillis.)
> > For example, '60s fixed-delay retry if empty result, max-attempts: 5, timeout 300s':
> > When checkpointing, some data has been retried 2 times. Then suppose the job is restarted and it takes 2min until the restart succeeds. If we drop the retry state, the worst case will add 240s (60s * 2 + 2min) of delay for users to finish the retry.
> >
> > To my understanding (please correct me if I missed something), if a job is resumed from a previous state and the retry strategy is changed, the elements that need to be recovered in the retry state just need the new strategy to take over the current attempts and the time that has been used, or give up retrying if no retry strategy was set.
> > > and can be more compatible when the user restart a job with a changed retry strategy.
> >
> > 2. About the interface, do you think it would be helpful to add currentAttempts to getBackoffTimeMillis()? e.g., long getBackoffTimeMillis(int currentAttempts)
> > The existing RetryStrategy and RestartBackoffTimeStrategy were on my candidate list but do not match exactly, and I want to avoid creating new instances for every attempt in RetryStrategy.
> >
> > WDYT?
> >
> > Best,
> > Lincoln Lee
> >
> > Gen Luo <luogen...@gmail.com> wrote on Mon, May 23, 2022 at 11:37:
> >
> > > Thanks, Lincoln, for the proposal!
> > >
> > > The FLIP looks good to me. I'm in favor of the timer-based implementation, and I'd like to share some thoughts.
> > >
> > > I'm wondering whether we have to store the retry status in the state. I suppose the retrying requests can just be submitted as first attempts when the job restores from a checkpoint, since in fact the side effects of the retries cannot be rolled back by the restore. This makes the state simpler, makes state migration unnecessary, and can be more compatible when the user restarts a job with a changed retry strategy.
> > >
> > > Besides, I find it hard to implement a flexible backoff strategy with the current AsyncRetryStrategy interface, for example an ExponentialBackoffRetryStrategy. Maybe we can add a parameter for the attempt, or just use org.apache.flink.util.concurrent.RetryStrategy to take the place of the retry strategy part in AsyncRetryStrategy?
> > >
> > > Lincoln Lee <lincoln.8...@gmail.com> wrote on Fri, May 20, 2022 at 14:24:
> > >
> > > > Hi everyone,
> > > >
> > > > By comparing the two internal implementations of delayed retries, we prefer the timer-based solution, which obtains precise delay control through simple logic and only needs to pay (what we consider to be acceptable) timer instance cost for the retry element. The FLIP[1] doc has been updated.
> > > >
> > > > [1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > > > Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, May 16, 2022 at 15:09:
> > > >
> > > > > Hi Jingsong,
> > > > >
> > > > > Good question!
> > > > >
> > > > > The delayQueue is very similar to incompleteElements in UnorderedStreamElementQueue: it only records the references of in-flight retry elements. Its core value is to allow a fast scan when force-flushing during endInput and to require less refactoring of the existing logic.
> > > > >
> > > > > Users needn't configure a new capacity for the delayQueue; they can just turn the original one up (if needed).
> > > > > Storing the input data and the retry state separately is mainly to implement backwards compatibility. In the first version of the PoC, I used a single combined state in order to reduce state costs, but it was hard to keep compatibility, so I changed it into two states following Yun Gao's concern about compatibility.
> > > > >
> > > > > Best,
> > > > > Lincoln Lee
> > > > >
> > > > > Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 14:48:
> > > > >
> > > > > > Thanks Lincoln for your reply.
> > > > > >
> > > > > > I'm a little confused about the relationship between the Ordered/Unordered Queue and the DelayQueue. Why do we need to have a DelayQueue?
> > > > > > Can we remove the DelayQueue and put the state of the retry in the StreamRecordQueueEntry (it seems like it's already in the FLIP)?
> > > > > > The advantages of doing this are:
> > > > > > 1. half as much data is stored in state
> > > > > > 2. the concept is unified, the user only needs to configure one queue capacity
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Jingsong,
> > > > > > > Thanks for your feedback!
> > > > > > > Let me try to answer the two questions:
> > > > > > >
> > > > > > > For q1: Motivation
> > > > > > > Yes, users can implement retries themselves based on the external async client, but this requires each user to do similar things, and if we can support retries uniformly, user code would become much simpler.
> > > > > > >
> > > > > > > > The real external call should happen in the asynchronous thread.
> > > > > > > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's?
> > > > > > >
> > > > > > > For q2: Block Main Thread
> > > > > > > You're right, the queue data will be stored in the ListState, which is an OperatorState. In fact, for ListState storage the theoretical upper limit is Integer.MAX_VALUE, but we can't make the queue capacity too big in production because the risk of OOM increases as the queue capacity grows. Increasing the task parallelism may be a more viable way when a single task encounters too many retry items.
> > > > > > > We recommend using a proper estimate of the queue capacity based on a formula like this: 'inputRate * retryRate * avgRetryDuration', and also taking the actual checkpoint duration at runtime into account.
> > > > > > >
> > > > > > > > If I understand correctly, the retry queue will be put into ListState, this state is OperatorState? As far as I know, OperatorState does not have the ability to store a lot of data.
> > > > > > > > So after we need to retry more data, we should need to block the main thread? What is the maximum size of the default retry queue?
> > > > > > >
> > > > > > > Best,
> > > > > > > Lincoln Lee
> > > > > > >
> > > > > > > Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 10:31:
> > > > > > >
> > > > > > > > Thanks Lincoln for the proposal.
> > > > > > > >
> > > > > > > > ## Motivation:
> > > > > > > >
> > > > > > > > > asyncInvoke and callback functions are executed synchronously by the main thread, which is not suitable adding long time blocking operations, and introducing additional thread will bring extra complexity for users
> > > > > > > >
> > > > > > > > According to the documentation of AsyncFunction:
> > > > > > > >
> > > > > > > > > For each #asyncInvoke, an async io operation can be triggered, and once it has been done, the result can be collected by calling {@link ResultFuture#complete}. For each async operation, its context is stored in the operator immediately after invoking #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
> > > > > > > >
> > > > > > > > The real external call should happen in the asynchronous thread.
> > > > > > > >
> > > > > > > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's?
> > > > > > > >
> > > > > > > > ## Block Main Thread
> > > > > > > >
> > > > > > > > If I understand correctly, the retry queue will be put into ListState, and this state is an OperatorState? As far as I know, OperatorState does not have the ability to store a lot of data.
> > > > > > > > So once we need to retry more data, will we need to block the main thread?
> > > > > > > > What is the maximum size of the default retry queue?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong
> > > > > > > >
> > > > > > > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Dear Flink developers,
> > > > > > > > >
> > > > > > > > > I would like to open a discussion on FLIP 232 [1], for an extension of AsyncWaitOperator to support retry for user's asyncFunction.
> > > > > > > > >
> > > > > > > > > To do so, a new user interface will be added to define the trigger condition for retry and when to retry. Internally, a delayed retry mechanism will be introduced.
> > > > > > > > >
> > > > > > > > > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for offline discussions and valuable comments.
> > > > > > > > > The new feature is backwards compatible: it can recover from state generated by prior Flink versions, and if no retry strategy is enabled the behavior is as before.
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > > > > > > > [2] based on timer trigger
> > > > > > > > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > > > > > > > [3] based on DelayQueue with pull fashion
> > > > > > > > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Lincoln Lee
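
P.S. To make the time-left check from the top of this mail concrete, here is a minimal sketch. It is not the PoC code: the class and method names (RetryBudgetSketch, canRetry, simulateAttempts) are hypothetical, the fixed-delay backoff only mirrors the proposed getBackoffTimeMillis(int currentAttempts) shape, and counting the initial invocation as attempt 1 and checking the budget at the scheduled retry time are assumptions made for illustration.

```java
// Hypothetical sketch; names are illustrative and not taken from the FLIP or the PoC.
class RetryBudgetSketch {

    /** Returns true if one more attempt may be scheduled at time nowMillis. */
    static boolean canRetry(int currentAttempts, int maxAttempts,
                            long startTimeMillis, long nowMillis, long timeoutMillis) {
        if (currentAttempts >= maxAttempts) {
            return false; // attempt budget exhausted
        }
        // No time is left once now() - startTime > timeout.
        return nowMillis - startTimeMillis <= timeoutMillis;
    }

    /** Fixed-delay backoff that accepts the attempt count (unused by a fixed-delay strategy). */
    static long getBackoffTimeMillis(int currentAttempts, long fixedDelayMillis) {
        return fixedDelayMillis;
    }

    /** Counts how many attempts happen for an element whose retry condition always holds. */
    static int simulateAttempts(int maxAttempts, long timeoutMillis, long fixedDelayMillis) {
        long start = 0L;
        long now = 0L;
        int attempts = 1; // assumption: the initial invocation counts as attempt 1
        while (true) {
            long next = now + getBackoffTimeMillis(attempts, fixedDelayMillis);
            if (!canRetry(attempts, maxAttempts, start, next, timeoutMillis)) {
                return attempts;
            }
            now = next;
            attempts++;
        }
    }

    public static void main(String[] args) {
        // '60s fixed-delay, max-attempts: 5, timeout 300s' from the example in this thread
        System.out.println(simulateAttempts(5, 300_000L, 60_000L));
        // Same strategy with a shorter, made-up timeout, so the time budget ends the retries first
        System.out.println(simulateAttempts(5, 150_000L, 60_000L));
    }
}
```

Under these assumptions the number of attempts never exceeds maxAttempts, and no attempt is scheduled past the timeout; a long job failover is the special case where the elapsed time alone can already exceed the budget.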