Hi Jark & developers,

I'm fine with this, with a few minor changes:

"timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset in case of failover"

The FLIP[1] was updated with two changes:
1. generic type naming: use OUT instead of T
2. the new API's comments

*If there is no further feedback, we will start a VOTE next Monday.*

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee

On Thu, 26 May 2022 at 23:10, Jark Wu <imj...@gmail.com> wrote:

Hi Lincoln,

What do you think about
"timeout for the asynchronous operation from the first invoke to finally complete, which may span multiple retry attempts"?

Best,
Jark

On Wed, 25 May 2022 at 20:29, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jark,

Thanks for your feedback!

For 2), good advice on the generic type naming: using OUT instead of T is better for the async scenario.

For 3), your concern makes sense to me. We should make the change more explicit to users, especially in the API itself (documentation is necessary, but not sufficient). Also, I didn't paste the complete method signature into the FLIP.
Reviewing the comments of the new method again, it obviously cannot eliminate your confusion by just saying:
'@param timeout for the asynchronous operation to complete include all reattempts.'

The 'timeout' we want to clarify covers the time until the user function finally reaches the complete state, including the time of all reattempts. There is no separate timeout for each attempt.
In the worst case, if the first async request is stuck until the timeout, enabling retry will not help. We discussed this scenario. When a request gets stuck like this, the retry will very probably get stuck as well. More importantly, the user function has no contract for freeing the resources of the stuck request. So we prefer to keep the current behavior.

Do you think it would be easier to understand if we changed it to: '@param timeout for the asynchronous operation that finally complete, including all reattempts and there is no separate timeout for each attempt.'?

Best,
Lincoln Lee

On Wed, 25 May 2022 at 17:45, Jark Wu <imj...@gmail.com> wrote:

Hi Lincoln,

Thanks for proposing this retry feature for the async operator; it would be very helpful for FLIP-234.
I'm glad to see the lively discussion. Here are my thoughts:

1) +1 for w/o retry state.
It's very tricky and hard to implement exact retry-state semantics (currentAttempts and firstExecTime/costTime may not be enough). I think this might be over-designed, because most users are fine with extra retries when failover happens. Flink also doesn't provide exact retry semantics in other places, e.g. "restart-strategy".

2) The meaning of the generic type <T> of AsyncRetryStrategy and AsyncRetryPredicate confuses me.
It would be better to add a description of it in the comments. In addition, maybe <OUT> would be better, to stay aligned with other async interfaces (e.g. AsyncFunction).

3) timeout parameter: total timeout vs. timeout per async operation
According to the Javadoc of `AsyncDataStream#orderedWait/unorderedWait`, the "timeout" parameter is for the asynchronous operation to complete, i.e. every call of `AsyncFunction#asyncInvoke`.
When we add a new `orderedWaitWithRetry` method, I think we should keep the meaning of "timeout" unchanged; otherwise, we need a different parameter name and description.

Best,
Jark

On Wed, 25 May 2022 at 15:00, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi everyone,

Gen Luo, Yun Gao and I had a long offline discussion about the implementation of the recovery part. The key question was whether we should store the retry state and do the recovery after the job restarts.

We reached a consensus not to store the retry state for now. This is the clearest option for users and does not require any changes to the current recovery behavior. We discussed three possible options. Their behavior is identical during normal processing. They differ only in what retry state is recorded during checkpointing and in the strategy used when recovering.

More details have been added to the FLIP[1], and the PoC[2] has also been updated.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2] https://github.com/lincoln-lil/flink/tree/async-retry-poc

Best,
Lincoln Lee

On Tue, 24 May 2022 at 12:23, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Gen Luo,

You're right, the total cost time includes the failover-restart time. Suppose the failover time exceeds the retry timeout set by the user. Then all the data waiting to be retried after recovery will in fact get no additional retry opportunities, which makes it equivalent to normal data. In such circumstances, the retry state has no effect.
But not every job restarts slowly, and restarts in Flink keep getting faster thanks to continuous improvements.
I hope this helps answer your question.

Best,
Lincoln Lee

On Tue, 24 May 2022 at 11:50, Gen Luo <luogen...@gmail.com> wrote:

Hi Lincoln,

Thanks for the explanation. I understand your thought, but I'm a little confused by the additional detail.

Is the startTime the time when the record is processed for the first time? And is the cost time counted from it even after a job recovers from a failover or is restarted?

Consider the failover case. The records may be processed successfully while the job is running normally. But after some time (probably longer than the timeout) the job fails and restores, and the records in the retry state will time out and be discarded immediately. The same situation applies to the restart case.

I suppose that in many cases the timeout will be shorter than the time a job may take to restart. In these cases, the stored in-flight retry attempts will time out immediately after restarting, making the retry state meaningless. Please let me know if I have misunderstood something.

On Tue, 24 May 2022 at 10:20, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Thanks Gen Luo!

I agree with you that we should prefer the simpler design.

I'd like to share my thoughts on this choice. Whether or not we store the retry state only affects the recovery logic, not the per-record processing, so I just compare the two:
1. w/o retry state: simple recovery, but loses precision
2. w/ retry state: one more state and a little more complexity, but precise for users

I prefer the second one from the user's perspective; the additional complexity is manageable.

One detail not mentioned in the FLIP: before the next attempt, we will check whether any time is left, and stop retrying once now() - startTime > timeout. So the real total number of attempts will always be less than or equal to maxAttempts, and the total cost time will be <= timeout. One special case is when job failover takes too long.

For the API, I've updated the FLIP[1].

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee

On Mon, 23 May 2022 at 16:54, Gen Luo <luogen...@gmail.com> wrote:

Hi Lincoln,

Thanks for the quick reply.

1. I understand that when restarting a job from a savepoint, the retry state can ensure the total retry attempts and delay are as expected. However, when failover happens while a job is running, the attempts recorded in the state are actually redone, and of course the total number of attempts ends up higher than expected. The delay is indeed one of the concerns, but I'm wondering whether keeping the retry state here really matters to users. In my opinion, its benefit is limited, but it makes the change much more complex.
I would prefer a simpler solution, to which retry state can still be added if the need really arises in the future, but I respect your decision.

2. I think adding a currentAttempts parameter to the method is good enough.

On Mon, 23 May 2022 at 14:52, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Gen Luo,
Thanks a lot for your feedback!

1. About the retry state:
I considered dropping the retry state, which really simplifies the state changes and avoids compatibility handling. The only reason I changed my mind was that dropping it might be lossy for the user.

Elements that have been tried several times but have not exhausted their retry opportunities would have their retry state reset after a job failover-restart. They would then start the retry process again (if the retry condition remains true). This may cause a greater delay for the retried elements, which actually retry more times and for longer than expected.
(Although the PoC also has a special case when recovering. If the remaining timeout has already been used up when it is recalculated, the element is executed immediately. A timeout timer still has to be registered for the async operation, and here an extra backoffTimeMillis is used.)

For example, take '60s fixed-delay retry if empty result, max-attempts: 5, timeout 300s':
When checkpointing, some data has been retried 2 times. Suppose the job is then restarted and the restart takes 2 min to succeed. If we drop the retry state, in the worst case users will see an extra 240s (60s * 2 + 2min) of delay before the retries finish.

To my understanding (please correct me if I missed something), suppose a job is resumed from a previous state and the retry strategy has changed. The elements recovered from the retry state just need the new strategy to take over the current attempts and the time already used, or to give up retrying if no retry strategy is set.
> and can be more compatible when the user restart a job with a changed retry strategy.

2. About the interface, do you think it would be helpful to add currentAttempts to getBackoffTimeMillis()?
e.g., long getBackoffTimeMillis(int currentAttempts)
The existing RetryStrategy and RestartBackoffTimeStrategy were on my candidate list, but neither matches exactly. I also want to avoid creating a new instance for every attempt, as RetryStrategy does.

WDYT?

Best,
Lincoln Lee

On Mon, 23 May 2022 at 11:37, Gen Luo <luogen...@gmail.com> wrote:

Thanks, Lincoln, for the proposal!

The FLIP looks good to me. I'm in favor of the timer-based implementation, and I'd like to share some thoughts.

I'm wondering whether we have to store the retry status in the state. I suppose the retrying requests can just be submitted as a first attempt when the job restores from a checkpoint. After all, restoring cannot undo the side effects of the retries. This makes the state simpler and makes state migration unnecessary. It can also be more compatible when the user restarts a job with a changed retry strategy.

Besides, I find it hard to implement a flexible backoff strategy with the current AsyncRetryStrategy interface, for example an ExponentialBackoffRetryStrategy.
Maybe we can add a parameter for the attempt number, or just use org.apache.flink.util.concurrent.RetryStrategy in place of the retry-strategy part of AsyncRetryStrategy?

On Fri, 20 May 2022 at 14:24, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi everyone,

After comparing the two internal implementations of delayed retries, we prefer the timer-based solution. It achieves precise delay control with simple logic. Its only cost is one timer instance per retried element, which we consider acceptable. The FLIP[1] doc has been updated.

[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee

On Mon, 16 May 2022 at 15:09, Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jingsong,

Good question!

The delayQueue is very similar to incompleteElements in UnorderedStreamElementQueue: it only records references to the in-flight retry elements. It has two main benefits. It makes a fast scan easy when force-flushing during endInput, and it requires less refactoring of the existing logic.
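The timer-based choice and the delayQueue role described above can be sketched together as follows. This is a simplified illustration that uses a JDK `ScheduledExecutorService` as a stand-in for Flink's timer service (an assumption).

- Each element waiting for a retry gets one timer.
- A registry keeps only references to those elements, so that end of input can cancel the timers and retry everything at once.

`DelayedRetryRegistry` is a hypothetical name, not a class from the PoC.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: one timer per element waiting for a retry, plus references to those
 * elements so that end of input can flush them without waiting for their timers.
 */
final class DelayedRetryRegistry<E> {
    private final ScheduledExecutorService timers; // stand-in for Flink's timer service
    private final Consumer<E> retryAction;         // re-invokes the async call for an element
    // Only references to in-flight retry elements are kept (identity semantics).
    private final Map<E, ScheduledFuture<?>> pending = new IdentityHashMap<>();

    DelayedRetryRegistry(ScheduledExecutorService timers, Consumer<E> retryAction) {
        this.timers = timers;
        this.retryAction = retryAction;
    }

    /** Registers a timer that retries the element after the backoff delay. */
    synchronized void scheduleRetry(E element, long backoffMillis) {
        ScheduledFuture<?> timer =
                timers.schedule(() -> fire(element), backoffMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = pending.put(element, timer);
        if (previous != null) {
            previous.cancel(false); // never keep two timers for the same element
        }
    }

    // Timer callback; the lock makes it wait until scheduleRetry has recorded the element.
    private synchronized void fire(E element) {
        if (pending.remove(element) != null) {
            retryAction.accept(element);
        }
    }

    /** End of input: cancel every pending timer and retry all elements immediately. */
    synchronized int flushAll() {
        List<E> elements = new ArrayList<>(pending.keySet());
        for (E element : elements) {
            ScheduledFuture<?> timer = pending.remove(element);
            if (timer != null) {
                timer.cancel(false);
                retryAction.accept(element);
            }
        }
        return elements.size();
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

Only the timers and references live in the registry. The elements themselves would still sit in the existing ordered/unordered queue, which is the point of reusing that queue's capacity.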
Users don't need to configure a new capacity for the delayQueue; they just increase the original one (if needed).

The input data and the retry state are stored separately mainly to keep backward compatibility. In the first version of the PoC, I used a single combined state to reduce state costs, but that made compatibility hard to keep. I changed it to two states after Yun Gao raised concerns about compatibility.

Best,
Lincoln Lee

On Mon, 16 May 2022 at 14:48, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks Lincoln for your reply.

I'm a little confused about the relationship between the Ordered/Unordered Queue and the DelayQueue. Why do we need a DelayQueue?
Can we remove the DelayQueue and put the retry state in the StreamRecordQueueEntry (it seems that's already in the FLIP)?
The advantages of doing this are:
1. only half as much data is stored in state
2. the concept is unified, and the user only needs to configure one queue capacity

Best,
Jingsong

On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jingsong,
Thanks for your feedback! Let me try to answer the two questions:

For q1: Motivation
Yes, users can implement retries themselves on top of the external async client, but this requires every user to do similar things. If we support retries uniformly, user code becomes much simpler.

> The real external call should happen in the asynchronous thread.
> My question is: if the user retries in this asynchronous thread by themselves, is there a difference between that and the current FLIP's approach?
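For comparison, this is roughly what a user has to hand-write today to retry inside their own asynchronous code, the alternative raised in q1. The sketch rests on a few assumptions:

- The external client returns a `CompletableFuture`.
- Java 9+ is available, for `CompletableFuture.delayedExecutor` and `failedFuture`.
- `ManualAsyncRetry` and `callWithRetry` are hypothetical helpers, not a Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical user-side retry around an async client call; not a Flink API. */
final class ManualAsyncRetry {
    private ManualAsyncRetry() {}

    /** Calls 'call' up to maxAttempts times, waiting delayMillis after each failed attempt. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call, int maxAttempts, long delayMillis) {
        return attempt(call, 1, maxAttempts, delayMillis);
    }

    private static <T> CompletableFuture<T> attempt(
            Supplier<CompletableFuture<T>> call, int currentAttempt, int maxAttempts, long delayMillis) {
        return call.get()
                .handle((value, error) -> {
                    CompletableFuture<T> next;
                    if (error == null) {
                        next = CompletableFuture.completedFuture(value);
                    } else if (currentAttempt >= maxAttempts) {
                        // Give up and surface the last failure.
                        next = CompletableFuture.failedFuture(error);
                    } else {
                        // Wait on the JDK's delayed executor, then issue the next attempt.
                        next = CompletableFuture
                                .runAsync(() -> { },
                                        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS))
                                .thenCompose(ignored ->
                                        attempt(call, currentAttempt + 1, maxAttempts, delayMillis));
                    }
                    return next;
                })
                .thenCompose(next -> next);
    }
}
```

Every user would need to repeat something like this in each `AsyncFunction`, plus the timeout and state handling. That duplication is what the FLIP aims to remove by moving retries into the operator.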
For q2: Block Main Thread
You're right, the queue data will be stored in ListState, which is an operator state. The theoretical upper limit of ListState storage is Integer.MAX_VALUE. But we can't make the queue capacity too big in production, because the risk of OOM grows with the queue capacity. When a single task encounters too many retry items, increasing the task parallelism may be a more viable option.
We recommend estimating a proper queue capacity with a formula like 'inputRate * retryRate * avgRetryDuration', while also taking the actual checkpoint duration at runtime into account.

> If I understand correctly, the retry queue will be put into ListState; is this state an OperatorState? As far as I know, OperatorState is not able to store a lot of data.
> So once we need to retry more data, would we need to block the main thread? What is the maximum size of the default retry queue?

Best,
Lincoln Lee

On Mon, 16 May 2022 at 10:31, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks, Lincoln, for the proposal.

## Motivation:

> asyncInvoke and callback functions are executed synchronously by the main thread, which is not suitable for adding long blocking operations, and introducing an additional thread will bring extra complexity for users

According to the documentation of AsyncFunction:

> For each #asyncInvoke, an async io operation can be triggered, and once it has been done, the result can be collected by calling {@link ResultFuture#complete}. For each async operation, its context is stored in the operator immediately after invoking #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
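The capacity discussion in this exchange can be made concrete with a small sketch. It assumes rates in records per second and durations in seconds.

- `estimateCapacity` applies the 'inputRate * retryRate * avgRetryDuration' rule of thumb. This is Little's law applied to elements waiting for a retry. It ignores the checkpoint-duration factor, which is also mentioned above.
- `BoundedInFlightBuffer` uses a `Semaphore` to show the "block only when the internal buffer is full" behavior.

Both classes are hypothetical; the real operator manages its queue differently.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical sizing helper for the retry-queue rule of thumb discussed in this thread. */
final class RetryCapacity {
    private RetryCapacity() {}

    /**
     * Expected number of elements waiting for a retry at any moment (Little's law):
     * records/s * fraction of records retried * seconds an element spends retrying.
     */
    static long estimateCapacity(double inputRatePerSec, double retryRate, double avgRetrySeconds) {
        return (long) Math.ceil(inputRatePerSec * retryRate * avgRetrySeconds);
    }
}

/** Hypothetical bounded buffer: callers only wait once every slot is taken. */
final class BoundedInFlightBuffer {
    private final Semaphore slots;

    BoundedInFlightBuffer(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    /** Non-blocking admission: false means the caller would have to wait. */
    boolean tryAdmit() {
        return slots.tryAcquire();
    }

    /** Blocking admission: waits only while the buffer is full. */
    void admit() throws InterruptedException {
        slots.acquire();
    }

    /** Called when an element finishes: completed, retries exhausted, or timed out. */
    void release() {
        slots.release();
    }

    int freeSlots() {
        return slots.availablePermits();
    }
}
```

For instance, suppose the input rate is 2000 records/s, 25% of records are retried, and a retry takes 1.5 s on average. The formula then gives about 750 elements waiting for a retry at any time.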
The real external call should happen in the asynchronous thread.

My question is: if the user retries in this asynchronous thread by themselves, is there a difference between that and the current FLIP's approach?

## Block Main Thread

If I understand correctly, the retry queue will be put into ListState; is this state an OperatorState? As far as I know, OperatorState is not able to store a lot of data.
So once we need to retry more data, would we need to block the main thread? What is the maximum size of the default retry queue?

Best,
Jingsong

On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

Dear Flink developers,

I would like to open a discussion on FLIP-232 [1], an extension of AsyncWaitOperator to support retry for the user's AsyncFunction.
To do so, a new user interface will be added to define the trigger condition for a retry and when to retry. Internally, a delayed retry mechanism will be introduced.

There is a PoC for this FLIP [2][3]. Thanks to Yun Gao for the offline discussions and valuable comments.
The new feature is backward compatible: it can recover from state generated by prior Flink versions, and if no retry strategy is enabled, the behavior is the same as before.
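The "trigger condition" and "when to retry" parts of the proposed interface could look roughly like the sketch below. It is hypothetical, not the FLIP's actual API.

- `RetryPolicySketch` and `ExponentialBackoffRetry` are illustrative names.
- `currentAttempts` is assumed to count the attempts already made, including the first.
- The backoff is computed from the attempt count, as discussed in this thread, so no per-attempt strategy object is created.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical retry policy: when to retry (triggers) and how long to wait (backoff). */
interface RetryPolicySketch<OUT> {
    /** Whether another attempt is allowed after currentAttempts attempts. */
    boolean canRetry(int currentAttempts);

    /** Delay before the next attempt, derived from the attempt count alone. */
    long getBackoffTimeMillis(int currentAttempts);

    /** Retry trigger on a completed result, e.g. an empty lookup. */
    Optional<Predicate<Collection<OUT>>> resultPredicate();

    /** Retry trigger on a failure. */
    Optional<Predicate<Throwable>> exceptionPredicate();

    /** Retry only if attempts remain and the matching trigger fires. */
    default boolean shouldRetry(Collection<OUT> result, Throwable error, int currentAttempts) {
        if (!canRetry(currentAttempts)) {
            return false;
        }
        if (error != null) {
            return exceptionPredicate().map(p -> p.test(error)).orElse(false);
        }
        return resultPredicate().map(p -> p.test(result)).orElse(false);
    }
}

/** Exponential backoff that retries on empty results; a multiplier of 1.0 gives a fixed delay. */
final class ExponentialBackoffRetry<OUT> implements RetryPolicySketch<OUT> {
    private final int maxAttempts;
    private final long initialDelayMillis;
    private final double multiplier;
    private final long maxDelayMillis;
    private final Predicate<Throwable> retryOnException;

    ExponentialBackoffRetry(int maxAttempts, long initialDelayMillis, double multiplier,
                            long maxDelayMillis, Predicate<Throwable> retryOnException) {
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
        this.multiplier = multiplier;
        this.maxDelayMillis = maxDelayMillis;
        this.retryOnException = retryOnException;
    }

    @Override
    public boolean canRetry(int currentAttempts) {
        return currentAttempts < maxAttempts;
    }

    @Override
    public long getBackoffTimeMillis(int currentAttempts) {
        // initialDelay * multiplier^(currentAttempts - 1), capped at maxDelayMillis.
        double delay = initialDelayMillis * Math.pow(multiplier, currentAttempts - 1);
        return (long) Math.min(delay, (double) maxDelayMillis);
    }

    @Override
    public Optional<Predicate<Collection<OUT>>> resultPredicate() {
        Predicate<Collection<OUT>> emptyResult = Collection::isEmpty;
        return Optional.of(emptyResult);
    }

    @Override
    public Optional<Predicate<Throwable>> exceptionPredicate() {
        return Optional.of(retryOnException);
    }
}
```

Under these assumptions, Lincoln's '60s fixed-delay retry if empty result, max-attempts: 5' example would be `new ExponentialBackoffRetry<>(5, 60_000L, 1.0, 60_000L, t -> false)`.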
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2] based on timer trigger: https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
[3] based on DelayQueue with pull fashion: https://github.com/lincoln-lil/flink/pull/new/async-op-retry

Best,
Lincoln Lee