Hi everyone,

After comparing the two internal implementations of delayed retry, we prefer the timer-based solution. It achieves precise delay control with simple logic, and its only extra cost is one timer instance per retry element, which we consider acceptable. The FLIP [1] doc has been updated.
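For readers who have not looked at the two PoCs, the timer-based approach can be pictured roughly as follows. This is a minimal sketch, not the FLIP's code. A `java.util.concurrent.ScheduledExecutorService` stands in for Flink's internal timer service, and the class `TimerRetryRegistry` and its methods are hypothetical. Each element that needs a retry registers exactly one timer, which is the per-element timer cost mentioned above. A map of in-flight retry elements allows a fast forced flush, similar in spirit to what endInput needs. The locks exist only because the JDK scheduler runs callbacks on its own thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of timer-based delayed retry (not the FLIP's code).
 * A ScheduledExecutorService stands in for Flink's timer service.
 * Elements are used as map keys, so this assumes they are distinct.
 */
class TimerRetryRegistry<T> {
    private final ScheduledExecutorService timers;
    private final long retryDelayMs;
    private final Consumer<T> retryAction;
    // References to in-flight retry elements and their timers, in registration order.
    private final Map<T, ScheduledFuture<?>> pending = new LinkedHashMap<>();

    TimerRetryRegistry(ScheduledExecutorService timers, long retryDelayMs, Consumer<T> retryAction) {
        this.timers = timers;
        this.retryDelayMs = retryDelayMs;
        this.retryAction = retryAction;
    }

    /** Registers exactly one timer for an element that needs a delayed retry. */
    synchronized void scheduleRetry(T element) {
        ScheduledFuture<?> timer =
                timers.schedule(() -> fire(element), retryDelayMs, TimeUnit.MILLISECONDS);
        pending.put(element, timer);
    }

    /** Timer callback: retries the element unless a flush has already handled it. */
    private void fire(T element) {
        boolean stillPending;
        synchronized (this) {
            stillPending = pending.remove(element) != null;
        }
        if (stillPending) {
            retryAction.accept(element);
        }
    }

    /** Like a forced flush at end of input: cancel all timers and retry immediately. */
    void flushAll() {
        List<T> toRetry;
        synchronized (this) {
            toRetry = new ArrayList<>(pending.keySet());
            pending.values().forEach(timer -> timer.cancel(false));
            pending.clear();
        }
        toRetry.forEach(retryAction);
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

The design trade-off is that each element's delay is exact because its own timer fires at its own deadline. The cost is one scheduled task per pending retry. The pull-based DelayQueue PoC, by contrast, has to poll a shared queue for expired elements.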
[1]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963

Best,
Lincoln Lee

Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, May 16, 2022 at 15:09:
> Hi Jingsong,
>
> Good question!
>
> The delayQueue is very similar to incompleteElements in UnorderedStreamElementQueue: it only records references to the in-flight retry elements. Its main value is enabling a fast scan for the forced flush during endInput, and it requires less refactoring of the existing logic.
>
> Users don't need to configure a new capacity for the delayQueue; they can simply increase the original capacity (if needed).
> The input data and the retry state are stored separately mainly to preserve backward compatibility. In the first version of the PoC, I used a single combined state to reduce state costs, but that made compatibility hard to keep, so I split it into two states after Yun Gao raised concerns about compatibility.
>
> Best,
> Lincoln Lee
>
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 14:48:
>
>> Thanks Lincoln for your reply.
>>
>> I'm a little confused about the relationship between the Ordered/Unordered queue and the DelayQueue. Why do we need a DelayQueue?
>> Can we remove the DelayQueue and put the retry state in the StreamRecordQueueEntry (it seems this is already in the FLIP)?
>> The advantages of doing this are:
>> 1. half as much data is stored in state
>> 2. the concept is unified: the user only needs to configure one queue capacity
>>
>> Best,
>> Jingsong
>>
>> On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com>
>> wrote:
>>
>> > Hi Jingsong,
>> > Thanks for your feedback! Let me try to answer the two questions:
>> >
>> > For q1: Motivation
>> > Yes, users can implement retries themselves based on the external async client, but this requires every user to write similar code. If we support retries uniformly, user code becomes much simpler.
>> >
>> > > The real external call should happen in the asynchronous thread.
>> > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's approach?
>> >
>> >
>> > For q2: Block Main Thread
>> > You're right, the queue data will be stored in ListState, which is an OperatorState. In theory, the upper limit of ListState storage is Integer.MAX_VALUE, but we can't make the queue capacity too large in production, because the risk of OOM grows with the queue capacity. When a single task encounters too many retry items, increasing the task parallelism may be a more viable option.
>> > We recommend estimating a proper queue capacity with a formula like 'inputRate * retryRate * avgRetryDuration', and also taking the actual checkpoint duration at runtime into account.
>> >
>> > > If I understand correctly, the retry queue will be put into ListState; is this state OperatorState? As far as I know, OperatorState is not able to store a lot of data.
>> > > So when we need to retry more data, do we need to block the main thread? What is the maximum size of the default retry queue?
>> >
>> >
>> >
>> > Best,
>> > Lincoln Lee
>> >
>> >
>> > Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 10:31:
>> >
>> > > Thanks Lincoln for the proposal.
>> > >
>> > > ## Motivation:
>> > >
>> > > > asyncInvoke and callback functions are executed synchronously by the main thread, which is not suitable for adding long blocking operations, and introducing an additional thread brings extra complexity for users
>> > >
>> > > According to the documentation of AsyncFunction:
>> > >
>> > > > For each #asyncInvoke, an async io operation can be triggered, and once it has been done, the result can be collected by calling {@link ResultFuture#complete}.
>> > > > For each async operation, its context is stored in the operator immediately after invoking #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
>> > >
>> > > The real external call should happen in the asynchronous thread.
>> > >
>> > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's approach?
>> > >
>> > > ## Block Main Thread
>> > >
>> > > If I understand correctly, the retry queue will be put into ListState; is this state OperatorState? As far as I know, OperatorState is not able to store a lot of data.
>> > > So when we need to retry more data, do we need to block the main thread? What is the maximum size of the default retry queue?
>> > >
>> > > Best,
>> > > Jingsong
>> > >
>> > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
>> > > wrote:
>> > >
>> > > > Dear Flink developers,
>> > > >
>> > > > I would like to open a discussion on FLIP-232 [1], an extension of AsyncWaitOperator to support retries for the user's AsyncFunction.
>> > > >
>> > > > To do so, a new user interface will be added to define the trigger condition for a retry and when to retry. Internally, a delayed retry mechanism will be introduced.
>> > > >
>> > > > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for the offline discussions and valuable comments.
>> > > > The new feature is backward compatible: it can recover from state generated by prior Flink versions, and if no retry strategy is enabled, the behavior is the same as before.
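Jingsong's question, whether a user could simply retry inside their own asynchronous thread, can be made concrete with a sketch of that do-it-yourself approach. This is a hypothetical illustration using only JDK classes. `UserSideRetry.withRetry` is an invented helper, and the fixed delay and attempt limit are illustrative. In a real AsyncFunction, the returned future would still have to be bridged to ResultFuture#complete. The delay is scheduled on a separate ScheduledExecutorService, so no thread blocks while waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical user-side helper: retries a failed async call after a fixed delay. */
final class UserSideRetry {
    private UserSideRetry() {}

    /**
     * Starts asyncCall and, if its future fails, calls it again after delayMs,
     * making at most maxAttempts calls in total. The returned future completes
     * with the first successful value or with the last error.
     */
    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> asyncCall,
            int maxAttempts,
            long delayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(asyncCall, 1, maxAttempts, delayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> asyncCall,
            int attemptNo,
            int maxAttempts,
            long delayMs,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        asyncCall.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error);
            } else {
                // Wait without blocking any thread: the next attempt runs on the scheduler.
                scheduler.schedule(
                        () -> attempt(asyncCall, attemptNo + 1, maxAttempts, delayMs, scheduler, result),
                        delayMs,
                        TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

This works, but every user has to write and maintain similar code, including managing an extra scheduler thread. That duplication and complexity is what the FLIP's motivation refers to.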
>> > > >
>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
>> > > > [2] based on timer trigger: https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
>> > > > [3] based on DelayQueue with pull fashion: https://github.com/lincoln-lil/flink/pull/new/async-op-retry
>> > > >
>> > > >
>> > > > Best,
>> > > > Lincoln Lee
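Lincoln's sizing rule from the thread, 'inputRate * retryRate * avgRetryDuration', is essentially Little's law. The number of retry elements in flight equals the rate at which they arrive times how long each one stays. The following is a minimal sketch with a hypothetical helper name. Its units are chosen for illustration: records per second, a fraction between 0 and 1, and seconds. It deliberately ignores the checkpoint-duration consideration that is also mentioned in the thread.

```java
/**
 * Hypothetical helper applying the sizing rule from the thread:
 * in-flight retry elements are roughly inputRate * retryRate * avgRetryDuration.
 */
final class RetryCapacityEstimate {
    private RetryCapacityEstimate() {}

    /**
     * @param inputRatePerSec     records per second reaching one subtask
     * @param retryRate           fraction of records that need a retry, between 0 and 1
     * @param avgRetryDurationSec average seconds a record spends in the retry phase
     * @return estimated number of retry elements in flight, rounded up
     */
    static long estimate(double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        if (inputRatePerSec < 0 || retryRate < 0 || retryRate > 1 || avgRetryDurationSec < 0) {
            throw new IllegalArgumentException("rates and durations must be non-negative; retryRate <= 1");
        }
        return (long) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }
}
```

As an example, suppose inputRate is measured per parallel instance at 4,000 records/s, 12.5% of records are retried, and the average retry duration is 2 s. That gives 4000 * 0.125 * 2 = 1,000 in-flight retry elements per subtask. This suggests roughly how much headroom to add to the existing queue capacity (for example, the `capacity` argument of `AsyncDataStream.unorderedWait`).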