Hi Dong,

Starting from the end:

> It seems that the only benefit of this approach is to avoid
> adding SplitEnumeratorContext#setIsProcessingBacklog.

Yes, that's the major benefit of this counter-proposal.

> In the target use-case, user still want to do checkpoint (though at a
> larger interval) when there is backlog. And HybridSource need to know the
> expected checkpoint interval during backlog in order to determine whether
> it should keep throwing CheckpointException. Thus, we still need to add
> execution.checkpointing.interval-during-backlog for user to specify this
> information.
>
> The downside of this approach is that it is hard to enforce the
> semantics specified by execution.checkpointing.interval-during-backlog. For
> example, suppose execution.checkpointing.interval =3 minute and
> execution.checkpointing.interval-during-backlog = 7 minutes. During the
> backlog phase, checkpoint coordinator will still trigger the checkpoint
> once every 3 minutes. HybridSource will need to reject 2 out of the 3
> checkpoint invocation, and the effective checkpoint interval will be 9
> minutes.

Does the exact value of the longer interval really matter? Couldn't we
hard-code it to be 5x or 10x the base checkpoint interval? If the base
interval causes a noticeable overhead that slows down the record processing
rate, checkpointing 5x or 10x less frequently would fix the performance issue
for the vast majority of users. So a source could just skip 4 out of 5, or 9
out of 10, checkpoints.
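
To make the arithmetic concrete, here is a rough sketch (the function names
are made up for this illustration and are not Flink APIs). It assumes the
checkpoint coordinator keeps triggering at the base interval and the source
accepts only every k-th trigger, so the effective interval is always a
multiple of the base interval:

```python
def effective_interval(base_minutes: int, target_minutes: int) -> int:
    """Smallest multiple of the base interval that is >= the target interval."""
    # Triggers arrive only every base_minutes, so the source can accept
    # every k-th trigger at best; k is the ceiling of target / base.
    k = (target_minutes + base_minutes - 1) // base_minutes
    return k * base_minutes


def accepted_triggers(skip_factor: int, num_triggers: int) -> list[int]:
    """1-based indices of the triggers kept when only every skip_factor-th is accepted."""
    return [i for i in range(1, num_triggers + 1) if i % skip_factor == 0]


# Base interval 3 min, requested backlog interval 7 min: reject 2 out of 3.
print(effective_interval(3, 7))
# Hard-coded 5x factor: skip 4 out of 5 triggers.
print(accepted_triggers(5, 20))
```

With a hard-coded 5x or 10x factor the mismatch from the quoted example does
not arise, because the longer interval is a multiple of the base interval by
construction.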

Alternatively we could introduce a config option like:

execution.checkpointing.long-interval

that might be re-used in the future with fancier algorithms, but I don't see
much value in doing that.

> Overall, I think the solution is a bit hacky. I think it is preferred to
> throw exception only when there is indeed error. If we don't need to check
> a checkpoint, it is preferred to not trigger the checkpoint in the first
> place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is
> probably not that much of a big deal.

Yes, it's hacky, but at least it doesn't require extending the public API for
a quite limited solution that only targets one or two rarely used sources.

================

About the idea of emitting "RecordAttributes(isBacklog=..)": I have a feeling
that this is overly complicated and would require every operator/function to
handle it.

Yes, it would cover even more use cases, at the cost of complicating the
system by a lot. IMO it looks like something we could do if there were indeed
high demand for such a feature, after we provide some baseline generic
solution that doesn't require any configuration.

I have a feeling that by just statically looking at the shape of the job graph
and how it is connected, we could deduce almost the same things.

Also:

>  - the FLIP suggests to use the long checkpointing interval as long as any
>  subtask is processing the backlog. Are you sure that's the right call? What
>  if other sources are producing fresh records, and those fresh records are
>  reaching sinks? It could happen either with disjoint JobGraph, embarrassing
>  parallel JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records
>  can slip using a not backpressured input channel through generally
>  backpressured keyBy exchange. How should we handle that? This problem I think
>  will affect every solution, including my previously proposed generic one,
>  but we should discuss how to handle that as well.

By this I didn't necessarily mean that we have to solve it right now.

================

> The moments above seem kind of "abstract". I am hoping to understand more
> technical details behind these comments so that we can see how to address
> the concern.

Over the span of this discussion I think I have already explained many times
what bothers me in the current proposal.

> For example, even if a FLIP does not address all use-case
> (which is arguably true for every FLIP), its solution does not necessarily
> need to be thrown away later as long as it is extensible

That's my main point. I haven't yet seen proposals that could extend FLIP-309
to cover the generic use case and that:
 - Would work out of the box, for all or the majority of properly implemented
   sources.
 - Would require zero or very minimal configuration/input from the user.
   In particular, they wouldn't require implementing some custom things in
   every source.
 - Could be made to work well enough in the (vast?) majority of use cases.

> So we probably need to understand specifically why the proposed APIs
> would be thrown away.

As I have mentioned many times, that's the case because:
1. This solution is not generic enough.
2. I can see solutions that wouldn't require modification of every source.
3. They would have zero overlap with the interface extensions from this FLIP.

Best,
Piotrek

On Sat, Jul 1, 2023 at 17:01, Dong Lin <lindon...@gmail.com> wrote:

> Hi Piotr,
>
> Thank you for providing further suggestions to help improve the API. Please
> see my comments inline.
>
> On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hey,
> >
> > Sorry for a late reply, I was OoO for a week. I have three things to
> > point out.
> >
> > 1. ===============
> >
> > The updated proposal is indeed better, but to be honest I still don't
> > like it, for mostly the same reasons that I have mentioned earlier:
> > - only a partial solution, that doesn't address all use cases, so we
> >   would need to throw it away sooner or later
> > - I don't see and it hasn't been discussed how to make it work out of the
> >   box for all sources
> > - somehow complicating API for people implementing Sources
> > - it should work out of the box for most of the sources, or at least to
> >   have that potential in the future
> >
>
> The moments above seem kind of "abstract". I am hoping to understand more
> technical details behind these comments so that we can see how to address
> the concern. For example, even if a FLIP does not address all use-case
> (which is arguably true for every FLIP), its solution does not necessarily
> need to be thrown away later as long as it is extensible. So we probably
> need to understand specifically why the proposed APIs would be thrown away.
>
> Similarly, we would need to understand if there is a better design to make
> the API simpler and work out of the box etc. in order to decide how to
> address these comments.
>
>
> > On top of that:
> > - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> > CheckpointCoordinator to pass "isProcessingBacklog"
> >
>
> I think it can be passed via the following function chain:
> - CheckpointCoordinator invokes
> OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
> coordinatorsToCheckpoint) to get this information.
> - OperatorCoordinatorHolder implements
> OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
> OperatorCoordinator#isProcessingBacklog (via coordinator)
> - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog and
> returns SourceCoordinatorContext#isProcessingBacklog
> - SourceCoordinatorContext will implement
> SplitEnumeratorContext#setIsProcessingBacklog and stores the given
> information in a variable.
>
> Note that it involves only internal API. We might be able to find a simpler
> solution with fewer functions on the path. As long as the above solution
> works without having any performance or correctness issues, I think maybe we
> should focus on the public API design and discuss the implementation in the
> PR review?
>
> > - the FLIP suggests to use the long checkpointing interval as long as any
> >   subtask is processing the backlog. Are you sure that's the right call?
> >   What if other sources are producing fresh records, and those fresh
> >   records are reaching sinks? It could happen either with disjoint
> >   JobGraph, embarrassing parallel JobGraph (no keyBy/unions/joins), or even
> >   with keyBy. Fresh records can slip using a not backpressured input
> >   channel through generally backpressured keyBy exchange. How should we
> >   handle that? This problem I think will affect every solution, including
> >   my previously proposed generic one, but we should discuss how to handle
> >   that as well.
> >
>
> Good question. Here is my plan to improve the solution in a follow-up FLIP:
>
> - Let every subtask of every source operator emit
> RecordAttributes(isBacklog=..)
> - Let every subtask of every operator handle the RecordAttributes received
> from inputs and emit RecordAttributes to downstream operators. Flink
> runtime can derive this information for every one-input operator. For an
> operator with two inputs, if one input has isBacklog=true and the other has
> isBacklog=false, the operator should determine the isBacklog for its output
> records based on its semantics.
> - If there exists a subtask of a two-phase commit operator with
> isBacklog=false, the operator should let JM know so that the JM will use
> the short checkpoint interval (for data freshness). Otherwise, JM will use
> the long checkpoint interval.
>
> The above solution guarantees that, if every two-input operator has
> explicitly specified their isBacklog based on the inputs' isBacklog, then
> the JM will use the short checkpoint interval if and only if it is useful
> for at least one subtask of one two-phase commit operator.
>
> Note that even the above solution might not be perfect. Suppose one subtask
> of the two-phase commit operator has isBacklog=false, but every other
> subtask of this operator has isBacklog=true, due to load imbalance. In this
> case, it might be beneficial to use the long checkpoint interval to improve
> the average data freshness for this operator. However, as we get into more
> edge cases, the solution will become more complicated (e.g. providing more
> APIs for users to specify their intended strategy) and there will be fewer
> additional benefits (because these scenarios are less common).
>
> Also, note that we can support the solution described above without
> throwing away any public API currently proposed in FLIP-309. More
> specifically, we still
> need execution.checkpointing.interval-during-backlog. Sources such as
> HybridSource and MySQL CDC source can still use setIsProcessingBacklog() to
> specify the backlog status. We just need to update setIsProcessingBacklog()
> to emit RecordAttributes(isBacklog=..) upon invocation.
>
> I hope the above solution is reasonable and can address most of your
> concerns. And I hope we can use FLIP-309 to solve a large chunk of the
> existing problems in Flink 1.18 release and make further improvements in
> followup FLIPs. What do you think?
>
>
>
> > 2. ===============
> >
> > Regarding the current proposal, there might be a way to make it actually
> > somehow generic (but not pluggable). But it might require slightly
> > different interfaces. We could keep the idea that
> > SourceCoordinator/SplitEnumerator is responsible for switching between
> > slow/fast processing modes. It could be implemented to achieve something
> > like in the FLIP-309 proposal, but apart from that, the default behaviour
> > would be a built-in mechanism working like this:
> > 1. Every SourceReaderBase checks its metrics and its state, to decide if
> >     it considers itself as "processingBacklog" or "veryBackpressured". The
> >     base implementation could do it via a similar mechanism as I was
> >     proposing previously, via looking at the
> >     busy/backPressuredTimeMsPerSecond, pendingRecords and processing rate.
> > 2. SourceReaderBase could send an event with
> >     "processingBacklog"/"veryBackpressured" state.
> > 3. SourceCoordinator would collect those events, and decide what it
> >     should do, whether it should switch the whole source to the
> >     "processingBacklog"/"veryBackpressured" state or not.
> >
> > That could eventually provide a generic solution that works for every
> > source that reports the required metrics. Each source implementation
> > could decide whether to use that default behaviour, or if maybe it's
> > better to override the default, or combine the default with something
> > custom (like HybridSource).
> >
> > And as a first step, we could implement that mechanism only on the
> > SourceCoordinator side, without events, without the default generic
> > solution, and use it in the HybridSource/MySQL CDC.
> >
> > This approach has some advantages compared to my previous proposal:
> >   + no need to tinker with metrics and pushing metrics from TMs to JM
> >   + somehow communicating this information via Events seems a bit cleaner
> >     to me and avoids problems with freshness of the metrics
> > And some issues:
> >   - I don't know if it can be made pluggable in the future. If a user
> >     could implement a custom `CheckpointTrigger` that would automatically
> >     work with all/most of the pre-existing sources?
> >   - I don't know if it can be expanded if needed in the future, to make
> >     decisions based on operators in the middle of a jobgraph.
> >
>
> Thanks for the proposal. Overall, I agree it is valuable to be able to
> determine the isProcessingBacklog based on the source reader metrics.
>
> I will probably suggest making the following changes upon your idea:
> - Instead of letting the source reader send events to the source
> coordinator, the source reader can emit RecordAttributes(isBacklog=..) as
> described earlier. We will let two-phase commit operator to decide whether
> they need the short checkpoint interval.
> - We consider isProcessingBacklog=true when watermarkLag is larger than a
> threshold.
>
> This is a nice addition. But I think we still need extra information from
> user (e.g. the threshold whether the watermarkLag or
> backPressuredTimeMsPerSecond is too high) with extra public APIs for this
> feature to work reliably. This is because there is no default algorithm
> that works in all cases without extra specification from users, due to the
> issues around the default algorithm we discussed previously.
>
> Overall, I think the current proposal in FLIP-309 is a first step towards
> addressing these problems. The API for source enumerator to explicitly set
> isProcessingBacklog based on its status is useful even if we can support
> metrics-based solutions.
>
> If that looks reasonable, can we agree to make incremental improvement and
> work on the metrics-based solution in a followup FLIP?
>
>
> >
> > 3. ===============
> >
> > Independent of that, during some brainstorming between me, Chesnay and
> > Stefan Richter, an idea popped up, that I think could be a counter
> > proposal as an intermediate solution that probably effectively works the
> > same way as current FLIP-309.
> >
> > Inside a HybridSource, from its SplitEnumerator#snapshotState method,
> > can't you throw an exception like
> > `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or
> > `new CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
> > Actually we could also introduce a dedicated `CheckpointFailureReason`
> > for that purpose and handle it some special way in some places (like maybe
> > hide such rejected checkpoints from the REST API/WebUI). We could
> > elaborate on this a bit more, but after a brief thinking I could see it
> > actually working well enough without any public facing changes. But I
> > might be wrong here.
> >
> > If this feature actually grabs traction, we could expand it to something
> > more sophisticated available via a public API in the future.
> >
>
> In the target use-case, user still want to do checkpoint (though at a
> larger interval) when there is backlog. And HybridSource need to know the
> expected checkpoint interval during backlog in order to determine whether
> it should keep throwing CheckpointException. Thus, we still need to add
> execution.checkpointing.interval-during-backlog for user to specify this
> information.
>
> It seems that the only benefit of this approach is to avoid
> adding SplitEnumeratorContext#setIsProcessingBacklog.
>
> The downside of this approach is that it is hard to enforce the
> semantics specified by execution.checkpointing.interval-during-backlog. For
> example, suppose execution.checkpointing.interval =3 minute and
> execution.checkpointing.interval-during-backlog = 7 minutes. During the
> backlog phase, checkpoint coordinator will still trigger the checkpoint
> once every 3 minutes. HybridSource will need to reject 2 out of the 3
> checkpoint invocation, and the effective checkpoint interval will be 9
> minutes.
>
> Overall, I think the solution is a bit hacky. I think it is preferred to
> throw exception only when there is indeed error. If we don't need to check
> a checkpoint, it is preferred to not trigger the checkpoint in the first
> place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is
> probably not that much of a big deal.
>
> Thanks for all the comments. I am looking forward to your thoughts.
>
> Best,
> Dong
>
>
> >
> > ===============
> >
> > Sorry for disturbing this FLIP discussion and voting.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Jun 29, 2023 at 05:08, feng xiangyu <xiangyu...@gmail.com> wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for your quick reply. I think this has truly solved our problem
> > > and will enable us to upgrade our existing jobs more seamlessly.
> > >
> > > Best,
> > > Xiangyu
> > >
> > > Dong Lin <lindon...@gmail.com> wrote on Thu, Jun 29, 2023 at 10:50:
> > >
> > > > Hi Feng,
> > > >
> > > > Thanks for the feedback. Yes, you can configure the
> > > > execution.checkpointing.interval-during-backlog to effectively
> > > > disable checkpoint during backlog.
> > > >
> > > > Prior to your comment, the FLIP allows users to do this by setting
> > > > the config value to something large (e.g. 365 days). After thinking
> > > > about this more, we think it is more usable to allow users to achieve
> > > > this goal by setting the config value to 0. This is consistent with the
> > > > existing behavior of execution.checkpointing.interval -- the checkpoint
> > > > is disabled if user set execution.checkpointing.interval to 0.
> > > >
> > > > We have updated the description of
> > > > execution.checkpointing.interval-during-backlog
> > > > to say the following:
> > > > ... it is not null, the value must either be 0, which means the
> > > > checkpoint is disabled during backlog, or be larger than or equal to
> > > > execution.checkpointing.interval.
> > > >
> > > > Does this address your need?
> > > >
> > > > Best,
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu <xiangyu...@gmail.com> wrote:
> > > >
> > > > > Hi Dong and Yunfeng,
> > > > >
> > > > > Thanks for the proposal, your FLIP sounds very useful from my
> > > > > perspective. In our business, when using hybrid source in production
> > > > > we also met the problem described in your FLIP.
> > > > > In our solution, we tend to skip making any checkpoints before all
> > > > > batch tasks have finished and resume the periodic checkpoint only in
> > > > > the streaming phase. With this FLIP, we can solve our problem in a
> > > > > more generic way.
> > > > >
> > > > > However, I am wondering, if we still want to skip making any
> > > > > checkpoints during the historical phase, can we set the configuration
> > > > > "execution.checkpointing.interval-during-backlog" to "-1" to cover
> > > > > this case?
> > > > >
> > > > > Best,
> > > > > Xiangyu
> > > > >
> > > > > Hang Ruan <ruanhang1...@gmail.com> wrote on Wed, Jun 28, 2023 at 16:30:
> > > > >
> > > > > > Thanks for Dong and Yunfeng's work.
> > > > > >
> > > > > > The FLIP looks good to me. This new version is clearer to
> > > > > > understand.
> > > > > >
> > > > > > Best,
> > > > > > Hang
> > > > > >
> > > > > > Dong Lin <lindon...@gmail.com> wrote on Tue, Jun 27, 2023 at 16:53:
> > > > > >
> > > > > > > Thanks Jack, Jingsong, and Zhu for the review!
> > > > > > >
> > > > > > > Thanks Zhu for the suggestion. I have updated the configuration
> > > > > > > name as suggested.
> > > > > > >
> > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving
> > > > > > > > this discussion.
> > > > > > > >
> > > > > > > > The new design looks generally good to me. Increasing the
> > > > > > > > checkpoint interval when the job is processing backlogs is
> > > > > > > > easier for users to understand and can help in more scenarios.
> > > > > > > >
> > > > > > > > I have one comment about the new configuration.
> > > > > > > > Naming the new configuration
> > > > > > > > "execution.checkpointing.interval-during-backlog" would be
> > > > > > > > better according to Flink config naming convention.
> > > > > > > > It is also because nested config keys should be avoided. See
> > > > > > > > FLINK-29372 for more details.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zhu
> > > > > > > >
> > > > > > > > > Jingsong Li <jingsongl...@gmail.com> wrote on Tue, Jun 27, 2023 at 15:45:
> > > > > > > > >
> > > > > > > > > Looks good to me!
> > > > > > > > >
> > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and
> > > > > > > > > > design.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Jingsong
> > > > > > > > >
> > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Thank you Dong for driving this FLIP.
> > > > > > > > > >
> > > > > > > > > > The new design looks good to me!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Jark
> > > > > > > > > >
> > > > > > > > > > > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道:
> > > > > > > > > > >
> > > > > > > > > > > Thank you Leonard for the review!
> > > > > > > > > > >
> > > > > > > > > > > > Hi Piotr, do you have any comments on the latest
> > > > > > > > > > > > proposal?
> > > > > > > > > > > >
> > > > > > > > > > > > I am wondering if it is OK to start the voting thread
> > > > > > > > > > > > this week.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Thanks Dong for driving this FLIP forward!
> > > > > > > > > > >>
> > > > > > > > > > > >> Introducing the `backlog status` concept for Flink jobs makes
> > > > > > > > > > > >> sense to me for the following reasons:
> > > > > > > > > > > >>
> > > > > > > > > > > >> From the concept/API design perspective, it’s more general and
> > > > > > > > > > > >> natural than the above proposals, as it can be used in
> > > > > > > > > > > >> HybridSource for bounded records, CDC Source for history
> > > > > > > > > > > >> snapshot and general sources like KafkaSource for historical
> > > > > > > > > > > >> messages.
> > > > > > > > > > > >>
> > > > > > > > > > > >> From the use cases/requirements perspective, I’ve seen many
> > > > > > > > > > > >> users manually set a larger checkpoint interval during
> > > > > > > > > > > >> backfilling and then set a shorter checkpoint interval for
> > > > > > > > > > > >> real-time processing in their production environments as a
> > > > > > > > > > > >> Flink application optimization. Now, the Flink framework can
> > > > > > > > > > > >> apply this optimization without requiring the user to set the
> > > > > > > > > > > >> checkpoint interval and restart the job multiple times.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Following the support for a larger checkpoint interval for jobs
> > > > > > > > > > > >> under backlog status in the current FLIP, we can explore
> > > > > > > > > > > >> supporting larger parallelism/memory/cpu for jobs under backlog
> > > > > > > > > > > >> status in the future.
> > > > > > > > > > >>
> > > > > > > > > > >> In short, the updated FLIP looks good to me.
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> Best,
> > > > > > > > > > >> Leonard
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > >>>
> > > > > > > > > > >>> Hi Piotr,
> > > > > > > > > > >>>
> > > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog concept.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> After discussing with Becket Qin and thinking about this more,
> > > > > > > > > > > >>> I agree it is a better idea to add a top-level concept to all
> > > > > > > > > > > >>> source operators to address the target use-case.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> The main reason that changed my mind is that
> > > > > > > > > > > >>> isProcessingBacklog can be described as an inherent/natural
> > > > > > > > > > > >>> attribute of every source instance and its semantics does not
> > > > > > > > > > > >>> need to depend on any specific checkpointing policy. Also, we
> > > > > > > > > > > >>> can hardcode the isProcessingBacklog behavior for the sources
> > > > > > > > > > > >>> we have considered so far (e.g. HybridSource and MySQL CDC
> > > > > > > > > > > >>> source) without asking users to explicitly configure the
> > > > > > > > > > > >>> per-source behavior, which indeed provides better user
> > > > > > > > > > > >>> experience.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> I have updated the FLIP based on the latest suggestions. The
> > > > > > > > > > > >>> latest FLIP no longer introduces per-source config that can be
> > > > > > > > > > > >>> used by end-users. While I agree with you that
> > > > > > > > > > > >>> CheckpointTrigger can be a useful feature to address
> > > > > > > > > > > >>> additional use-cases, I am not sure it is necessary for the
> > > > > > > > > > > >>> use-case targeted by FLIP-309. Maybe we can introduce
> > > > > > > > > > > >>> CheckpointTrigger separately in another FLIP?
> > > > > > > > > > >>>
> > > > > > > > > > >>> Can you help take another look at the updated FLIP?
> > > > > > > > > > >>>
> > > > > > > > > > >>> Best,
> > > > > > > > > > >>> Dong
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > > > > >>>
> > > > > > > > > > >>>> Hi Dong,
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask has 1% chance
> > > > > > > > > > > >>>>> of being "backpressured" at a given time (due to random
> > > > > > > > > > > >>>>> traffic spikes). Then at any given time, the chance of the
> > > > > > > > > > > >>>>> job being considered not-backpressured = (1-0.01)^1000. Since
> > > > > > > > > > > >>>>> we evaluate the backpressure metric once a second, the
> > > > > > > > > > > >>>>> estimated time for the job to be considered not-backpressured
> > > > > > > > > > > >>>>> is roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> This means that the job will effectively always use the
> > > > > > > > > > > >>>>> longer checkpointing interval. It looks like a real concern,
> > > > > > > > > > > >>>>> right?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Sorry I don't understand where you are getting those numbers
> > > > > > > > > > > >>>> from. Instead of trying to find loophole after loophole, could
> > > > > > > > > > > >>>> you try to think how a given loophole could be improved/solved?
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the APIs
> > > > > > > > > > > >>>>> due to the following reasons.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Please propose something. I don't think it's needed.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation section,
> > > > > > > > > > > >>>>> would the APIs of this alternative approach be more or less
> > > > > > > > > > > >>>>> usable?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Everything that you originally wanted to achieve in FLIP-309,
> > > > > > > > > > > >>>> you could do as well in my proposal.
> > > > > > > > > > > >>>> Vide my many mentions of the "hacky solution".
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case (e.g.
> > > > > > > > > > > >>>>> allow checkpointing interval to change dynamically even
> > > > > > > > > > > >>>>> during the unbounded phase) as it claims?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> I don't see why not.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently
> > > > > > > > > > > >>>>> proposed in FLIP-309?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Yes
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> For example, if the APIs of this alternative approach can be
> > > > > > > > > > > >>>>> decoupled from the APIs currently proposed in FLIP-309, then
> > > > > > > > > > > >>>>> it might be reasonable to work on this extra use-case with a
> > > > > > > > > > > >>>>> more advanced/complicated design separately in a followup
> > > > > > > > > > > >>>>> work.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> As I voiced my concerns previously, the current design of
> > > > > > > > > > > >>>> FLIP-309 would clog the public API and in the long run confuse
> > > > > > > > > > > >>>> the users. IMO it's addressing the problem in the wrong place.
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> > > > > > > > > > > >>>>> - Have all source operators emit a metric named
> > > > > > > > > > > >>>>> "processingBacklog".
> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the checkpointing
> > > > > > > > > > > >>>>> interval to be used when any source is processing backlog".
> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog" periodically from
> > > > > > > > > > > >>>>> all source operators and uses the newly added config value as
> > > > > > > > > > > >>>>> appropriate.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> Yes.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>> The challenge with this approach is that we need to
> > > > define
> > > > > > the
> > > > > > > > > > >> semantics
> > > > > > > > > > >>>> of
> > > > > > > > > > >>>>> this "processingBacklog" metric and have all source
> > > > > operators
> > > > > > > > > > >>>>> implement this metric. I am not sure we are able to
> > do
> > > > this
> > > > > > yet
> > > > > > > > without
> > > > > > > > > > >>>>> having users explicitly provide this information
> on a
> > > > > > > per-source
> > > > > > > > basis.
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source. Should it emit
> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use the long
> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to process data from now
> > > > > > > > > > > >>>>> until one hour from now. If no, then the job might use the short
> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to re-process data starting
> > > > > > > > > > > >>>>> from 7 days ago.
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Yes. The same can be said of your proposal. It has the very same issues:
> > > > > > > > > > > >>>> every source would have to implement it differently, and most sources would
> > > > > > > > > > > >>>> have no idea how to properly calculate the new requested checkpoint
> > > > > > > > > > > >>>> interval. For those that do know how, the user would have to configure
> > > > > > > > > > > >>>> every source individually, and yet again we would end up with a system that
> > > > > > > > > > > >>>> works only partially, in some special use cases (HybridSource), which
> > > > > > > > > > > >>>> confuses users even more.
> > > > > > > > > > >>>>
> > > > > > > > > > > >>>> That's why I think the more generic solution, working primarily on the same
> > > > > > > > > > > >>>> metrics that are used by various autoscaling solutions (like the Flink K8s
> > > > > > > > > > > >>>> operator's autoscaler), would be better. I proposed the hacky solution to:
> > > > > > > > > > > >>>> 1. show you that the generic solution is simply a superset of your proposal
> > > > > > > > > > > >>>> 2. show that, if you are adamant that the busyness/backpressured/records
> > > > > > > > > > > >>>>   processing rate/pending records metrics wouldn't cover your use case
> > > > > > > > > > > >>>>   sufficiently (IMO they can), you can very easily enhance this algorithm
> > > > > > > > > > > >>>>   with some hints from the sources, like "processingBacklog==true", to
> > > > > > > > > > > >>>>   short-circuit the main algorithm if `processingBacklog` is available.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> Best,
> > > > > > > > > > >>>> Piotrek
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> pt., 16 cze 2023 o 04:45 Dong Lin <
> > lindon...@gmail.com>
> > > > > > > > napisał(a):
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>> Hi again Piotr,
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Thank you for the reply. Please see my reply
> inline.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> > > > > > > > > > >>>> piotr.nowoj...@gmail.com>
> > > > > > > > > > >>>>> wrote:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>> Hi again Dong,
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> I understand that JM will get the
> > > backpressure-related
> > > > > > > metrics
> > > > > > > > every
> > > > > > > > > > >>>>> time
> > > > > > > > > > >>>>>>> the RestServerEndpoint receives the REST request
> to
> > > get
> > > > > > these
> > > > > > > > > > >>>> metrics.
> > > > > > > > > > >>>>>> But
> > > > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already
> > always
> > > > > > > > receiving the
> > > > > > > > > > >>>>> REST
> > > > > > > > > > >>>>>>> metrics at regular interval (suppose there is no
> > > human
> > > > > > > manually
> > > > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it
> does,
> > > > what
> > > > > is
> > > > > > > the
> > > > > > > > > > >>>>> interval?
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Good catch. I had thought that metrics were pre-emptively sent to the JM
> > > > > > > > > > > >>>>>> every 10 seconds. Indeed, that's not the case at the moment, and that would
> > > > > > > > > > > >>>>>> have to be improved.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying
> > this
> > > > much
> > > > > > > > overhead
> > > > > > > > > > >>>> just
> > > > > > > > > > >>>>>> for
> > > > > > > > > > >>>>>>> metrics monitoring. That is the main reason I
> still
> > > > doubt
> > > > > > it
> > > > > > > > is true.
> > > > > > > > > > >>>>> Can
> > > > > > > > > > >>>>>>> you show where this 100 ms is currently
> configured?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add
> > > extra
> > > > > code
> > > > > > > to
> > > > > > > > invoke
> > > > > > > > > > >>>>> the
> > > > > > > > > > >>>>>>> REST API at 100 ms interval. Then that means we
> > need
> > > to
> > > > > > > > considerably
> > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where
> the
> > > > > overhead
> > > > > > > > will
> > > > > > > > > > >>>>> increase
> > > > > > > > > > >>>>>>> as the number of TM/slots increase, which may
> pose
> > > risk
> > > > > to
> > > > > > > the
> > > > > > > > > > >>>>>> scalability
> > > > > > > > > > >>>>>>> of the proposed design. I am not sure we should
> do
> > > > this.
> > > > > > What
> > > > > > > > do you
> > > > > > > > > > >>>>>> think?
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Sorry, I didn't mean that the metric should be reported every 100 ms. I
> > > > > > > > > > > >>>>>> meant that the `backPressuredTimeMsPerSecond` metric would report a value
> > > > > > > > > > > >>>>>> of 100 ms/s, once per metric interval (10s?).
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> > > > > > > > > > > >>>>> "backpressured" at a given time (due to random traffic spikes). Then at any
> > > > > > > > > > > >>>>> given time, the chance of the job being considered not-backpressured is
> > > > > > > > > > > >>>>> (1-0.01)^1000. Since we evaluate the backpressure metric once a second, the
> > > > > > > > > > > >>>>> estimated time for the job to be considered not-backpressured is roughly
> > > > > > > > > > > >>>>> 1 / ((1-0.01)^1000) = 23163 sec = 6.4 hours.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> This means that the job will effectively always use
> > the
> > > > > > longer
> > > > > > > > > > >>>>> checkpointing interval. It looks like a real
> concern,
> > > > > right?
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>>> - What is the interface of this
> CheckpointTrigger?
> > > For
> > > > > > > > example, are
> > > > > > > > > > >>>> we
> > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context that it
> > can
> > > > use
> > > > > > to
> > > > > > > > fetch
> > > > > > > > > > >>>>>>> arbitrary metric values? This can help us
> > understand
> > > > what
> > > > > > > > information
> > > > > > > > > > >>>>>> this
> > > > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make
> the
> > > > > > checkpoint
> > > > > > > > > > >>>> decision.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> I honestly don't think this is important at this
> > stage
> > > > of
> > > > > > the
> > > > > > > > > > >>>> discussion.
> > > > > > > > > > >>>>>> It could have
> > > > > > > > > > >>>>>> whatever interface we would deem to be best.
> > Required
> > > > > > things:
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> - access to at least a subset of metrics that the
> > > given
> > > > > > > > > > >>>>> `CheckpointTrigger`
> > > > > > > > > > >>>>>> requests,
> > > > > > > > > > >>>>>> for example via some registration mechanism, so we
> > > don't
> > > > > > have
> > > > > > > to
> > > > > > > > > > >>>> fetch
> > > > > > > > > > >>>>>> all of the
> > > > > > > > > > >>>>>> metrics all the time from TMs.
> > > > > > > > > > >>>>>> - some way to influence `CheckpointCoordinator`.
> > > Either
> > > > > via
> > > > > > > > manually
> > > > > > > > > > >>>>>> triggering
> > > > > > > > > > >>>>>> checkpoints, and/or ability to change the
> > > checkpointing
> > > > > > > > interval.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know
> the
> > > > APIs
> > > > > > due
> > > > > > > > to the
> > > > > > > > > > >>>>> following reasons.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> We would need to know the concrete APIs to gauge
> the
> > > > > > following:
> > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation
> > > > > section,
> > > > > > > > would the
> > > > > > > > > > >>>> APIs
> > > > > > > > > > >>>>> of this alternative approach be more or less
> usable?
> > > > > > > > > > >>>>> - Can these APIs reliably address the extra
> use-case
> > > > (e.g.
> > > > > > > allow
> > > > > > > > > > >>>>> checkpointing interval to change dynamically even
> > > during
> > > > > the
> > > > > > > > unbounded
> > > > > > > > > > >>>>> phase) as it claims?
> > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs
> currently
> > > > > > proposed
> > > > > > > in
> > > > > > > > > > >>>> FLIP-309?
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> For example, if the APIs of this alternative
> approach
> > > can
> > > > > be
> > > > > > > > decoupled
> > > > > > > > > > >>>> from
> > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it
> > might
> > > be
> > > > > > > > reasonable to
> > > > > > > > > > >>>>> work on this extra use-case with a more
> > > > > advanced/complicated
> > > > > > > > design
> > > > > > > > > > >>>>> separately in a followup work.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For
> > > example,
> > > > > is
> > > > > > it
> > > > > > > > going
> > > > > > > > > > >>>> to
> > > > > > > > > > >>>>>> run
> > > > > > > > > > >>>>>>> on the subtask of every source operator? Or is it
> > > going
> > > > > to
> > > > > > > run
> > > > > > > > on the
> > > > > > > > > > >>>>> JM?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> IMO on the JM.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> - Are we going to provide a default
> implementation
> > of
> > > > > this
> > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the
> > > > algorithm
> > > > > > > > described
> > > > > > > > > > >>>>> below,
> > > > > > > > > > >>>>>>> or do we expect each source operator developer to
> > > > > implement
> > > > > > > > their own
> > > > > > > > > > >>>>>>> CheckpointTrigger?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> As I mentioned before, I think we should provide
> at
> > > the
> > > > > very
> > > > > > > > least the
> > > > > > > > > > >>>>>> implementation
> > > > > > > > > > >>>>>> that replaces the current triggering mechanism
> > > > (statically
> > > > > > > > configured
> > > > > > > > > > >>>>>> checkpointing interval)
> > > > > > > > > > >>>>>> and it would be great to provide the backpressure
> > > > > monitoring
> > > > > > > > trigger
> > > > > > > > > > >> as
> > > > > > > > > > >>>>>> well.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I agree that if there is a good use-case that can
> be
> > > > > > addressed
> > > > > > > > by the
> > > > > > > > > > >>>>> proposed CheckpointTrigger, then it is reasonable
> > > > > > > > > > >>>>> to add CheckpointTrigger and replace the current
> > > > triggering
> > > > > > > > mechanism
> > > > > > > > > > >>>> with
> > > > > > > > > > >>>>> it.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I also agree that we will likely find such a
> > use-case.
> > > > For
> > > > > > > > example,
> > > > > > > > > > >>>> suppose
> > > > > > > > > > >>>>> the source records have event timestamps, then it
> is
> > > > likely
> > > > > > > > > > >>>>> that we can use the trigger to dynamically control
> > the
> > > > > > > > checkpointing
> > > > > > > > > > >>>>> interval based on the difference between the
> > watermark
> > > > and
> > > > > > > > current
> > > > > > > > > > >> system
> > > > > > > > > > >>>>> time.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> But I am not sure the addition of this
> > > CheckpointTrigger
> > > > > > should
> > > > > > > > be
> > > > > > > > > > >>>> coupled
> > > > > > > > > > >>>>> with FLIP-309. Whether or not it is coupled
> probably
> > > > > depends
> > > > > > on
> > > > > > > > the
> > > > > > > > > > >>>>> concrete API design around CheckpointTrigger.
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> If you are adamant that the backpressure monitoring doesn't cover your use
> > > > > > > > > > > >>>>>> case well enough, I would be OK with providing the hacky version that I
> > > > > > > > > > > >>>>>> also mentioned before:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>> """
> > > > > > > > > > > >>>>>> Especially since, if my proposed algorithm didn't work well enough, there
> > > > > > > > > > > >>>>>> is an obvious solution: any source could add a metric, say
> > > > > > > > > > > >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` could use this
> > > > > > > > > > > >>>>>> as an override to always switch to the "slowCheckpointInterval". I don't
> > > > > > > > > > > >>>>>> think we need it, but that's always an option that would be basically
> > > > > > > > > > > >>>>>> equivalent to your original proposal.
> > > > > > > > > > >>>>>> """
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> > > > > > > > > > >>>>> - Have all source operators emit a metric named
> > > > > > > > "processingBacklog".
> > > > > > > > > > >>>>> - Add a job-level config that specifies "the
> > > > checkpointing
> > > > > > > > interval to
> > > > > > > > > > >> be
> > > > > > > > > > >>>>> used when any source is processing backlog".
> > > > > > > > > > >>>>> - The JM collects the "processingBacklog"
> > periodically
> > > > from
> > > > > > all
> > > > > > > > source
> > > > > > > > > > >>>>> operators and uses the newly added config value as
> > > > > > appropriate.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> The challenge with this approach is that we need to
> > > > define
> > > > > > the
> > > > > > > > > > >> semantics
> > > > > > > > > > >>>> of
> > > > > > > > > > >>>>> this "processingBacklog" metric and have all source
> > > > > operators
> > > > > > > > > > >>>>> implement this metric. I am not sure we are able to
> > do
> > > > this
> > > > > > yet
> > > > > > > > without
> > > > > > > > > > >>>>> having users explicitly provide this information
> on a
> > > > > > > per-source
> > > > > > > > basis.
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose the job reads from a bounded Kafka source. Should it emit
> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use the long
> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to process data from now
> > > > > > > > > > > >>>>> until one hour from now. If no, then the job might use the short
> > > > > > > > > > > >>>>> checkpointing interval even if the job is asked to re-process data starting
> > > > > > > > > > > >>>>> from 7 days ago.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> - How can users specify the
> > > > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> > > > > > > > > > >>>>>>> For example, will we provide APIs on the
> > > > > CheckpointTrigger
> > > > > > > that
> > > > > > > > > > >>>>> end-users
> > > > > > > > > > >>>>>>> can use to specify the checkpointing interval?
> What
> > > > would
> > > > > > > that
> > > > > > > > look
> > > > > > > > > > >>>>> like?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Also as I mentioned before, just like metric
> > reporters
> > > > are
> > > > > > > > configured:
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>
> > > > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > > > > > > > > >>>>>> Every CheckpointTrigger could have its own custom
> > > > > > > configuration.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
> > approach
> > > > > based
> > > > > > > on
> > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Yes, as usual, more generic things are more
> > > complicated,
> > > > > but
> > > > > > > > often
> > > > > > > > > > >> more
> > > > > > > > > > >>>>>> useful in the long run.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> and harder to use.
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I don't agree. Why is setting this in the config:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
> > > > > > > > > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
> > > > > > > > > > > >>>>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> or even this shortcut to the above construct, which we could provide:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s
> > > > > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> harder than setting two/three checkpoint intervals, one in the config or
> > > > > > > > > > > >>>>>> via `env.enableCheckpointing(x)`, and then passing one/two (fast/slow)
> > > > > > > > > > > >>>>>> values on the source itself?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> If we can address the use-case by providing just
> the
> > > two
> > > > > > > > job-level
> > > > > > > > > > >> config
> > > > > > > > > > >>>>> as described above, I agree it will indeed be
> > simpler.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I have tried to achieve this goal. But the caveat
> is
> > > that
> > > > > it
> > > > > > > > requires
> > > > > > > > > > >>>> much
> > > > > > > > > > >>>>> more work than described above in order to give the
> > > > configs
> > > > > > > > > > >> well-defined
> > > > > > > > > > >>>>> semantics. So I find it simpler to just use the
> > > approach
> > > > in
> > > > > > > > FLIP-309.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Let me explain my concern below. It will be great
> if
> > > you
> > > > or
> > > > > > > > someone
> > > > > > > > > > >> else
> > > > > > > > > > >>>>> can help provide a solution.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> 1) We need to clearly document when the
> fast-interval
> > > and
> > > > > > > > slow-interval
> > > > > > > > > > >>>>> will be used so that users can derive the expected
> > > > behavior
> > > > > > of
> > > > > > > > the job
> > > > > > > > > > >>>> and
> > > > > > > > > > >>>>> be able to config these values.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> 2) The trigger of fast/slow interval depends on the
> > > > > behavior
> > > > > > of
> > > > > > > > the
> > > > > > > > > > >>>> source
> > > > > > > > > > >>>>> (e.g. MySQL CDC, HybridSource). However, no
> existing
> > > > > concepts
> > > > > > > of
> > > > > > > > source
> > > > > > > > > > >>>>> operator (e.g. boundedness) can describe the target
> > > > > behavior.
> > > > > > > For
> > > > > > > > > > >>>> example,
> > > > > > > > > > >>>>> MySQL CDC internally has two phases, namely
> snapshot
> > > > phase
> > > > > > and
> > > > > > > > binlog
> > > > > > > > > > >>>>> phase, which are not explicitly exposed to its
> users
> > > via
> > > > > > source
> > > > > > > > > > >> operator
> > > > > > > > > > >>>>> API. And we probably should not enumerate all
> > internal
> > > > > phases
> > > > > > > of
> > > > > > > > all
> > > > > > > > > > >>>> source
> > > > > > > > > > >>>>> operators that are affected by fast/slow interval.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> 3) An alternative approach might be to define a new
> > > > concept
> > > > > > > (e.g.
> > > > > > > > > > >>>>> processingBacklog) that is applied to all source
> > > > operators.
> > > > > > > Then
> > > > > > > > the
> > > > > > > > > > >>>>> fast/slow interval's documentation can depend on
> this
> > > > > > concept.
> > > > > > > > That
> > > > > > > > > > >> means
> > > > > > > > > > >>>>> we have to add a top-level concept (similar to
> source
> > > > > > > > boundedness) and
> > > > > > > > > > >>>>> require all source operators to specify how they
> > > enforce
> > > > > this
> > > > > > > > concept
> > > > > > > > > > >>>> (e.g.
> > > > > > > > > > >>>>> FileSystemSource always emits
> > processingBacklog=true).
> > > > And
> > > > > > > there
> > > > > > > > might
> > > > > > > > > > >> be
> > > > > > > > > > >>>>> cases where the source itself (e.g. a bounded Kafka
> > > > Source)
> > > > > > can
> > > > > > > > not
> > > > > > > > > > >>>>> automatically derive the value of this concept, in
> > > which
> > > > > case
> > > > > > > we
> > > > > > > > need
> > > > > > > > > > >> to
> > > > > > > > > > >>>>> provide option for users to explicitly specify the
> > > value
> > > > > for
> > > > > > > this
> > > > > > > > > > >> concept
> > > > > > > > > > >>>>> on a per-source basis.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>>> And it probably also has the issues of "having
> two
> > > > places
> > > > > > to
> > > > > > > > > > >>>> configure
> > > > > > > > > > >>>>>> checkpointing
> > > > > > > > > > >>>>>>> interval" and "giving flexibility for every
> source
> > to
> > > > > > > > implement a
> > > > > > > > > > >>>>>> different
> > > > > > > > > > >>>>>>> API" (as mentioned below).
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> No, it doesn't.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> IMO, it is a hard-requirement for the user-facing
> > API
> > > > to
> > > > > be
> > > > > > > > > > >>>>>>> clearly defined and users should be able to use
> the
> > > API
> > > > > > > without
> > > > > > > > > > >>>> concern
> > > > > > > > > > >>>>>> of
> > > > > > > > > > >>>>>>> regression. And this requirement is more
> important
> > > than
> > > > > the
> > > > > > > > other
> > > > > > > > > > >>>> goals
> > > > > > > > > > >>>>>>> discussed above because it is related to the
> > > > > > > > stability/performance of
> > > > > > > > > > >>>>> the
> > > > > > > > > > >>>>>>> production job. What do you think?
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I don't agree with this. There are many things that work somewhere between
> > > > > > > > > > > >>>>>> perfectly and well enough in some fraction of use cases (maybe 99%, maybe
> > > > > > > > > > > >>>>>> 95%, or maybe 60%), while still being very useful. Good examples are the
> > > > > > > > > > > >>>>>> selection of the state backend, unaligned checkpoints, and buffer
> > > > > > > > > > > >>>>>> debloating. Frankly, if I go through the list of currently available config
> > > > > > > > > > > >>>>>> options, something like half of them can cause regressions. Heck, even
> > > > > > > > > > > >>>>>> Flink itself doesn't work perfectly in 100% of the use cases, due to a
> > > > > > > > > > > >>>>>> variety of design choices. Of course, the more use cases are fine with said
> > > > > > > > > > > >>>>>> feature, the better, but we shouldn't fixate on perfectly covering 100% of
> > > > > > > > > > > >>>>>> the cases, as that's impossible.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> In this particular case, if back pressure
> monitoring
> > > > > > trigger
> > > > > > > > can work
> > > > > > > > > > >>>>> well
> > > > > > > > > > >>>>>> enough in 95% of cases, I would
> > > > > > > > > > >>>>>> say that's already better than the originally
> > proposed
> > > > > > > > alternative,
> > > > > > > > > > >>>> which
> > > > > > > > > > >>>>>> doesn't work at all if user has a large
> > > > > > > > > > >>>>>> backlog to reprocess from Kafka, including when
> > using
> > > > > > > > HybridSource
> > > > > > > > > > >>>> AFTER
> > > > > > > > > > >>>>>> the switch to Kafka has
> > > > > > > > > > >>>>>> happened. For the remaining 5%, we should try to
> > > improve
> > > > > the
> > > > > > > > behaviour
> > > > > > > > > > >>>>> over
> > > > > > > > > > >>>>>> time, but ultimately, users can
> > > > > > > > > > >>>>>> decide to just run a fixed checkpoint interval (or
> > at
> > > > > worst
> > > > > > > use
> > > > > > > > the
> > > > > > > > > > >>>> hacky
> > > > > > > > > > >>>>>> checkpoint trigger that I mentioned
> > > > > > > > > > >>>>>> before a couple of times).
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Also, to be pedantic, if a user naively sets the slow-interval in your
> > > > > > > > > > > >>>>>> proposal to 30 minutes while that user's job fails on average every 15-20
> > > > > > > > > > > >>>>>> minutes, the job can end up in a state in which it cannot make any
> > > > > > > > > > > >>>>>> progress. This is arguably quite a serious regression.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I probably should not say it is "hard requirement".
> > > After
> > > > > all
> > > > > > > > there are
> > > > > > > > > > >>>>> pros/cons. We will need to consider implementation
> > > > > > complexity,
> > > > > > > > > > >> usability,
> > > > > > > > > > >>>>> extensibility etc.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I just don't think we should take it for granted to
> > > > > introduce
> > > > > > > > > > >> regression
> > > > > > > > > > >>>>> for one use-case in order to support another
> > use-case.
> > > If
> > > > > we
> > > > > > > can
> > > > > > > > not
> > > > > > > > > > >> find
> > > > > > > > > > >>>>> an algorithm/solution that addresses
> > > > > > > > > > >>>>> both use-case well, I hope we can be open to tackle
> > > them
> > > > > > > > separately so
> > > > > > > > > > >>>> that
> > > > > > > > > > >>>>> users can choose the option that best fits their
> > needs.
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> All else being equal, I think it is preferable for the user-facing API to
> > > > > > > > > > > >>>>> be clearly defined, and users should be able to use the API without concern
> > > > > > > > > > > >>>>> of regression.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Maybe we can list pros/cons for the alternative approaches we have been
> > > > > > > > > > > >>>>> discussing and then choose the best approach. And maybe we will end up
> > > > > > > > > > > >>>>> finding that the use-case which needs CheckpointTrigger can be tackled
> > > > > > > > > > > >>>>> separately from the use-case in FLIP-309.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond = 0,
> > > > > > > > > > > >>>>>>> then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 *
> > > > > > > > > > > >>>>>>> metricsUpdateInterval according to the above algorithm.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond
> > > > > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> It looks like there is indeed a mistake in my proposal above. Yours looks
> > > > > > > > > > > >>>>>> more correct. It probably still needs some safeguard/special handling if
> > > > > > > > > > > >>>>>> `backPressuredTimeMsPerSecond > 950`.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> The only information it can access is the
> backlog.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Again no. It can access whatever we want to
> provide
> > to
> > > > it.
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Regarding the rest of your concerns: it's a matter of tweaking the
> > > > > > > > > > > >>>>>> parameters and the algorithm itself, and of how much of a safety net we
> > > > > > > > > > > >>>>>> want to have. Ultimately, I'm pretty sure that's a solvable problem (for
> > > > > > > > > > > >>>>>> 95-99% of cases). If not, there is always the hacky solution, which could
> > > > > > > > > > > >>>>>> even be integrated into the above-mentioned algorithm as a short circuit to
> > > > > > > > > > > >>>>>> always reach `slow-interval`.
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Apart from that, you picked 3 minutes as the checkpointing interval
> > > > > > > > > > > >>>>>> in your counter example. In most cases any interval above 1 minute
> > > > > > > > > > > >>>>>> would inflict pretty negligible overheads, so all in all, I doubt
> > > > > > > > > > > >>>>>> there is a significant benefit (in most cases) in increasing a
> > > > > > > > > > > >>>>>> 3 minute checkpoint interval to anything more, let alone 30 minutes.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I am not sure we should design the algorithm with the assumption
> > > > > > > > > > > >>>>> that the short checkpointing interval will always be higher than
> > > > > > > > > > > >>>>> 1 minute etc.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I agree the proposed algorithm can solve most cases where the
> > > > > > > > > > > >>>>> resource is sufficient and there is never any backlog in the source
> > > > > > > > > > > >>>>> subtasks. On the other hand, what makes SRE life hard is probably
> > > > > > > > > > > >>>>> the remaining 1-5% of cases where the traffic is spiky and the
> > > > > > > > > > > >>>>> cluster is reaching its capacity limit. The ability to predict and
> > > > > > > > > > > >>>>> control a Flink job's behavior (including the checkpointing
> > > > > > > > > > > >>>>> interval) can considerably reduce the burden of managing Flink jobs.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Best,
> > > > > > > > > > >>>>> Dong
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Best,
> > > > > > > > > > >>>>>> Piotrek
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> On Sat, 3 Jun 2023 at 05:44, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> Hi Piotr,
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Thanks for the explanations. I have some followup
> > > > > questions
> > > > > > > > below.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > > > > > > > > >>>>>>> wrote:
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>> Hi All,
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Thanks for chipping in the discussion Ahmed!
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Regarding using the REST API. Currently I'm leaning towards
> > > > > > > > > > > >>>>>>>> implementing this feature inside Flink itself, via some pluggable
> > > > > > > > > > > >>>>>>>> interface. A REST API solution would be tempting, but I guess not
> > > > > > > > > > > >>>>>>>> everyone is using the Flink Kubernetes Operator.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> @Dong
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> I am not sure metrics such as isBackPressured are already sent to JM.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Fetching code path on the JM:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Example code path accessing Task level metrics via JM using the
> > > > > > > > > > > >>>>>>>> `MetricStore`:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Thanks for the code reference. I checked the code that invokes
> > > > > > > > > > > >>>>>>> these two classes and found the following information:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> - AggregatingSubtasksMetricsHandler#getStores is currently invoked
> > > > > > > > > > > >>>>>>> only when AggregatingJobsMetricsHandler is invoked.
> > > > > > > > > > > >>>>>>> - AggregatingJobsMetricsHandler is only instantiated and returned
> > > > > > > > > > > >>>>>>> by WebMonitorEndpoint#initializeHandlers.
> > > > > > > > > > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by
> > > > > > > > > > > >>>>>>> RestServerEndpoint, and RestServerEndpoint invokes these handlers
> > > > > > > > > > > >>>>>>> in response to external REST requests.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I understand that JM will get the backpressure-related metrics
> > > > > > > > > > > >>>>>>> every time the RestServerEndpoint receives a REST request to get
> > > > > > > > > > > >>>>>>> these metrics. But I am not sure whether RestServerEndpoint
> > > > > > > > > > > >>>>>>> already receives such REST requests at a regular interval (suppose
> > > > > > > > > > > >>>>>>> no human is manually opening/clicking the Flink Web UI). And if it
> > > > > > > > > > > >>>>>>> does, what is the interval?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>> For example, let's say every source operator subtask reports
> > > > > > > > > > > >>>>>>>>> this metric to JM once every 10 seconds. There are 100 source
> > > > > > > > > > > >>>>>>>>> subtasks. And each subtask is backpressured roughly 10% of the
> > > > > > > > > > > >>>>>>>>> total time due to traffic spikes (and limited buffer). Then at
> > > > > > > > > > > >>>>>>>>> any given time, there is a 1 - 0.9^100 = 99.997% chance that at
> > > > > > > > > > > >>>>>>>>> least one subtask is backpressured. Then we have to wait for at
> > > > > > > > > > > >>>>>>>>> least 10 seconds to check again.
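The 99.997% figure quoted above can be checked with a few lines of Java. This is only a sketch: it assumes each subtask is backpressured independently of the others, and the class and method names are made up for illustration (they are not part of Flink).

```java
/** Illustrative only: names are hypothetical and not part of Flink. */
final class BackpressureOdds {

    /**
     * Probability that at least one of {@code subtasks} is backpressured at a
     * given instant, assuming each subtask is independently backpressured for
     * {@code backpressuredFraction} of the time.
     */
    static double probabilityAnyBackpressured(double backpressuredFraction, int subtasks) {
        return 1.0 - Math.pow(1.0 - backpressuredFraction, subtasks);
    }

    public static void main(String[] args) {
        // 100 source subtasks, each backpressured roughly 10% of the time.
        double p = probabilityAnyBackpressured(0.10, 100);
        System.out.printf("P(at least one backpressured) = %.3f%%%n", 100 * p);
    }
}
```

With a boolean "is any subtask backpressured" signal the answer is therefore almost always "yes", which is the concern raised in the quoted paragraph.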
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond and other related metrics (like
> > > > > > > > > > > >>>>>>>> busyTimeMsPerSecond) are not subject to that problem. They are
> > > > > > > > > > > >>>>>>>> recalculated once every metric fetching interval, and they report
> > > > > > > > > > > >>>>>>>> accurately how much time, on average, the given subtask spent
> > > > > > > > > > > >>>>>>>> busy/idling/backpressured. In your example,
> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond would report 100ms/s.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Suppose every subtask is already reporting
> > > > > > > > > > > >>>>>>> backPressuredTimeMsPerSecond to JM once every 100 ms. If a job has
> > > > > > > > > > > >>>>>>> 10 operators (that are not chained) and each operator has 100
> > > > > > > > > > > >>>>>>> subtasks, then JM would need to handle 10000 requests per second
> > > > > > > > > > > >>>>>>> to receive metrics from these 1000 subtasks. It seems like a
> > > > > > > > > > > >>>>>>> non-trivial overhead for medium-to-large sized jobs and can make
> > > > > > > > > > > >>>>>>> JM the performance bottleneck during job execution.
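The request-rate estimate above is simple arithmetic; a small sketch makes the scaling explicit. The class and method names are hypothetical, and the model assumes one metric report per subtask per reporting interval, which is the premise of the quoted paragraph rather than a statement about how Flink actually transports metrics.

```java
/** Illustrative only: a back-of-the-envelope model, not Flink code. */
final class MetricReportLoad {

    /** Reports per second arriving at the JM if every subtask reports once per interval. */
    static double requestsPerSecond(int operators, int subtasksPerOperator, long reportIntervalMs) {
        long subtasks = (long) operators * subtasksPerOperator;
        return subtasks * 1000.0 / reportIntervalMs;
    }

    public static void main(String[] args) {
        // 10 unchained operators x 100 subtasks, reporting every 100 ms.
        System.out.println(requestsPerSecond(10, 100, 100));
        // Same job, reporting every 10 seconds instead.
        System.out.println(requestsPerSecond(10, 100, 10_000));
    }
}
```

Under this model the load grows linearly with the number of subtasks and inversely with the reporting interval, so moving from 10 s to 100 ms multiplies it by 100.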
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying this much overhead
> > > > > > > > > > > >>>>>>> just for metrics monitoring. That is the main reason I still doubt
> > > > > > > > > > > >>>>>>> it is true. Can you show where this 100 ms is currently configured?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra code to
> > > > > > > > > > > >>>>>>> invoke the REST API at a 100 ms interval. That would considerably
> > > > > > > > > > > >>>>>>> increase the network/CPU overhead at JM, and the overhead would
> > > > > > > > > > > >>>>>>> grow with the number of TMs/slots, which may pose a risk to the
> > > > > > > > > > > >>>>>>> scalability of the proposed design. I am not sure we should do
> > > > > > > > > > > >>>>>>> this. What do you think?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> While it will be nice to support additional use-cases with one
> > > > > > > > > > > >>>>>>>>> proposal, it is probably also reasonable to make incremental
> > > > > > > > > > > >>>>>>>>> progress and support the low-hanging-fruit use-case first. The
> > > > > > > > > > > >>>>>>>>> choice really depends on the complexity and the importance of
> > > > > > > > > > > >>>>>>>>> supporting the extra use-cases.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> That would be true if that was a private implementation detail,
> > > > > > > > > > > >>>>>>>> or if the low-hanging-fruit solution were on the direct path to
> > > > > > > > > > > >>>>>>>> the final solution. That's unfortunately not the case here. This
> > > > > > > > > > > >>>>>>>> will add a public-facing API that we will later need to maintain,
> > > > > > > > > > > >>>>>>>> no matter what the final solution will be, and at the moment at
> > > > > > > > > > > >>>>>>>> least I don't see it being related to a "perfect" solution.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Sure. Then let's decide the final solution first.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>> I guess the point is that the suggested approach, which
> > > > > > > > > > > >>>>>>>>> dynamically determines the checkpointing interval based on the
> > > > > > > > > > > >>>>>>>>> backpressure, may cause regression when the checkpointing
> > > > > > > > > > > >>>>>>>>> interval is relatively low. This makes it hard for users to
> > > > > > > > > > > >>>>>>>>> enable this feature in production. It is like an auto-driving
> > > > > > > > > > > >>>>>>>>> system that is not guaranteed to work
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Yes, creating a more generic solution that would require less
> > > > > > > > > > > >>>>>>>> configuration is usually more difficult than static
> > > > > > > > > > > >>>>>>>> configurations. It doesn't mean we shouldn't try to do that.
> > > > > > > > > > > >>>>>>>> Especially since, if my proposed algorithm wouldn't work well
> > > > > > > > > > > >>>>>>>> enough, there is an obvious solution: any source could add a
> > > > > > > > > > > >>>>>>>> metric, like let's say "processingBacklog: true/false", and the
> > > > > > > > > > > >>>>>>>> `CheckpointTrigger` could use this as an override to always
> > > > > > > > > > > >>>>>>>> switch to the "slowCheckpointInterval". I don't think we need it,
> > > > > > > > > > > >>>>>>>> but that's always an option that would be basically equivalent to
> > > > > > > > > > > >>>>>>>> your original proposal. Or a source could even add
> > > > > > > > > > > >>>>>>>> "suggestedCheckpointInterval : int", and `CheckpointTrigger`
> > > > > > > > > > > >>>>>>>> could use that value, if present, as a hint in one way or another.
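To make the override idea above a bit more concrete, here is a purely hypothetical sketch. No `CheckpointTrigger` interface is defined anywhere in this thread, so the class, the method signature and the way the hint is applied (clamped between the fast and slow intervals) are all assumptions for illustration, not a proposed API.

```java
import java.util.OptionalLong;

/** Hypothetical sketch only: no such class or interface exists in Flink. */
final class HintedCheckpointTrigger {

    /**
     * Picks the next checkpoint interval in milliseconds.
     *
     * @param processingBacklog source-reported flag; true forces the slow interval
     * @param suggestedIntervalMs optional source-provided hint
     * @param calculatedIntervalMs interval derived from metrics by some algorithm
     */
    static long chooseIntervalMs(
            boolean processingBacklog,
            OptionalLong suggestedIntervalMs,
            long calculatedIntervalMs,
            long fastCheckpointIntervalMs,
            long slowCheckpointIntervalMs) {
        if (processingBacklog) {
            // Short circuit: the source says it is catching up on a backlog.
            return slowCheckpointIntervalMs;
        }
        // Assumption: a present hint replaces the calculated value, but is still clamped.
        long candidate = suggestedIntervalMs.orElse(calculatedIntervalMs);
        return Math.min(Math.max(fastCheckpointIntervalMs, candidate), slowCheckpointIntervalMs);
    }
}
```

For example, `chooseIntervalMs(true, OptionalLong.empty(), 0L, 180_000L, 1_800_000L)` returns the slow interval regardless of the calculated value.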
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> So far we have talked about the possibility of using
> > > > > > > > > > > >>>>>>> CheckpointTrigger and mentioned that the CheckpointTrigger can
> > > > > > > > > > > >>>>>>> read metric values.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Can you help answer the following questions so that I can
> > > > > > > > > > > >>>>>>> understand the alternative solution more concretely:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For example,
> > > > > > > > > > > >>>>>>> are we going to give CheckpointTrigger a context that it can use
> > > > > > > > > > > >>>>>>> to fetch arbitrary metric values? This can help us understand what
> > > > > > > > > > > >>>>>>> information this user-defined CheckpointTrigger can use to make
> > > > > > > > > > > >>>>>>> the checkpoint decision.
> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, is it
> > > > > > > > > > > >>>>>>> going to run on the subtask of every source operator? Or is it
> > > > > > > > > > > >>>>>>> going to run on the JM?
> > > > > > > > > > > >>>>>>> - Are we going to provide a default implementation of this
> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the algorithm described
> > > > > > > > > > > >>>>>>> below, or do we expect each source operator developer to implement
> > > > > > > > > > > >>>>>>> their own CheckpointTrigger?
> > > > > > > > > > > >>>>>>> - How can users specify the
> > > > > > > > > > > >>>>>>> fastCheckpointInterval/slowCheckpointInterval? For example, will
> > > > > > > > > > > >>>>>>> we provide APIs on the CheckpointTrigger that end-users can use to
> > > > > > > > > > > >>>>>>> specify the checkpointing interval? What would that look like?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative approach based on
> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated and harder to use. And it
> > > > > > > > > > > >>>>>>> probably also has the issues of "having two places to configure
> > > > > > > > > > > >>>>>>> checkpointing interval" and "giving flexibility for every source
> > > > > > > > > > > >>>>>>> to implement a different API" (as mentioned below).
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Maybe we can evaluate it more after knowing the answers to the
> > > > > > > > > > > >>>>>>> above questions.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> On the other hand, the approach currently proposed in the FLIP
> > > > > > > > > > > >>>>>>>>> is much simpler as it does not depend on backpressure. Users
> > > > > > > > > > > >>>>>>>>> specify the extra interval requirement on the specific sources
> > > > > > > > > > > >>>>>>>>> (e.g. HybridSource, MySQL CDC Source) and can easily know which
> > > > > > > > > > > >>>>>>>>> checkpointing interval will be used in the continuous phase of
> > > > > > > > > > > >>>>>>>>> the corresponding source. This is pretty much the same as how
> > > > > > > > > > > >>>>>>>>> users use the existing execution.checkpointing.interval config.
> > > > > > > > > > > >>>>>>>>> So there is no extra concern of regression caused by this
> > > > > > > > > > > >>>>>>>>> approach.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> To an extent, but as I have already mentioned, I really, really
> > > > > > > > > > > >>>>>>>> do not like the idea of:
> > > > > > > > > > > >>>>>>>> - having two places to configure the checkpointing interval
> > > > > > > > > > > >>>>>>>> (config file and in the Source builders)
> > > > > > > > > > > >>>>>>>> - giving flexibility for every source to implement a different
> > > > > > > > > > > >>>>>>>> API for that purpose
> > > > > > > > > > > >>>>>>>> - creating a solution that is not generic enough, so that we will
> > > > > > > > > > > >>>>>>>> need a completely different mechanism in the future anyway
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Yeah, I understand different developers might have different
> > > > > > > > > > > >>>>>>> concerns/tastes for these APIs. Ultimately, there might not be a
> > > > > > > > > > > >>>>>>> perfect solution and we have to choose based on the pros/cons of
> > > > > > > > > > > >>>>>>> these solutions.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I agree with you that, all things being equal, it is preferable to
> > > > > > > > > > > >>>>>>> 1) have one place to configure checkpointing intervals, 2) have
> > > > > > > > > > > >>>>>>> all source operators use the same API, and 3) create a solution
> > > > > > > > > > > >>>>>>> that is generic and long lasting. Note that these three goals
> > > > > > > > > > > >>>>>>> affect the usability and extensibility of the API, but not
> > > > > > > > > > > >>>>>>> necessarily the stability/performance of the production job.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> BTW, there are also other preferable goals. For example, it is
> > > > > > > > > > > >>>>>>> very useful for the job's behavior to be predictable and
> > > > > > > > > > > >>>>>>> interpretable so that SREs can operate/debug the Flink job more
> > > > > > > > > > > >>>>>>> easily. We can list these pros/cons altogether later.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I am wondering if we can first agree on the priority of goals we
> > > > > > > > > > > >>>>>>> want to achieve. IMO, it is a hard requirement for the user-facing
> > > > > > > > > > > >>>>>>> API to be clearly defined, and users should be able to use the API
> > > > > > > > > > > >>>>>>> without concern of regression. And this requirement is more
> > > > > > > > > > > >>>>>>> important than the other goals discussed above because it is
> > > > > > > > > > > >>>>>>> related to the stability/performance of the production job. What
> > > > > > > > > > > >>>>>>> do you think?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>> Sounds good. Looking forward to learning more
> > > ideas.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> I have thought about this a bit more, and I think we don't need
> > > > > > > > > > > >>>>>>>> to check the backpressure status, or how overloaded all of the
> > > > > > > > > > > >>>>>>>> operators are. We could just check three things for source
> > > > > > > > > > > >>>>>>>> operators:
> > > > > > > > > > >>>>>>>> 1. pendingRecords (backlog length)
> > > > > > > > > > >>>>>>>> 2. numRecordsInPerSecond
> > > > > > > > > > >>>>>>>> 3. backPressuredTimeMsPerSecond
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> // int metricsUpdateInterval = 10s // obtained from config
> > > > > > > > > > > >>>>>>>> // Next line calculates how many records can we consume from the
> > > > > > > > > > > >>>>>>>> // backlog, assuming that magically the reason behind a
> > > > > > > > > > > >>>>>>>> // backpressure vanishes. We will use this only as a safeguard
> > > > > > > > > > > >>>>>>>> // against scenarios like for example if backpressure was caused
> > > > > > > > > > > >>>>>>>> // by some intermittent failure/performance degradation.
> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
> > > > > > > > > > > >>>>>>>> (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
> > > > > > > > > > > >>>>>>> backPressuredTimeMsPerSecond = 0, then
> > > > > > > > > > > >>>>>>> maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> > > > > > > > > > > >>>>>>> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) *
> > > > > > > > > > > >>>>>>> metricsUpdateInterval"?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the
> > > > > > > > > > > >>>>>>>> // backlog as a safeguard against intermittent backpressure
> > > > > > > > > > > >>>>>>>> // problems, so that we don't calculate next checkpoint interval
> > > > > > > > > > > >>>>>>>> // far far in the future, while the backpressure goes away before
> > > > > > > > > > > >>>>>>>> // we will recalculate metrics and new checkpointing interval
> > > > > > > > > > > >>>>>>>> timeToConsumeBacklog = (pendingRecords -
> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Then we can use those numbers to calculate the desired checkpoint
> > > > > > > > > > > >>>>>>>> interval, for example like this:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> // this may need some refining
> > > > > > > > > > > >>>>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
> > > > > > > > > > > >>>>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
> > > > > > > > > > > >>>>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
> > > > > > > > > > > >>>>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> WDYT?
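For readers who want to experiment with the numbers, the pseudocode above can be written out as a small Java sketch. It uses the `1 - backPressuredTimeMsPerSecond / 1000` form of the denominator that was suggested in the reply and acknowledged as looking more correct. Everything else is an assumption made for illustration: the class and method names, the choice of capping the backpressured time at 950 ms/s as the "safeguard", and the fallback to the fast interval when no throughput is measured.

```java
/** Illustrative only: not a Flink API; all names are hypothetical. */
final class BacklogCheckpointInterval {

    /** Assumed safeguard: treat anything above 950 ms/s as 950 ms/s. */
    static final double MAX_BACKPRESSURED_MS_PER_SECOND = 950.0;

    /** Records that could be consumed in one metrics window if backpressure vanished. */
    static double maxRecordsConsumedWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds) {
        double cappedMs = Math.min(backPressuredTimeMsPerSecond, MAX_BACKPRESSURED_MS_PER_SECOND);
        double unblockedFraction = 1.0 - cappedMs / 1000.0;
        return numRecordsInPerSecond / unblockedFraction * metricsUpdateIntervalSeconds;
    }

    /** Next checkpoint interval in milliseconds, clamped to [fast, slow]. */
    static long nextCheckpointIntervalMs(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds,
            long fastCheckpointIntervalMs,
            long slowCheckpointIntervalMs) {
        if (numRecordsInPerSecond <= 0) {
            // Assumption: with no measurable throughput, keep the fast interval.
            return fastCheckpointIntervalMs;
        }
        double excluded = maxRecordsConsumedWithoutBackpressure(
                numRecordsInPerSecond, backPressuredTimeMsPerSecond, metricsUpdateIntervalSeconds);
        double timeToConsumeBacklogSeconds = (pendingRecords - excluded) / numRecordsInPerSecond;
        // "/ 10" is the factor from the pseudocode above and may need refining.
        long calculatedMs = (long) (timeToConsumeBacklogSeconds * 1000.0 / 10.0);
        return Math.min(Math.max(fastCheckpointIntervalMs, calculatedMs), slowCheckpointIntervalMs);
    }
}
```

With a 3 minute fast interval and a 30 minute slow interval, a source reading 1000 records/s with ten hours of backlog gets the slow interval, while one with 30 minutes of backlog or less gets the fast one.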
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I think the idea of the above algorithm is to lean towards using
> > > > > > > > > > > >>>>>>> the fastCheckpointInterval unless we are very sure the backlog
> > > > > > > > > > > >>>>>>> will take a long time to process. This can alleviate the concern
> > > > > > > > > > > >>>>>>> of regression during the continuous_unbounded phase since we are
> > > > > > > > > > > >>>>>>> more likely to use the fastCheckpointInterval. However, it can
> > > > > > > > > > > >>>>>>> cause regression during the bounded phase.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I will use a concrete example to explain the risk of regression:
> > > > > > > > > > > >>>>>>> - The user is using HybridSource to read from HDFS followed by
> > > > > > > > > > > >>>>>>> Kafka. The data in HDFS is old and there is no need for data
> > > > > > > > > > > >>>>>>> freshness for the data in HDFS.
> > > > > > > > > > > >>>>>>> - The user configures the job as below:
> > > > > > > > > > > >>>>>>> - fastCheckpointInterval = 3 minutes
> > > > > > > > > > > >>>>>>> - slowCheckpointInterval = 30 minutes
> > > > > > > > > > > >>>>>>> - metricsUpdateInterval = 100 ms
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Using the above formula, we can know that once pendingRecords
> > > > > > > > > > > >>>>>>> <= numRecordsInPerSecond * 30-minutes, then
> > > > > > > > > > > >>>>>>> calculatedCheckpointInterval <= 3 minutes, meaning that we will
> > > > > > > > > > > >>>>>>> use fastCheckpointInterval as the checkpointing interval. Then in
> > > > > > > > > > > >>>>>>> the last 30 minutes of the bounded phase, the checkpointing
> > > > > > > > > > > >>>>>>> frequency will be 10X higher than what the user wants.
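The threshold in this example can be reproduced with a simplified sketch of the clamp. It works in minutes and ignores the `maxRecordsConsumedWithoutBackpressure` correction, which is an assumption justified only by the 100 ms metricsUpdateInterval making that term small; the class and method names are hypothetical.

```java
/** Illustrative only: simplified clamp in minutes, not Flink code. */
final class IntervalThresholds {

    /** min(max(fast, timeToConsumeBacklog / 10), slow), all in minutes. */
    static long nextCheckpointIntervalMinutes(
            double timeToConsumeBacklogMinutes, long fastMinutes, long slowMinutes) {
        long calculated = (long) (timeToConsumeBacklogMinutes / 10.0);
        return Math.min(Math.max(fastMinutes, calculated), slowMinutes);
    }

    public static void main(String[] args) {
        // fastCheckpointInterval = 3 minutes, slowCheckpointInterval = 30 minutes.
        double[] backlogMinutes = {600, 300, 120, 30, 10};
        for (double backlog : backlogMinutes) {
            System.out.println(backlog + " min of backlog -> interval "
                    + nextCheckpointIntervalMinutes(backlog, 3, 30) + " min");
        }
    }
}
```

Once the remaining backlog is worth 30 minutes or less, the clamp returns the 3 minute interval, i.e. checkpoints are ten times more frequent than the 30 minutes the user asked for in the bounded phase.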
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Also note that the same issue would also considerably limit the
> > > > > > > > > > > >>>>>>> benefits of the algorithm. For example, during the continuous
> > > > > > > > > > > >>>>>>> phase, the algorithm will only be better than the approach in
> > > > > > > > > > > >>>>>>> FLIP-309 when there is at least 30-minutes worth of backlog in the
> > > > > > > > > > > >>>>>>> source.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Sure, having a slower checkpointing interval in this extreme case
> > > > > > > > > > > >>>>>>> (where there is a 30-minute backlog in the continuous-unbounded
> > > > > > > > > > > >>>>>>> phase) is still useful when this happens. But since this is the
> > > > > > > > > > > >>>>>>> uncommon case, and the right solution is probably to do capacity
> > > > > > > > > > > >>>>>>> planning to prevent this from happening in the first place, I am
> > > > > > > > > > > >>>>>>> not sure it is worth optimizing for this case at the cost of
> > > > > > > > > > > >>>>>>> regression in the bounded phase and reduced operational
> > > > > > > > > > > >>>>>>> predictability for users (e.g. what checkpointing interval should
> > > > > > > > > > > >>>>>>> I expect at this stage of the job).
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I think the fundamental issue with this algorithm is
> > > > > > > > > > > >>>>>>> that it is applied to both the bounded phases and the
> > > > > > > > > > > >>>>>>> continuous_unbounded phases without knowing which phase
> > > > > > > > > > > >>>>>>> the job is running in. The only information it can
> > > > > > > > > > > >>>>>>> access is the backlog. But two sources with the same
> > > > > > > > > > > >>>>>>> amount of backlog do not necessarily have the same data
> > > > > > > > > > > >>>>>>> freshness requirement.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> In this particular example, users know that the data in
> > > > > > > > > > > >>>>>>> HDFS is very old and there is no need for data
> > > > > > > > > > > >>>>>>> freshness. Users can express signals via the per-source
> > > > > > > > > > > >>>>>>> API proposed in the FLIP. This is why the current
> > > > > > > > > > > >>>>>>> approach in FLIP-309 can be better in this case.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> What do you think?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Best,
> > > > > > > > > > >>>>>>> Dong
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Best,
> > > > > > > > > > >>>>>>>> Piotrek
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
