Thanks, Lincoln, for your reply.

I'm a little confused about the relationship between the Ordered/Unordered queue and the DelayQueue. Why do we need a DelayQueue at all? Could we remove the DelayQueue and keep the retry state in the StreamRecordQueueEntry (it seems this is already in the FLIP)?

The advantages of doing this are:
1. only half as much data is stored in state;
2. the concept is unified: the user only needs to configure one queue capacity.
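Jingsong's suggestion can be pictured as one bounded queue whose entries carry their own retry state, so a record waiting for a retry is never copied into a second structure. The Java below is only a minimal sketch under that assumption: `RetryableEntry` and `UnifiedRetryQueue` are hypothetical names, not Flink classes (the real counterpart would be StreamRecordQueueEntry), and snapshotting into ListState, the async call itself, and ordered emission are all left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical queue entry that keeps its own retry state in place. */
final class RetryableEntry<IN> {
    final IN input;
    int retries;            // number of retries scheduled so far
    long nextRetryTimeMs;   // earliest time the next attempt may fire; 0 = not waiting

    RetryableEntry(IN input) {
        this.input = input;
    }

    boolean isWaitingForRetry(long nowMs) {
        return nextRetryTimeMs > nowMs;
    }
}

/** Hypothetical single bounded queue: one capacity covers in-flight and retrying records. */
final class UnifiedRetryQueue<IN> {
    private final int capacity;
    private final Deque<RetryableEntry<IN>> entries = new ArrayDeque<>();

    UnifiedRetryQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Returns false when full; the operator's main thread would then have to wait. */
    boolean tryAdd(IN input) {
        if (entries.size() >= capacity) {
            return false;
        }
        entries.addLast(new RetryableEntry<>(input));
        return true;
    }

    /** Marks an entry for retry in place; no second copy of the record is created. */
    void scheduleRetry(RetryableEntry<IN> entry, long nowMs, long backoffMs) {
        entry.retries++;
        entry.nextRetryTimeMs = nowMs + backoffMs;
    }

    /** Removes a finished entry, freeing one slot of the shared capacity. */
    void complete(RetryableEntry<IN> entry) {
        entries.remove(entry);
    }

    RetryableEntry<IN> peekHead() {
        return entries.peekFirst();
    }

    int size() {
        return entries.size();
    }
}
```

With this shape, each record appears once in the snapshotted queue, and a single capacity bounds in-flight and waiting-for-retry records together, which is where both advantages listed above would come from.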
Best,
Jingsong

On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jingsong,
> Thanks for your feedback! Let me try to answer the two questions:
>
> For q1: Motivation
> Yes, users can implement retries themselves based on the external async
> client, but then every user has to write similar code; if we support
> retries uniformly, user code becomes much simpler.
>
> > The real external call should happen in the asynchronous thread.
> > My question is: If the user makes a retry in this asynchronous thread by
> > themselves, is there any difference between that and the approach in the
> > current FLIP?
>
> For q2: Block Main Thread
> You're right, the queue data will be stored in the ListState, which is an
> OperatorState. In theory, the upper limit of ListState storage is
> Integer.MAX_VALUE, but in production we can't make the queue capacity too
> big, because the risk of OOM grows with the queue capacity. When a single
> task encounters too many retry items, increasing the task parallelism may
> be a more viable option.
> We recommend estimating the queue capacity with a formula like
> 'inputRate * retryRate * avgRetryDuration', and also taking the actual
> checkpoint duration at runtime into account.
>
> > If I understand correctly, the retry queue will be put into ListState; is
> > this state an OperatorState? As far as I know, OperatorState is not
> > designed to store a lot of data.
> > So when there is more data to retry, do we need to block the main thread?
> > What is the maximum size of the default retry queue?
>
> Best,
> Lincoln Lee
>
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 10:31:
>
> > Thanks, Lincoln, for the proposal.
> >
> > ## Motivation:
> >
> > > asyncInvoke and callback functions are executed synchronously by the main
> > > thread, which is not suitable for adding long-time blocking operations,
> > > and introducing an additional thread will bring extra complexity for users
> >
> > According to the documentation of AsyncFunction:
> >
> > > For each #asyncInvoke, an async io operation can be triggered, and once
> > > it has been done, the result can be collected by calling {@link
> > > ResultFuture#complete}. For each async operation, its context is stored in
> > > the operator immediately after invoking #asyncInvoke, avoiding blocking for
> > > each stream input as long as the internal buffer is not full.
> >
> > The real external call should happen in the asynchronous thread.
> >
> > My question is: If the user makes a retry in this asynchronous thread by
> > themselves, is there any difference between that and the approach in the
> > current FLIP?
> >
> > ## Block Main Thread
> >
> > If I understand correctly, the retry queue will be put into ListState; is
> > this state an OperatorState? As far as I know, OperatorState is not
> > designed to store a lot of data.
> > So when there is more data to retry, do we need to block the main thread?
> > What is the maximum size of the default retry queue?
> >
> > Best,
> > Jingsong
> >
> > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> > > Dear Flink developers,
> > >
> > > I would like to open a discussion on FLIP 232 [1], an extension of
> > > AsyncWaitOperator to support retries for the user's AsyncFunction.
> > >
> > > To do so, a new user interface will be added to define the trigger
> > > condition for a retry and when to retry. Internally, a delayed retry
> > > mechanism will be introduced.
> > >
> > > There's a PoC for this FLIP [2][3]; thanks to Yun Gao for the offline
> > > discussions and valuable comments.
> > >
> > > The new feature is backwards compatible: it can recover from state that
> > > was generated by prior Flink versions, and if no retry strategy is
> > > enabled, the behavior is the same as before.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > [2] based on a timer trigger
> > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > [3] based on a DelayQueue, in pull fashion
> > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > >
> > >
> > > Best,
> > > Lincoln Lee
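The DelayQueue-based, pull-style variant [3] can be illustrated with the JDK's `java.util.concurrent.DelayQueue`, sized with Lincoln's 'inputRate * retryRate * avgRetryDuration' estimate. This is a hypothetical, simplified sketch, not the code in the PoC branch: `PendingRetry`, `RetryBuffer`, and `estimateCapacity` are made-up names, records are plain strings, checkpointing the buffer into ListState is omitted, and the units (records per second, fraction of records retried, seconds) are assumptions about how the formula is meant.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** A record waiting to be retried; it becomes available once its delay has elapsed. */
final class PendingRetry implements Delayed {
    final String input;
    final int attempt;
    private final long fireAtNanos;

    PendingRetry(String input, int attempt, long delayMs) {
        this.input = input;
        this.attempt = attempt;
        this.fireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(fireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

/** Hypothetical bounded retry buffer, drained by polling ("pull fashion"). */
final class RetryBuffer {
    private final int capacity;
    private final DelayQueue<PendingRetry> queue = new DelayQueue<>();

    RetryBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Rough sizing: inputRate * retryRate * avgRetryDuration, rounded up. */
    static int estimateCapacity(double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        return (int) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }

    /** DelayQueue itself is unbounded, so the capacity check is done here. */
    boolean offer(PendingRetry retry) {
        if (queue.size() >= capacity) {
            return false;
        }
        return queue.offer(retry);
    }

    /** Returns the next retry whose delay has expired, or null if none is due yet. */
    PendingRetry pollDue() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

In this sketch the operator would call `pollDue()` periodically to pull due retries, whereas the timer-based variant [2] is triggered by timers instead. Note that the buffer here is a second structure next to the main async queue, which is exactly the duplication Jingsong's question is about.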