Thanks for your reply, Lincoln.

I'm a little confused about the relationship between the Ordered/Unordered
queue and the DelayQueue. Why do we need a separate DelayQueue?
Could we remove the DelayQueue and keep the retry state in the
StreamRecordQueueEntry instead (it seems this is already in the FLIP)?
The advantages of doing this are:
1. Only half as much data is stored in state.
2. The concept is unified: the user only needs to configure one queue
capacity.
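
To make the idea concrete, here is a rough sketch of a single bounded queue
whose entries carry their own retry state. It is only an illustration under
my own assumptions: the names Entry, RetryAwareQueue, scheduleRetry and
takeDue are hypothetical and are not the FLIP's or Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: all names here are hypothetical, not Flink classes. */
final class SingleQueueRetrySketch {

    /** A queue entry that carries its own retry state. */
    static final class Entry<T> {
        final T element;
        int attempts = 0;                // retries scheduled so far
        boolean waitingForRetry = false; // true while the retry delay is running
        long retryAtMillis = 0L;         // only meaningful while waitingForRetry

        Entry(T element) {
            this.element = element;
        }
    }

    /** One bounded queue shared by in-flight and waiting-for-retry entries. */
    static final class RetryAwareQueue<T> {
        private final List<Entry<T>> entries = new ArrayList<>();
        private final int capacity;

        RetryAwareQueue(int capacity) {
            this.capacity = capacity;
        }

        /** Returns the new entry, or null if the queue is full (caller backs off). */
        Entry<T> tryAdd(T element) {
            if (entries.size() >= capacity) {
                return null;
            }
            Entry<T> entry = new Entry<>(element);
            entries.add(entry);
            return entry;
        }

        /** Marks an entry that is already queued as waiting for a delayed retry. */
        void scheduleRetry(Entry<T> entry, long nowMillis, long delayMillis) {
            entry.attempts++;
            entry.waitingForRetry = true;
            entry.retryAtMillis = nowMillis + delayMillis;
        }

        /** Returns entries whose delay has elapsed and marks them in flight again. */
        List<Entry<T>> takeDue(long nowMillis) {
            List<Entry<T>> due = new ArrayList<>();
            for (Entry<T> entry : entries) {
                if (entry.waitingForRetry && entry.retryAtMillis <= nowMillis) {
                    entry.waitingForRetry = false;
                    due.add(entry);
                }
            }
            return due;
        }

        /** Removes a finished entry and frees its slot. */
        void complete(Entry<T> entry) {
            entries.remove(entry);
        }

        int size() {
            return entries.size();
        }
    }

    public static void main(String[] args) {
        RetryAwareQueue<String> queue = new RetryAwareQueue<>(2);
        Entry<String> first = queue.tryAdd("record-1");
        queue.scheduleRetry(first, 1_000L, 500L);
        System.out.println("due at 1500 ms: " + queue.takeDue(1_500L).size());
    }
}
```

In such a layout, an entry waiting for its retry keeps occupying its slot, so
a single capacity setting bounds both in-flight and delayed elements, and only
the one list of entries would need to go into state.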

Best,
Jingsong

On Mon, May 16, 2022 at 12:10 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jingsong,
> Thanks for your feedback! Let me try to answer the two questions:
>
> For q1: Motivation
> Yes, users can implement retries themselves based on the external async
> client, but this requires each user to do similar things, and if we can
> support retries uniformly, user code would become much simpler.
>
> > The real external call should happen in the asynchronous thread.
> > My question is: If the user makes a retry in this asynchronous thread by
> > themselves, is there a difference between this and the current FLIP's?
>
>
> For q2: Block Main Thread
> You're right, the queue data will be stored in ListState, which is an
> OperatorState. In theory the upper limit for ListState storage is
> Integer.MAX_VALUE, but in production we can't make the queue capacity too
> large, because the risk of OOM increases as the queue capacity grows.
> Increasing the task parallelism may be a more viable option when a single
> task encounters too many retry items.
> We recommend estimating a proper queue capacity with a formula like
> 'inputRate * retryRate * avgRetryDuration', also taking into account the
> actual checkpoint duration at runtime.
>
> > If I understand correctly, the retry queue will be put into ListState.
> > Is this state an OperatorState? As far as I know, OperatorState is not
> > able to store a lot of data.
> > So once more data needs to be retried, will we have to block the main
> > thread? What is the maximum size of the default retry queue?
>
>
>
> Best,
> Lincoln Lee
>
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Mon, May 16, 2022 at 10:31:
>
> > Thanks, Lincoln, for the proposal.
> >
> > ## Motivation:
> >
> > > asyncInvoke and callback functions are executed synchronously by the
> > > main thread, which is not suitable adding long time blocking operations,
> > > and introducing additional thread will bring extra complexity for users
> >
> > According to the documentation of AsyncFunction:
> >
> > > For each #asyncInvoke, an async io operation can be triggered, and once
> > > it has been done, the result can be collected by calling {@link
> > > ResultFuture#complete}. For each async operation, its context is stored
> > > in the operator immediately after invoking #asyncInvoke, avoiding
> > > blocking for each stream input as long as the internal buffer is not full.
> >
> > The real external call should happen in the asynchronous thread.
> >
> > My question is: If the user makes a retry in this asynchronous thread by
> > themselves, is there a difference between this and the current FLIP's?
> >
> > ## Block Main Thread
> >
> > If I understand correctly, the retry queue will be put into ListState.
> > Is this state an OperatorState? As far as I know, OperatorState is not
> > able to store a lot of data.
> > So once more data needs to be retried, will we have to block the main
> > thread? What is the maximum size of the default retry queue?
> >
> > Best,
> > Jingsong
> >
> > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> > > Dear Flink developers,
> > >
> > > I would like to open a discussion on FLIP-232 [1], an extension of
> > > AsyncWaitOperator to support retries for the user's AsyncFunction.
> > >
> > > To do so, a new user interface will be added to define the trigger
> > > condition for a retry and when the retry should happen. Internally, a
> > > delayed retry mechanism will be introduced.
> > >
> > > There are PoCs for this FLIP [2][3]; thanks to Yun Gao for offline
> > > discussions and valuable comments.
> > > The new feature is backwards compatible: it can recover from state that
> > > was generated by prior Flink versions, and if no retry strategy is
> > > enabled the behavior is as before.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > > [2] based on timer trigger
> > > https://github.com/lincoln-lil/flink/pull/new/async-retry-timer
> > > [3] based on DelayQueue with pull fashion
> > > https://github.com/lincoln-lil/flink/pull/new/async-op-retry
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> >
>
