Hi Piotr,

Please see my comments inline.

On Mon, Jul 3, 2023 at 5:19 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Dong,
>
> Starting from the end:
>
> > It seems that the only benefit of this approach is to avoid"
> > adding SplitEnumeratorContext#setIsProcessingBacklog."
>
> Yes, that's the major benefit of this counter-proposal.
>
> > In the target use-case, user still want to do checkpoint (though at a"
> > larger interval) when there is backlog. And HybridSource need to know
> the"
> > expected checkpoint interval during backlog in order to determine
> whether"
> > it should keep throwing CheckpointException. Thus, we still need to add"
> > execution.checkpointing.interval-during-backlog for user to specify this"
> > information."
> >
> > The downside of this approach is that it is hard to enforce the"
> > semantics specified by execution.checkpointing.interval-during-backlog.
> For"
> > example, suppose execution.checkpointing.interval =3 minute and"
> > execution.checkpointing.interval-during-backlog = 7 minutes. During the"
> > backlog phase, checkpoint coordinator will still trigger the checkpoint"
> > once every 3 minutes. HybridSource will need to reject 2 out of the 3"
> > checkpoint invocation, and the effective checkpoint interval will be 9"
> > minutes."
>
> Does it really matter what's the exact value of the longer interval? Can
> not we
> hard-code it to be 5x or 10x of the base checkpoint interval? If there is a
> notice
> able overhead from the base interval slowing down records processing rate,
> reducing this interval by a factor of 5x or 10x, would fix performance
> issue for
> vast majority of users. So a source could just skip 4 out of 5 or 9 out of
> 10
> checkpoints.
>

Yes, I think the exact value of the longer interval matters.

The main reason we need two intervals is for jobs which have two-phase
commit sink. The short interval typically represents the interval that a
user can accept for the two-phase commit sink to buffer data (since it can
only emit data when checkpoint is triggered). And the long interval
typically represents the maximum amount of duplicate work (in terms of
time) that a job need to re-do after failover.

Since there is no intrinsic relationship between the data buffer interval
(related to processing latency) and the failover boundary, I don't think we
can hardcode it to be 5x or 10x of the base checkpoint interval.


> Alternatively we could introduce a config option like:
>
> execution.checkpointing.long-interval
>
> that might be re-used in the future, with more fancy algorithms, but I
> don't see
> much value in doing that.


> > Overall, I think the solution is a bit hacky. I think it is preferred to"
> > throw exception only when there is indeed error. If we don't need to
> check"
> > a checkpoint, it is preferred to not trigger the checkpoint in the first"
> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog
> is"
> > probably not that much of a big deal."
>
> Yes it's hacky, but at least it doesn't require extending the Public API
> for a
> quite limited solution, that only targets one or two sources that are
> rarely used.
>

I am not sure it is fair to say MySQL CDC source is "rarely used".
ververica/flink-cdc-connectors GitHub repo has 4K + starts. Also, note that
the proposed feature can be useful for CDC sources with an internal
"backlog phase". Its usage is not limited to just the two sources mentioned
in the FLIP.


>
> ================
>
> About the idea of emitting "RecordAttributes(isBacklog=..)". I have a
> feeling that
> this is overly complicated and would require every operator/function to
> handle that.
>
> Yes it would cover even more use cases, at the cost of complicating the
> system by
> a lot. IMO it looks like something we could do if there would indeed by a
> high
> demand of such a feature, after we provide some baseline generic solution,
> that
> doesn't require any configuration.
>
> I have a feeling that by just statically looking at the shape of the job
> graph and how
> it is connected, we could deduce almost the same things.
>

Note that pretty much every FLIP will address my use-case at the cost
of complicating the system. I understand you have the feeling that this
complexity is not worthwhile. However, as we can see from the comments in
this thread and the votes in the voting thread, many
committers/developers/users actually welcome the feature introduced in this
FLIP.

I am happy to work with you together to find a more generic and simpler
solution, as long as that solution can address the target use-case without
hurting user-experience. The alternative solution which you have mentioned
so far, unfortunately, still has drawbacks as mentioned earlier.


> Also:
>
> >  - the FLIP suggests to use the long checkpointing interval as long as
> any subtask is processing the backlog. Are you sure that's the right call?
> What if other
> >  sources are producing fresh records, and those fresh records are
> reaching sinks? It could happen either with disjoint JobGraph, embarrassing
> parallel
> >  JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records can
> slip using a not backpressured input channel through generally
> backpressured
> >  keyBy exchange. How should we handle that? This problem I think will
> affect every solution, including my previously proposed generic one, but we
> should
> >  discuss how to handle that as well.
>
> By this I didn't necessarily mean that we have to solve it right now.
>
> ================
>
> > The moments above seem kind of "abstract". I am hoping to understand more
> > technical details behind these comments so that we can see how to address
> > the concern.
>
> Over the span of this discussion I think I have already explained many
> times what
> bothers me in the current proposal.
>

If I understand you correctly, your concern is that the solution is not
generic, can not address the extra use-case you want, and is too
complicated.

I think it is fair to say that no FLIP can be fully generic to address all
use-case, just like the fact that Flink is still not perfect and still need
FLIPs to improve its performance/usage. And whether an API is too
complicated really depends on whether there exists a better option.

Fairly speaking, different people can have different opinions on whether a
proposal is generic and whether it is too complicated. At least from the
comments from other developers in this thread and in the voting thread,
many developers and users actually like this current proposal. I hope you
understand that your concerns mentioned above are subjective and not
unnecessarily shared by other developers.

Honestly speaking, if we block this FLIP just because anyone thinks it can
be better (yet without any concrete proposal for making it better), I feel
it is not a good result to other developers and users who would like to
have this feature to address their existing pain points.

I am wondering if you can be a bit more lenient, consider the opinion of
other developers (not just me) who have voted, and allow us to make
incremental progress even though you might find it not meeting your
expectations in its current form?


> > For example, even if a FLP does not address all use-case
> > (which is arguably true for every FLIP), its solution does not
> necessarily
> > need to be thrown away later as long as it is extensible
>
> That's my main point. I haven't yet seen how proposals from this FLIP, that
> could
> extend FLIP-309 to cover the generic use case and:
>  - Would work out of the box, for all or majority of the properly
> implemented sources.
>

I would argue that for the target use-case mentioned in the motivation
section, it is impossible to address the use-case without any code change
from the source, and still have the same stability as the current proposal.
The reason is that when the source do not have event-time, we can not
correctly derive whether the MySQL CDC source is in the snapshot/binlog
phase by just looking at the processing time related metrics or
backpressure.

We have discussed this issue in detail in the earlier emails of this
thread. I also mentioned that I will add follow-up FLIPs to make use of the
event-time metrics and backpressure metrics to derive backlog status. But
that can not replace the capability for source to explicitly specify its
metrics.



>  - Would require zero or very minimal configuration/input from the user.
> Especially
>

Note that the current proposal only requires the user to specify one extra
config, namely execution.checkpointing.interval-during-backlog. I don't
think we are able to reduce the extra config to be zero, due to the reason
explained above (i.e. separate interval for failover and data freshness).
Therefore, the configuration from user is already minimal.


>    wouldn't require implementing some custom things in every source.
>  - Could be made to work well enough in the (vast?) majority of the use
> cases.


I think you are talking about the case of delaying checkpoint interval when
backpressure is high etc. I would argue this is a use-case not targeted by
this FLIP and it can be addressed in a follow-up FLIP.


>


> > So we probably need to understand specifically why the proposed APIs
> would be thrown away.
>
> As I have mentioned many times why that's the case:
> 1. This solution is not generic enough
> 2. I can see solutions that wouldn't require modification of every source
> 3. They would have zero overlap with the interfaces extension from this
> FLIP
>


I have the feeling our discussion is kind of in a loop, where you ask for a
solution without any change to the source (so that it is generic), I
explain why I am not able to find such a solution and the drawback of your
proposed solution, and then you repeat the same ask and insist this is
possible.

If you can find a solution that wouldn't require modification of every
source and still address the target use-case well, could you please kindly
rephrase your solution so that we can revisit it?

I assume this solution would not require extra config from users, would not
cause the job to use long checkpoint interval due to random/short traffic
spikes, and would not cause the job to use the short interval when the job
is still reading backlog data.

I would be happy to be proven wrong if you else can provide such a solution
without the aforementioned drawbacks. I just hope we don't block the FLIP
forever for a goal that no one can address.

Best,
Dong


>
> Best,
> Piotrek
>
> sob., 1 lip 2023 o 17:01 Dong Lin <lindon...@gmail.com> napisał(a):
>
> > Hi Piotr,
> >
> > Thank you for providing further suggestions to help improve the API.
> Please
> > see my comments inline.
> >
> > On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hey,
> > >
> > > Sorry for a late reply, I was OoO for a week. I have three things to
> > point
> > > out.
> > >
> > > 1. ===============
> > >
> > > The updated proposal is indeed better, but to be honest I still don't
> > like
> > > it, for mostly the same reasons that I have mentioned earlier:
> > > - only a partial solution, that doesn't address all use cases, so we
> > would
> > > need to throw it away sooner or later
> > > - I don't see and it hasn't been discussed how to make it work out of
> the
> > > box for all sources
> > > - somehow complicating API for people implementing Sources
> > > - it should work out of the box for most of the sources, or at least to
> > > have that potential in the future
> > >
> >
> > The moments above seem kind of "abstract". I am hoping to understand more
> > technical details behind these comments so that we can see how to address
> > the concern. For example, even if a FLP does not address all use-case
> > (which is arguably true for every FLIP), its solution does not
> necessarily
> > need to be thrown away later as long as it is extensible. So we probably
> > need to understand specifically why the proposed APIs would be thrown
> away.
> >
> > Similarly, we would need to understand if there is a better design to
> make
> > the API simpler and work out of the box etc. in order to decide how to
> > address these comments.
> >
> >
> > > On top of that:
> > > - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> > > CheckpointCoordinator to pass "isProcessingBacklog"
> > >
> >
> > I think it can be passed via the following function chain:
> > - CheckpointCoordinator invokes
> > OperatorCoordinatorCheckpointContext#isProcessingBacklog (via
> > coordinatorsToCheckpoint) to get this information.
> > - OperatorCoordinatorHolder implements
> > OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns
> > OperatorCoordinator#isProcessingBacklog (via coordinator)
> > - SourceCoordinator implements OperatorCoordinator#isProcessingBacklog
> and
> > returns SourceCoordinatorContext#isProcessingBacklog
> > - SourceCoordinatorContext will implement
> > SplitEnumeratorContext#setIsProcessingBacklog and stores the given
> > information in a variable.
> >
> > Note that it involves only internal API. We might be able to find a
> simpler
> > solution with less functions on the path. As long as the above solution
> > works without having any performance or correctness, I think maybe we
> > should focus on the public API design and discuss the implementation in
> the
> > PR review?
> >
> > - the FLIP suggests to use the long checkpointing interval as long as any
> > > subtask is processing the backlog. Are you sure that's the right call?
> > What
> > > if other
> > >   sources are producing fresh records, and those fresh records are
> > reaching
> > > sinks? It could happen either with disjoint JobGraph, embarrassing
> > parallel
> > >   JobGraph (no keyBy/unions/joins), or even with keyBy. Fresh records
> can
> > > slip using a not backpressured input channel through generally
> > > backpressured
> > >   keyBy exchange. How should we handle that? This problem I think will
> > > affect every solution, including my previously proposed generic one,
> but
> > we
> > > should
> > >   discuss how to handle that as well.
> > >
> >
> > Good question. Here is my plan to improve the solution in a follow-up
> FLIP:
> >
> > - Let every subtask of every source operator emit
> > RecordAttributes(isBacklog=..)
> > - Let every subtask of every operator handle the RecordAttributes
> received
> > from inputs and emit RecordAttributes to downstream operators. Flink
> > runtime can derive this information for every one-input operator. For an
> > operator with two inputs, if one input has isBacklog=true and the other
> has
> > isBacklog=false, the operator should determine the isBacklog for its
> output
> > records based on its semantics.
> > - If there exists a subtask of a two-phase commit operator with
> > isBacklog=false, the operator should let JM know so that the JM will use
> > the short checkpoint interval (for data freshness). Otherwise, JM will
> use
> > the long checkpoint interval.
> >
> > The above solution guarantees that, if every two-input operator has
> > explicitly specified their isBacklog based on the inputs' isBacklog, then
> > the JM will use the short checkpoint interval if and only if it is useful
> > for at least one subtask of one two-phase commit operator.
> >
> > Note that even the above solution might not be perfect. Suppose there
> > exists one subtask of the two-phase commit operator has isBacklog=false,
> > but every other subtasks of this operator has isBacklog=true, due to load
> > imbalance. In this case, it might be beneficial to use the long
> checkpoint
> > interval to improve the average data freshness for this operator.
> However,
> > as we get into more edge case, the solution will become more complicated
> > (e.g. providing more APIs for user to specify their intended strategy)
> and
> > there will be less additional benefits (because these scenarios are less
> > common).
> >
> > Also, note that we can support the solution described above without
> > throwing away any public API currently proposed in FLIP-309. More
> > specifically, we still
> > need execution.checkpointing.interval-during-backlog. Sources such as
> > HybridSource and MySQL CDC source can still use setIsProcessingBacklog()
> to
> > specify the backlog status. We just need to update
> setIsProcessingBacklog()
> > to emit RecordAttributes(isBacklog=..) upon invocation.
> >
> > I hope the above solution is reasonable and can address most of your
> > concerns. And I hope we can use FLIP-309 to solve a large chunk of the
> > existing problems in Flink 1.18 release and make further improvements in
> > followup FLIPs. What do you think?
> >
> >
> >
> > > 2. ===============
> > >
> > > Regarding the current proposal, there might be a way to make it
> actually
> > > somehow generic (but not pluggable). But it might require slightly
> > > different
> > > interfaces. We could keep the idea that
> SourceCoordinator/SplitEnumerator
> > > is responsible for switching between slow/fast processing modes. It
> could
> > > be
> > > implemented to achieve something like in the FLIP-309 proposal, but
> apart
> > > of that, the default behaviour would be a built in mechanism working
> like
> > > this:
> > > 1. Every SourceReaderBase checks its metrics and its state, to decide
> if
> > it
> > > considers itself as "processingBacklog" or "veryBackpressured". The
> base
> > >     implementation could do it via a similar mechanism as I was
> proposing
> > > previously, via looking at the busy/backPressuredTimeMsPerSecond,
> > >     pendingRecords and processing rate.
> > > 2. SourceReaderBase could send an event with
> > > "processingBacklog"/"veryBackpressured" state.
> > > 3. SourceCoordinator would collect those events, and decide what should
> > it
> > > do, whether it should switch whole source to the
> > >     "processingBacklog"/"veryBackpressured" state or not.
> > >
> > That could provide eventually a generic solution that works fo every
> > > source that reports the required metrics. Each source implementation
> > could
> > > decide
> > > whether to use that default behaviour, or if maybe it's better to
> > override
> > > the default, or combine default with something custom (like
> > HybridSource).
> > >
> > > And as a first step, we could implement that mechanism only on the
> > > SourceCoordinator side, without events, without the default generic
> > > solution and use
> > > it in the HybridSource/MySQL CDC.
> > >
> > > This approach has some advantages compared to my previous proposal:
> > >   + no need to tinker with metrics and pushing metrics from TMs to JM
> > >   + somehow communicating this information via Events seems a bit
> cleaner
> > > to me and avoids problems with freshness of the metrics
> > > And some issues:
> > >   - I don't know if it can be made pluggable in the future. If a user
> > could
> > > implement a custom `CheckpointTrigger` that would automatically work
> with
> > > all/most
> > >     of the pre-existing sources?
> > >   - I don't know if it can be expanded if needed in the future, to make
> > > decisions based on operators in the middle of a jobgraph.
> > >
> >
> > Thanks for the proposal. Overall, I agree it is valuable to be able to
> > determine the isProcessingBacklog based on the source reader metrics.
> >
> > I will probably suggest making the following changes upon your idea:
> > - Instead of letting the source reader send events to the source
> > coordinator, the source reader can emit RecordAttributes(isBacklog=..) as
> > described earlier. We will let two-phase commit operator to decide
> whether
> > they need the short checkpoint interval.
> > - We consider isProcessingBacklog=true when watermarkLag is larger than a
> > threshold.
> >
> > This is a nice addition. But I think we still need extra information from
> > user (e.g. the threshold whether the watermarkLag or
> > backPressuredTimeMsPerSecond is too high) with extra public APIs for this
> > feature to work reliably. This is because there is no default algorithm
> > that works in all cases without extra specification from users, due to
> the
> > issues around the default algorithm we discussed previously.
> >
> > Overall, I think the current proposal in FLIP-309 is a first step towards
> > addressing these problems. The API for source enumerator to explicitly
> set
> > isProcessingBacklog based on its status is useful even if we can support
> > metrics-based solutions.
> >
> > If that looks reasonable, can we agree to make incremental improvement
> and
> > work on the metrics-based solution in a followup FLIP?
> >
> >
> > >
> > > 3. ===============
> > >
> > > Independent of that, during some brainstorming between me, Chesnay and
> > > Stefan Richter, an idea popped up, that I think could be a counter
> > proposal
> > > as
> > > an intermediate solution that probably effectively works the same way
> as
> > > current FLIP-309.
> > >
> > > Inside a HybridSource, from it's SplitEnumerator#snapshotState method,
> > can
> > > not you throw an exception like
> > > `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or `new
> > > CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
> > > Actually we could also introduce a dedicated `CheckpointFailureReason`
> > for
> > > that purpose and handle it some special way in some places (like maybe
> > hide
> > > such rejected checkpoints from the REST API/WebUI). We could elaborate
> on
> > > this a bit more, but after a brief thinking  I could see it actually
> > > working well
> > > enough without any public facing changes. But I might be wrong here.
> > >
> > > If this feature actually grabs traction, we could expand it to
> something
> > > more sophisticated available via a public API in the future.
> > >
> >
> > In the target use-case, user still want to do checkpoint (though at a
> > larger interval) when there is backlog. And HybridSource need to know the
> > expected checkpoint interval during backlog in order to determine whether
> > it should keep throwing CheckpointException. Thus, we still need to add
> > execution.checkpointing.interval-during-backlog for user to specify this
> > information.
> >
> > It seems that the only benefit of this approach is to avoid
> > adding SplitEnumeratorContext#setIsProcessingBacklog.
> >
> > The downside of this approach is that it is hard to enforce the
> > semantics specified by execution.checkpointing.interval-during-backlog.
> For
> > example, suppose execution.checkpointing.interval =3 minute and
> > execution.checkpointing.interval-during-backlog = 7 minutes. During the
> > backlog phase, checkpoint coordinator will still trigger the checkpoint
> > once every 3 minutes. HybridSource will need to reject 2 out of the 3
> > checkpoint invocation, and the effective checkpoint interval will be 9
> > minutes.
> >
> > Overall, I think the solution is a bit hacky. I think it is preferred to
> > throw exception only when there is indeed error. If we don't need to
> check
> > a checkpoint, it is preferred to not trigger the checkpoint in the first
> > place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog
> is
> > probably not that much of a big deal.
> >
> > Thanks for all the comments. I am looking forward to your thoughts.
> >
> > Best,
> > Dong
> >
> >
> > >
> > > ===============
> > >
> > > Sorry for disturbing this FLIP discussion and voting.
> > >
> > > Best,
> > > Piotrek
> > >
> > > czw., 29 cze 2023 o 05:08 feng xiangyu <xiangyu...@gmail.com>
> > napisał(a):
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for your quick reply. I think this has truly solved our
> problem
> > > and
> > > > will enable us upgrade our existing jobs more seamless.
> > > >
> > > > Best,
> > > > Xiangyu
> > > >
> > > > Dong Lin <lindon...@gmail.com> 于2023年6月29日周四 10:50写道:
> > > >
> > > > > Hi Feng,
> > > > >
> > > > > Thanks for the feedback. Yes, you can configure the
> > > > > execution.checkpointing.interval-during-backlog to effectively
> > disable
> > > > > checkpoint during backlog.
> > > > >
> > > > > Prior to your comment, the FLIP allows users to do this by setting
> > the
> > > > > config value to something large (e.g. 365 day). After thinking
> about
> > > this
> > > > > more, we think it is more usable to allow users to achieve this
> goal
> > by
> > > > > setting the config value to 0. This is consistent with the existing
> > > > > behavior of execution.checkpointing.interval -- the checkpoint is
> > > > disabled
> > > > > if user set execution.checkpointing.interval to 0.
> > > > >
> > > > > We have updated the description of
> > > > > execution.checkpointing.interval-during-backlog
> > > > > to say the following:
> > > > > ... it is not null, the value must either be 0, which means the
> > > > checkpoint
> > > > > is disabled during backlog, or be larger than or equal to
> > > > > execution.checkpointing.interval.
> > > > >
> > > > > Does this address your need?
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu <xiangyu...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi Dong and Yunfeng,
> > > > > >
> > > > > > Thanks for the proposal, your flip sounds very useful from my
> > > > > perspective.
> > > > > > In our business, when we using hybrid source in production we
> also
> > > met
> > > > > the
> > > > > > problem described in your flip.
> > > > > > In our solution, we tend to skip making any checkpoints before
> all
> > > > batch
> > > > > > tasks have finished and resume the periodic checkpoint only in
> > > > streaming
> > > > > > phrase. Within this flip, we can solve our problem in a more
> > generic
> > > > way.
> > > > > >
> > > > > > However, I am wondering if we still want to skip making any
> > > checkpoints
> > > > > > during historical phrase, can we set this configuration
> > > > > > "execution.checkpointing.interval-during-backlog" equals "-1" to
> > > cover
> > > > > this
> > > > > > case?
> > > > > >
> > > > > > Best,
> > > > > > Xiangyu
> > > > > >
> > > > > > Hang Ruan <ruanhang1...@gmail.com> 于2023年6月28日周三 16:30写道:
> > > > > >
> > > > > > > Thanks for Dong and Yunfeng's work.
> > > > > > >
> > > > > > > The FLIP looks good to me. This new version is clearer to
> > > understand.
> > > > > > >
> > > > > > > Best,
> > > > > > > Hang
> > > > > > >
> > > > > > > Dong Lin <lindon...@gmail.com> 于2023年6月27日周二 16:53写道:
> > > > > > >
> > > > > > > > Thanks Jack, Jingsong, and Zhu for the review!
> > > > > > > >
> > > > > > > > Thanks Zhu for the suggestion. I have updated the
> configuration
> > > > name
> > > > > as
> > > > > > > > suggested.
> > > > > > > >
> > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com>
> > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving
> > this
> > > > > > > > discussion.
> > > > > > > > >
> > > > > > > > > The new design looks generally good to me. Increasing the
> > > > > checkpoint
> > > > > > > > > interval when the job is processing backlogs is easier for
> > > users
> > > > to
> > > > > > > > > understand and can help in more scenarios.
> > > > > > > > >
> > > > > > > > > I have one comment about the new configuration.
> > > > > > > > > Naming the new configuration
> > > > > > > > > "execution.checkpointing.interval-during-backlog" would be
> > > better
> > > > > > > > > according to Flink config naming convention.
> > > > > > > > > It is also because that nested config keys should be
> avoided.
> > > See
> > > > > > > > > FLINK-29372 for more details.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Zhu
> > > > > > > > >
> > > > > > > > > Jingsong Li <jingsongl...@gmail.com> 于2023年6月27日周二
> 15:45写道:
> > > > > > > > > >
> > > > > > > > > > Looks good to me!
> > > > > > > > > >
> > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and
> > design.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Jingsong
> > > > > > > > > >
> > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <
> imj...@gmail.com>
> > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Thank you Dong for driving this FLIP.
> > > > > > > > > > >
> > > > > > > > > > > The new design looks good to me!
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Jark
> > > > > > > > > > >
> > > > > > > > > > > > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道:
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you Leonard for the review!
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Piotr, do you have any comments on the latest
> > > proposal?
> > > > > > > > > > > >
> > > > > > > > > > > > I am wondering if it is OK to start the voting thread
> > > this
> > > > > > week.
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <
> > > > > xbjt...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Thanks Dong for driving this FLIP forward!
> > > > > > > > > > > >>
> > > > > > > > > > > >> Introducing  `backlog status` concept for flink job
> > > makes
> > > > > > sense
> > > > > > > to
> > > > > > > > > me as
> > > > > > > > > > > >> following reasons:
> > > > > > > > > > > >>
> > > > > > > > > > > >> From concept/API design perspective, it’s more
> general
> > > and
> > > > > > > natural
> > > > > > > > > than
> > > > > > > > > > > >> above proposals as it can be used in HybridSource
> for
> > > > > bounded
> > > > > > > > > records, CDC
> > > > > > > > > > > >> Source for history snapshot and general sources like
> > > > > > KafkaSource
> > > > > > > > for
> > > > > > > > > > > >> historical messages.
> > > > > > > > > > > >>
> > > > > > > > > > > >> From user cases/requirements, I’ve seen many users
> > > > manually
> > > > > to
> > > > > > > set
> > > > > > > > > larger
> > > > > > > > > > > >> checkpoint interval during backfilling and then set
> a
> > > > > shorter
> > > > > > > > > checkpoint
> > > > > > > > > > > >> interval for real-time processing in their
> production
> > > > > > > environments
> > > > > > > > > as a
> > > > > > > > > > > >> flink application optimization. Now, the flink
> > framework
> > > > can
> > > > > > > make
> > > > > > > > > this
> > > > > > > > > > > >> optimization no longer require the user to set the
> > > > > checkpoint
> > > > > > > > > interval and
> > > > > > > > > > > >> restart the job multiple times.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Following supporting using larger checkpoint for job
> > > under
> > > > > > > backlog
> > > > > > > > > status
> > > > > > > > > > > >> in current FLIP, we can explore supporting larger
> > > > > > > > > parallelism/memory/cpu
> > > > > > > > > > > >> for job under backlog status in the future.
> > > > > > > > > > > >>
> > > > > > > > > > > >> In short, the updated FLIP looks good to me.
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best,
> > > > > > > > > > > >> Leonard
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin <
> > > > > lindon...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Hi Piotr,
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog
> > > > concept.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> After discussing with Becket Qin and thinking about
> > > this
> > > > > > more,
> > > > > > > I
> > > > > > > > > agree it
> > > > > > > > > > > >>> is a better idea to add a top-level concept to all
> > > source
> > > > > > > > > operators to
> > > > > > > > > > > >>> address the target use-case.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> The main reason that changed my mind is that
> > > > > > > isProcessingBacklog
> > > > > > > > > can be
> > > > > > > > > > > >>> described as an inherent/nature attribute of every
> > > source
> > > > > > > > instance
> > > > > > > > > and
> > > > > > > > > > > >> its
> > > > > > > > > > > >>> semantics does not need to depend on any specific
> > > > > > checkpointing
> > > > > > > > > policy.
> > > > > > > > > > > >>> Also, we can hardcode the isProcessingBacklog
> > behavior
> > > > for
> > > > > > the
> > > > > > > > > sources we
> > > > > > > > > > > >>> have considered so far (e.g. HybridSource and MySQL
> > CDC
> > > > > > source)
> > > > > > > > > without
> > > > > > > > > > > >>> asking users to explicitly configure the per-source
> > > > > behavior,
> > > > > > > > which
> > > > > > > > > > > >> indeed
> > > > > > > > > > > >>> provides better user experience.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> I have updated the FLIP based on the latest
> > > suggestions.
> > > > > The
> > > > > > > > > latest FLIP
> > > > > > > > > > > >> no
> > > > > > > > > > > >>> longer introduces per-source config that can be
> used
> > by
> > > > > > > > end-users.
> > > > > > > > > While
> > > > > > > > > > > >> I
> > > > > > > > > > > >>> agree with you that CheckpointTrigger can be a
> useful
> > > > > feature
> > > > > > > to
> > > > > > > > > address
> > > > > > > > > > > >>> additional use-cases, I am not sure it is necessary
> > for
> > > > the
> > > > > > > > > use-case
> > > > > > > > > > > >>> targeted by FLIP-309. Maybe we can introduce
> > > > > > CheckpointTrigger
> > > > > > > > > separately
> > > > > > > > > > > >>> in another FLIP?
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Can you help take another look at the updated FLIP?
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Best,
> > > > > > > > > > > >>> Dong
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <
> > > > > > > > > pnowoj...@apache.org>
> > > > > > > > > > > >>> wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>> Hi Dong,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask
> has
> > > 1%
> > > > > > chance
> > > > > > > > of
> > > > > > > > > being
> > > > > > > > > > > >>>>> "backpressured" at a given time (due to random
> > > traffic
> > > > > > > spikes).
> > > > > > > > > Then at
> > > > > > > > > > > >>>> any
> > > > > > > > > > > >>>>> given time, the chance of the job
> > > > > > > > > > > >>>>> being considered not-backpressured =
> (1-0.01)^1000.
> > > > Since
> > > > > > we
> > > > > > > > > evaluate
> > > > > > > > > > > >> the
> > > > > > > > > > > >>>>> backpressure metric once a second, the estimated
> > time
> > > > for
> > > > > > the
> > > > > > > > job
> > > > > > > > > > > >>>>> to be considered not-backpressured is roughly 1 /
> > > > > > > > > ((1-0.01)^1000) =
> > > > > > > > > > > >> 23163
> > > > > > > > > > > >>>>> sec = 6.4 hours.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> This means that the job will effectively always
> use
> > > the
> > > > > > > longer
> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real
> > concern,
> > > > > > right?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Sorry I don't understand where you are getting
> those
> > > > > numbers
> > > > > > > > from.
> > > > > > > > > > > >>>> Instead of trying to find loophole after loophole,
> > > could
> > > > > you
> > > > > > > try
> > > > > > > > > to
> > > > > > > > > > > >> think
> > > > > > > > > > > >>>> how a given loophole could be improved/solved?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know
> > the
> > > > > APIs
> > > > > > > due
> > > > > > > > > to the
> > > > > > > > > > > >>>>> following reasons.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Please propose something. I don't think it's
> needed.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309
> motivation
> > > > > > section,
> > > > > > > > > would the
> > > > > > > > > > > >>>> APIs
> > > > > > > > > > > >>>>> of this alternative approach be more or less
> > usable?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Everything that you originally wanted to achieve
> in
> > > > > > FLIP-309,
> > > > > > > > you
> > > > > > > > > could
> > > > > > > > > > > >> do
> > > > > > > > > > > >>>> as well in my proposal.
> > > > > > > > > > > >>>> Vide my many mentions of the "hacky solution".
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra
> > use-case
> > > > > (e.g.
> > > > > > > > allow
> > > > > > > > > > > >>>>> checkpointing interval to change dynamically even
> > > > during
> > > > > > the
> > > > > > > > > unbounded
> > > > > > > > > > > >>>>> phase) as it claims?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> I don't see why not.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs
> > currently
> > > > > > > proposed
> > > > > > > > in
> > > > > > > > > > > >>>> FLIP-309?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Yes
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> For example, if the APIs of this alternative
> > approach
> > > > can
> > > > > > be
> > > > > > > > > decoupled
> > > > > > > > > > > >>>> from
> > > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it
> > > might
> > > > be
> > > > > > > > > reasonable to
> > > > > > > > > > > >>>>> work on this extra use-case with a more
> > > > > > advanced/complicated
> > > > > > > > > design
> > > > > > > > > > > >>>>> separately in a followup work.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> As I voiced my concerns previously, the current
> > design
> > > > of
> > > > > > > > > FLIP-309 would
> > > > > > > > > > > >>>> clog the public API and in the long run confuse
> the
> > > > users.
> > > > > > IMO
> > > > > > > > > It's
> > > > > > > > > > > >>>> addressing the
> > > > > > > > > > > >>>> problem in the wrong place.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> > > > > > > > > > > >>>>> - Have all source operators emit a metric named
> > > > > > > > > "processingBacklog".
> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the
> > > > > checkpointing
> > > > > > > > > interval to
> > > > > > > > > > > >> be
> > > > > > > > > > > >>>>> used when any source is processing backlog".
> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog"
> > > periodically
> > > > > from
> > > > > > > all
> > > > > > > > > source
> > > > > > > > > > > >>>>> operators and uses the newly added config value
> as
> > > > > > > appropriate.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Yes.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> The challenge with this approach is that we need
> to
> > > > > define
> > > > > > > the
> > > > > > > > > > > >> semantics
> > > > > > > > > > > >>>> of
> > > > > > > > > > > >>>>> this "processingBacklog" metric and have all
> source
> > > > > > operators
> > > > > > > > > > > >>>>> implement this metric. I am not sure we are able
> to
> > > do
> > > > > this
> > > > > > > yet
> > > > > > > > > without
> > > > > > > > > > > >>>>> having users explicitly provide this information
> > on a
> > > > > > > > per-source
> > > > > > > > > basis.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose the job read from a bounded Kafka source,
> > > > should
> > > > > it
> > > > > > > > emit
> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job
> > might
> > > > use
> > > > > > long
> > > > > > > > > > > >>>> checkpointing
> > > > > > > > > > > >>>>> interval even
> > > > > > > > > > > >>>>> if the job is asked to process data starting from
> > now
> > > > to
> > > > > > the
> > > > > > > > > next 1
> > > > > > > > > > > >> hour.
> > > > > > > > > > > >>>>> If no, then the job might use the short
> > checkpointing
> > > > > > > interval
> > > > > > > > > > > >>>>> even if the job is asked to re-process data
> > starting
> > > > > from 7
> > > > > > > > days
> > > > > > > > > ago.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Yes. The same can be said of your proposal. Your
> > > > proposal
> > > > > > has
> > > > > > > > the
> > > > > > > > > very
> > > > > > > > > > > >> same
> > > > > > > > > > > >>>> issues
> > > > > > > > > > > >>>> that every source would have to implement it
> > > > differently,
> > > > > > most
> > > > > > > > > sources
> > > > > > > > > > > >>>> would
> > > > > > > > > > > >>>> have no idea how to properly calculate the new
> > > requested
> > > > > > > > > checkpoint
> > > > > > > > > > > >>>> interval,
> > > > > > > > > > > >>>> for those that do know how to do that, user would
> > have
> > > > to
> > > > > > > > > configure
> > > > > > > > > > > >> every
> > > > > > > > > > > >>>> source
> > > > > > > > > > > >>>> individually and yet again we would end up with a
> > > > system,
> > > > > > that
> > > > > > > > > works
> > > > > > > > > > > >> only
> > > > > > > > > > > >>>> partially in
> > > > > > > > > > > >>>> some special use cases (HybridSource), that's
> > > confusing
> > > > > the
> > > > > > > > users
> > > > > > > > > even
> > > > > > > > > > > >>>> more.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> That's why I think the more generic solution,
> > working
> > > > > > > primarily
> > > > > > > > > on the
> > > > > > > > > > > >> same
> > > > > > > > > > > >>>> metrics that are used by various auto scaling
> > > solutions
> > > > > > (like
> > > > > > > > > Flink K8s
> > > > > > > > > > > >>>> operator's
> > > > > > > > > > > >>>> autosaler) would be better. The hacky solution I
> > > > proposed
> > > > > > to:
> > > > > > > > > > > >>>> 1. show you that the generic solution is simply a
> > > > superset
> > > > > > of
> > > > > > > > your
> > > > > > > > > > > >> proposal
> > > > > > > > > > > >>>> 2. if you are adamant that
> > > > busyness/backpressured/records
> > > > > > > > > processing
> > > > > > > > > > > >>>> rate/pending records
> > > > > > > > > > > >>>>   metrics wouldn't cover your use case
> sufficiently
> > > (imo
> > > > > > they
> > > > > > > > > can),
> > > > > > > > > > > >> then
> > > > > > > > > > > >>>> you can very easily
> > > > > > > > > > > >>>>   enhance this algorithm with using some hints
> from
> > > the
> > > > > > > sources.
> > > > > > > > > Like
> > > > > > > > > > > >>>> "processingBacklog==true"
> > > > > > > > > > > >>>>   to short circuit the main algorithm, if
> > > > > > `processingBacklog`
> > > > > > > is
> > > > > > > > > > > >>>> available.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Best,
> > > > > > > > > > > >>>> Piotrek
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> pt., 16 cze 2023 o 04:45 Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > > > napisał(a):
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hi again Piotr,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Thank you for the reply. Please see my reply
> > inline.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> > > > > > > > > > > >>>> piotr.nowoj...@gmail.com>
> > > > > > > > > > > >>>>> wrote:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> Hi again Dong,
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> I understand that JM will get the
> > > > backpressure-related
> > > > > > > > metrics
> > > > > > > > > every
> > > > > > > > > > > >>>>> time
> > > > > > > > > > > >>>>>>> the RestServerEndpoint receives the REST
> request
> > to
> > > > get
> > > > > > > these
> > > > > > > > > > > >>>> metrics.
> > > > > > > > > > > >>>>>> But
> > > > > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already
> > > always
> > > > > > > > > receiving the
> > > > > > > > > > > >>>>> REST
> > > > > > > > > > > >>>>>>> metrics at regular interval (suppose there is
> no
> > > > human
> > > > > > > > manually
> > > > > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it
> > does,
> > > > > what
> > > > > > is
> > > > > > > > the
> > > > > > > > > > > >>>>> interval?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Good catch, I've thought that metrics are
> > > > pre-emptively
> > > > > > sent
> > > > > > > > to
> > > > > > > > > JM
> > > > > > > > > > > >>>> every
> > > > > > > > > > > >>>>> 10
> > > > > > > > > > > >>>>>> seconds.
> > > > > > > > > > > >>>>>> Indeed that's not the case at the moment, and
> that
> > > > would
> > > > > > > have
> > > > > > > > > to be
> > > > > > > > > > > >>>>>> improved.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying
> > > this
> > > > > much
> > > > > > > > > overhead
> > > > > > > > > > > >>>> just
> > > > > > > > > > > >>>>>> for
> > > > > > > > > > > >>>>>>> metrics monitoring. That is the main reason I
> > still
> > > > > doubt
> > > > > > > it
> > > > > > > > > is true.
> > > > > > > > > > > >>>>> Can
> > > > > > > > > > > >>>>>>> you show where this 100 ms is currently
> > configured?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should
> add
> > > > extra
> > > > > > code
> > > > > > > > to
> > > > > > > > > invoke
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> REST API at 100 ms interval. Then that means we
> > > need
> > > > to
> > > > > > > > > considerably
> > > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where
> > the
> > > > > > overhead
> > > > > > > > > will
> > > > > > > > > > > >>>>> increase
> > > > > > > > > > > >>>>>>> as the number of TM/slots increase, which may
> > pose
> > > > risk
> > > > > > to
> > > > > > > > the
> > > > > > > > > > > >>>>>> scalability
> > > > > > > > > > > >>>>>>> of the proposed design. I am not sure we should
> > do
> > > > > this.
> > > > > > > What
> > > > > > > > > do you
> > > > > > > > > > > >>>>>> think?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Sorry. I didn't mean metric should be reported
> > every
> > > > > > 100ms.
> > > > > > > I
> > > > > > > > > meant
> > > > > > > > > > > >>>> that
> > > > > > > > > > > >>>>>> "backPressuredTimeMsPerSecond (metric) would
> > report
> > > (a
> > > > > > value
> > > > > > > > of)
> > > > > > > > > > > >>>>> 100ms/s."
> > > > > > > > > > > >>>>>> once per metric interval (10s?).
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask
> has
> > > 1%
> > > > > > chance
> > > > > > > > of
> > > > > > > > > being
> > > > > > > > > > > >>>>> "backpressured" at a given time (due to random
> > > traffic
> > > > > > > spikes).
> > > > > > > > > Then at
> > > > > > > > > > > >>>> any
> > > > > > > > > > > >>>>> given time, the chance of the job
> > > > > > > > > > > >>>>> being considered not-backpressured =
> (1-0.01)^1000.
> > > > Since
> > > > > > we
> > > > > > > > > evaluate
> > > > > > > > > > > >> the
> > > > > > > > > > > >>>>> backpressure metric once a second, the estimated
> > time
> > > > for
> > > > > > the
> > > > > > > > job
> > > > > > > > > > > >>>>> to be considered not-backpressured is roughly 1 /
> > > > > > > > > ((1-0.01)^1000) =
> > > > > > > > > > > >> 23163
> > > > > > > > > > > >>>>> sec = 6.4 hours.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> This means that the job will effectively always
> use
> > > the
> > > > > > > longer
> > > > > > > > > > > >>>>> checkpointing interval. It looks like a real
> > concern,
> > > > > > right?
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> - What is the interface of this
> > CheckpointTrigger?
> > > > For
> > > > > > > > > example, are
> > > > > > > > > > > >>>> we
> > > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context that
> it
> > > can
> > > > > use
> > > > > > > to
> > > > > > > > > fetch
> > > > > > > > > > > >>>>>>> arbitrary metric values? This can help us
> > > understand
> > > > > what
> > > > > > > > > information
> > > > > > > > > > > >>>>>> this
> > > > > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make
> > the
> > > > > > > checkpoint
> > > > > > > > > > > >>>> decision.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I honestly don't think this is important at this
> > > stage
> > > > > of
> > > > > > > the
> > > > > > > > > > > >>>> discussion.
> > > > > > > > > > > >>>>>> It could have
> > > > > > > > > > > >>>>>> whatever interface we would deem to be best.
> > > Required
> > > > > > > things:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> - access to at least a subset of metrics that
> the
> > > > given
> > > > > > > > > > > >>>>> `CheckpointTrigger`
> > > > > > > > > > > >>>>>> requests,
> > > > > > > > > > > >>>>>> for example via some registration mechanism, so
> we
> > > > don't
> > > > > > > have
> > > > > > > > to
> > > > > > > > > > > >>>> fetch
> > > > > > > > > > > >>>>>> all of the
> > > > > > > > > > > >>>>>> metrics all the time from TMs.
> > > > > > > > > > > >>>>>> - some way to influence `CheckpointCoordinator`.
> > > > Either
> > > > > > via
> > > > > > > > > manually
> > > > > > > > > > > >>>>>> triggering
> > > > > > > > > > > >>>>>> checkpoints, and/or ability to change the
> > > > checkpointing
> > > > > > > > > interval.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know
> > the
> > > > > APIs
> > > > > > > due
> > > > > > > > > to the
> > > > > > > > > > > >>>>> following reasons.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> We would need to know the concrete APIs to gauge
> > the
> > > > > > > following:
> > > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309
> motivation
> > > > > > section,
> > > > > > > > > would the
> > > > > > > > > > > >>>> APIs
> > > > > > > > > > > >>>>> of this alternative approach be more or less
> > usable?
> > > > > > > > > > > >>>>> - Can these APIs reliably address the extra
> > use-case
> > > > > (e.g.
> > > > > > > > allow
> > > > > > > > > > > >>>>> checkpointing interval to change dynamically even
> > > > during
> > > > > > the
> > > > > > > > > unbounded
> > > > > > > > > > > >>>>> phase) as it claims?
> > > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs
> > currently
> > > > > > > proposed
> > > > > > > > in
> > > > > > > > > > > >>>> FLIP-309?
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> For example, if the APIs of this alternative
> > approach
> > > > can
> > > > > > be
> > > > > > > > > decoupled
> > > > > > > > > > > >>>> from
> > > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it
> > > might
> > > > be
> > > > > > > > > reasonable to
> > > > > > > > > > > >>>>> work on this extra use-case with a more
> > > > > > advanced/complicated
> > > > > > > > > design
> > > > > > > > > > > >>>>> separately in a followup work.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For
> > > > example,
> > > > > > is
> > > > > > > it
> > > > > > > > > going
> > > > > > > > > > > >>>> to
> > > > > > > > > > > >>>>>> run
> > > > > > > > > > > >>>>>>> on the subtask of every source operator? Or is
> it
> > > > going
> > > > > > to
> > > > > > > > run
> > > > > > > > > on the
> > > > > > > > > > > >>>>> JM?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> IMO on the JM.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> - Are we going to provide a default
> > implementation
> > > of
> > > > > > this
> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the
> > > > > algorithm
> > > > > > > > > described
> > > > > > > > > > > >>>>> below,
> > > > > > > > > > > >>>>>>> or do we expect each source operator developer
> to
> > > > > > implement
> > > > > > > > > their own
> > > > > > > > > > > >>>>>>> CheckpointTrigger?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> As I mentioned before, I think we should provide
> > at
> > > > the
> > > > > > very
> > > > > > > > > least the
> > > > > > > > > > > >>>>>> implementation
> > > > > > > > > > > >>>>>> that replaces the current triggering mechanism
> > > > > (statically
> > > > > > > > > configured
> > > > > > > > > > > >>>>>> checkpointing interval)
> > > > > > > > > > > >>>>>> and it would be great to provide the
> backpressure
> > > > > > monitoring
> > > > > > > > > trigger
> > > > > > > > > > > >> as
> > > > > > > > > > > >>>>>> well.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I agree that if there is a good use-case that can
> > be
> > > > > > > addressed
> > > > > > > > > by the
> > > > > > > > > > > >>>>> proposed CheckpointTrigger, then it is reasonable
> > > > > > > > > > > >>>>> to add CheckpointTrigger and replace the current
> > > > > triggering
> > > > > > > > > mechanism
> > > > > > > > > > > >>>> with
> > > > > > > > > > > >>>>> it.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I also agree that we will likely find such a
> > > use-case.
> > > > > For
> > > > > > > > > example,
> > > > > > > > > > > >>>> suppose
> > > > > > > > > > > >>>>> the source records have event timestamps, then it
> > is
> > > > > likely
> > > > > > > > > > > >>>>> that we can use the trigger to dynamically
> control
> > > the
> > > > > > > > > checkpointing
> > > > > > > > > > > >>>>> interval based on the difference between the
> > > watermark
> > > > > and
> > > > > > > > > current
> > > > > > > > > > > >> system
> > > > > > > > > > > >>>>> time.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> But I am not sure the addition of this
> > > > CheckpointTrigger
> > > > > > > should
> > > > > > > > > be
> > > > > > > > > > > >>>> coupled
> > > > > > > > > > > >>>>> with FLIP-309. Whether or not it is coupled
> > probably
> > > > > > depends
> > > > > > > on
> > > > > > > > > the
> > > > > > > > > > > >>>>> concrete API design around CheckpointTrigger.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> If you would be adamant that the backpressure
> > > > monitoring
> > > > > > > > doesn't
> > > > > > > > > cover
> > > > > > > > > > > >>>> well
> > > > > > > > > > > >>>>>> enough your use case, I would be ok to provide
> the
> > > > hacky
> > > > > > > > > version that
> > > > > > > > > > > >> I
> > > > > > > > > > > >>>>>> also mentioned
> > > > > > > > > > > >>>>>> before:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> """
> > > > > > > > > > > >>>>>> Especially that if my proposed algorithm
> wouldn't
> > > work
> > > > > > good
> > > > > > > > > enough,
> > > > > > > > > > > >>>> there
> > > > > > > > > > > >>>>>> is
> > > > > > > > > > > >>>>>> an obvious solution, that any source could add a
> > > > metric,
> > > > > > > like
> > > > > > > > > let say
> > > > > > > > > > > >>>>>> "processingBacklog: true/false", and the
> > > > > > `CheckpointTrigger`
> > > > > > > > > > > >>>>>> could use this as an override to always switch
> to
> > > the
> > > > > > > > > > > >>>>>> "slowCheckpointInterval". I don't think we need
> > it,
> > > > but
> > > > > > > that's
> > > > > > > > > always
> > > > > > > > > > > >>>> an
> > > > > > > > > > > >>>>>> option
> > > > > > > > > > > >>>>>> that would be basically equivalent to your
> > original
> > > > > > > proposal.
> > > > > > > > > > > >>>>>> """
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Hmm.. do you mean we can do the following:
> > > > > > > > > > > >>>>> - Have all source operators emit a metric named
> > > > > > > > > "processingBacklog".
> > > > > > > > > > > >>>>> - Add a job-level config that specifies "the
> > > > > checkpointing
> > > > > > > > > interval to
> > > > > > > > > > > >> be
> > > > > > > > > > > >>>>> used when any source is processing backlog".
> > > > > > > > > > > >>>>> - The JM collects the "processingBacklog"
> > > periodically
> > > > > from
> > > > > > > all
> > > > > > > > > source
> > > > > > > > > > > >>>>> operators and uses the newly added config value
> as
> > > > > > > appropriate.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> The challenge with this approach is that we need
> to
> > > > > define
> > > > > > > the
> > > > > > > > > > > >> semantics
> > > > > > > > > > > >>>> of
> > > > > > > > > > > >>>>> this "processingBacklog" metric and have all
> source
> > > > > > operators
> > > > > > > > > > > >>>>> implement this metric. I am not sure we are able
> to
> > > do
> > > > > this
> > > > > > > yet
> > > > > > > > > without
> > > > > > > > > > > >>>>> having users explicitly provide this information
> > on a
> > > > > > > > per-source
> > > > > > > > > basis.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Suppose the job read from a bounded Kafka source,
> > > > should
> > > > > it
> > > > > > > > emit
> > > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job
> > might
> > > > use
> > > > > > long
> > > > > > > > > > > >>>> checkpointing
> > > > > > > > > > > >>>>> interval even
> > > > > > > > > > > >>>>> if the job is asked to process data starting from
> > now
> > > > to
> > > > > > the
> > > > > > > > > next 1
> > > > > > > > > > > >> hour.
> > > > > > > > > > > >>>>> If no, then the job might use the short
> > checkpointing
> > > > > > > interval
> > > > > > > > > > > >>>>> even if the job is asked to re-process data
> > starting
> > > > > from 7
> > > > > > > > days
> > > > > > > > > ago.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> - How can users specify the
> > > > > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> > > > > > > > > > > >>>>>>> For example, will we provide APIs on the
> > > > > > CheckpointTrigger
> > > > > > > > that
> > > > > > > > > > > >>>>> end-users
> > > > > > > > > > > >>>>>>> can use to specify the checkpointing interval?
> > What
> > > > > would
> > > > > > > > that
> > > > > > > > > look
> > > > > > > > > > > >>>>> like?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Also as I mentioned before, just like metric
> > > reporters
> > > > > are
> > > > > > > > > configured:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > > > > > > > > > >>>>>> Every CheckpointTrigger could have its own
> custom
> > > > > > > > configuration.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
> > > approach
> > > > > > based
> > > > > > > > on
> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Yes, as usual, more generic things are more
> > > > complicated,
> > > > > > but
> > > > > > > > > often
> > > > > > > > > > > >> more
> > > > > > > > > > > >>>>>> useful in the long run.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> and harder to use.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I don't agree. Why setting in config
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> execution.checkpointing.trigger:
> > > > > > > > > > > >>>> BackPressureMonitoringCheckpointTrigger
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval:
> > > > > > > > > > > >>>>>> 1s
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval:
> > > > > > > > > > > >>>>>> 30s
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> that we could even provide a shortcut to the
> above
> > > > > > construct
> > > > > > > > > via:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s
> > > > > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> is harder compared to setting two/three
> checkpoint
> > > > > > > intervals,
> > > > > > > > > one in
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>> config/or via `env.enableCheckpointing(x)`,
> > > > > > > > > > > >>>>>> secondly passing one/two (fast/slow) values on
> the
> > > > > source
> > > > > > > > > itself?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> If we can address the use-case by providing just
> > the
> > > > two
> > > > > > > > > job-level
> > > > > > > > > > > >> config
> > > > > > > > > > > >>>>> as described above, I agree it will indeed be
> > > simpler.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I have tried to achieve this goal. But the caveat
> > is
> > > > that
> > > > > > it
> > > > > > > > > requires
> > > > > > > > > > > >>>> much
> > > > > > > > > > > >>>>> more work than described above in order to give
> the
> > > > > configs
> > > > > > > > > > > >> well-defined
> > > > > > > > > > > >>>>> semantics. So I find it simpler to just use the
> > > > approach
> > > > > in
> > > > > > > > > FLIP-309.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Let me explain my concern below. It will be great
> > if
> > > > you
> > > > > or
> > > > > > > > > someone
> > > > > > > > > > > >> else
> > > > > > > > > > > >>>>> can help provide a solution.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 1) We need to clearly document when the
> > fast-interval
> > > > and
> > > > > > > > > slow-interval
> > > > > > > > > > > >>>>> will be used so that users can derive the
> expected
> > > > > behavior
> > > > > > > of
> > > > > > > > > the job
> > > > > > > > > > > >>>> and
> > > > > > > > > > > >>>>> be able to config these values.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 2) The trigger of fast/slow interval depends on
> the
> > > > > > behavior
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > >>>> source
> > > > > > > > > > > >>>>> (e.g. MySQL CDC, HybridSource). However, no
> > existing
> > > > > > concepts
> > > > > > > > of
> > > > > > > > > source
> > > > > > > > > > > >>>>> operator (e.g. boundedness) can describe the
> target
> > > > > > behavior.
> > > > > > > > For
> > > > > > > > > > > >>>> example,
> > > > > > > > > > > >>>>> MySQL CDC internally has two phases, namely
> > snapshot
> > > > > phase
> > > > > > > and
> > > > > > > > > binlog
> > > > > > > > > > > >>>>> phase, which are not explicitly exposed to its
> > users
> > > > via
> > > > > > > source
> > > > > > > > > > > >> operator
> > > > > > > > > > > >>>>> API. And we probably should not enumerate all
> > > internal
> > > > > > phases
> > > > > > > > of
> > > > > > > > > all
> > > > > > > > > > > >>>> source
> > > > > > > > > > > >>>>> operators that are affected by fast/slow
> interval.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> 3) An alternative approach might be to define a
> new
> > > > > concept
> > > > > > > > (e.g.
> > > > > > > > > > > >>>>> processingBacklog) that is applied to all source
> > > > > operators.
> > > > > > > > Then
> > > > > > > > > the
> > > > > > > > > > > >>>>> fast/slow interval's documentation can depend on
> > this
> > > > > > > concept.
> > > > > > > > > That
> > > > > > > > > > > >> means
> > > > > > > > > > > >>>>> we have to add a top-level concept (similar to
> > source
> > > > > > > > > boundedness) and
> > > > > > > > > > > >>>>> require all source operators to specify how they
> > > > enforce
> > > > > > this
> > > > > > > > > concept
> > > > > > > > > > > >>>> (e.g.
> > > > > > > > > > > >>>>> FileSystemSource always emits
> > > processingBacklog=true).
> > > > > And
> > > > > > > > there
> > > > > > > > > might
> > > > > > > > > > > >> be
> > > > > > > > > > > >>>>> cases where the source itself (e.g. a bounded
> Kafka
> > > > > Source)
> > > > > > > can
> > > > > > > > > not
> > > > > > > > > > > >>>>> automatically derive the value of this concept,
> in
> > > > which
> > > > > > case
> > > > > > > > we
> > > > > > > > > need
> > > > > > > > > > > >> to
> > > > > > > > > > > >>>>> provide option for users to explicitly specify
> the
> > > > value
> > > > > > for
> > > > > > > > this
> > > > > > > > > > > >> concept
> > > > > > > > > > > >>>>> on a per-source basis.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> And it probably also has the issues of "having
> > two
> > > > > places
> > > > > > > to
> > > > > > > > > > > >>>> configure
> > > > > > > > > > > >>>>>> checkpointing
> > > > > > > > > > > >>>>>>> interval" and "giving flexibility for every
> > source
> > > to
> > > > > > > > > implement a
> > > > > > > > > > > >>>>>> different
> > > > > > > > > > > >>>>>>> API" (as mentioned below).
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> No, it doesn't.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> IMO, it is a hard-requirement for the
> user-facing
> > > API
> > > > > to
> > > > > > be
> > > > > > > > > > > >>>>>>> clearly defined and users should be able to use
> > the
> > > > API
> > > > > > > > without
> > > > > > > > > > > >>>> concern
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>> regression. And this requirement is more
> > important
> > > > than
> > > > > > the
> > > > > > > > > other
> > > > > > > > > > > >>>> goals
> > > > > > > > > > > >>>>>>> discussed above because it is related to the
> > > > > > > > > stability/performance of
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> production job. What do you think?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I don't agree with this. There are many things
> > that
> > > > work
> > > > > > > > > something in
> > > > > > > > > > > >>>>>> between perfectly and well enough
> > > > > > > > > > > >>>>>> in some fraction of use cases (maybe in 99%,
> maybe
> > > 95%
> > > > > or
> > > > > > > > maybe
> > > > > > > > > 60%),
> > > > > > > > > > > >>>>> while
> > > > > > > > > > > >>>>>> still being very useful.
> > > > > > > > > > > >>>>>> Good examples are: selection of state backend,
> > > > unaligned
> > > > > > > > > checkpoints,
> > > > > > > > > > > >>>>>> buffer debloating but frankly if I go
> > > > > > > > > > > >>>>>> through list of currently available config
> > options,
> > > > > > > something
> > > > > > > > > like
> > > > > > > > > > > >> half
> > > > > > > > > > > >>>>> of
> > > > > > > > > > > >>>>>> them can cause regressions. Heck,
> > > > > > > > > > > >>>>>> even Flink itself doesn't work perfectly in 100%
> > of
> > > > the
> > > > > > use
> > > > > > > > > cases, due
> > > > > > > > > > > >>>>> to a
> > > > > > > > > > > >>>>>> variety of design choices. Of
> > > > > > > > > > > >>>>>> course, the more use cases are fine with said
> > > feature,
> > > > > the
> > > > > > > > > better, but
> > > > > > > > > > > >>>> we
> > > > > > > > > > > >>>>>> shouldn't fixate to perfectly cover
> > > > > > > > > > > >>>>>> 100% of the cases, as that's impossible.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> In this particular case, if back pressure
> > monitoring
> > > > > > > trigger
> > > > > > > > > can work
> > > > > > > > > > > >>>>> well
> > > > > > > > > > > >>>>>> enough in 95% of cases, I would
> > > > > > > > > > > >>>>>> say that's already better than the originally
> > > proposed
> > > > > > > > > alternative,
> > > > > > > > > > > >>>> which
> > > > > > > > > > > >>>>>> doesn't work at all if user has a large
> > > > > > > > > > > >>>>>> backlog to reprocess from Kafka, including when
> > > using
> > > > > > > > > HybridSource
> > > > > > > > > > > >>>> AFTER
> > > > > > > > > > > >>>>>> the switch to Kafka has
> > > > > > > > > > > >>>>>> happened. For the remaining 5%, we should try to
> > > > improve
> > > > > > the
> > > > > > > > > behaviour
> > > > > > > > > > > >>>>> over
> > > > > > > > > > > >>>>>> time, but ultimately, users can
> > > > > > > > > > > >>>>>> decide to just run a fixed checkpoint interval
> (or
> > > at
> > > > > > worst
> > > > > > > > use
> > > > > > > > > the
> > > > > > > > > > > >>>> hacky
> > > > > > > > > > > >>>>>> checkpoint trigger that I mentioned
> > > > > > > > > > > >>>>>> before a couple of times).
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Also to be pedantic, if a user naively selects
> > > > > > slow-interval
> > > > > > > > in
> > > > > > > > > your
> > > > > > > > > > > >>>>>> proposal to 30 minutes, when that user's
> > > > > > > > > > > >>>>>> job fails on average every 15-20minutes, his job
> > can
> > > > end
> > > > > > up
> > > > > > > in
> > > > > > > > > a state
> > > > > > > > > > > >>>>> that
> > > > > > > > > > > >>>>>> it can not make any progress,
> > > > > > > > > > > >>>>>> this arguably is quite serious regression.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I probably should not say it is "hard
> requirement".
> > > > After
> > > > > > all
> > > > > > > > > there are
> > > > > > > > > > > >>>>> pros/cons. We will need to consider
> implementation
> > > > > > > complexity,
> > > > > > > > > > > >> usability,
> > > > > > > > > > > >>>>> extensibility etc.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I just don't think we should take it for granted
> to
> > > > > > introduce
> > > > > > > > > > > >> regression
> > > > > > > > > > > >>>>> for one use-case in order to support another
> > > use-case.
> > > > If
> > > > > > we
> > > > > > > > can
> > > > > > > > > not
> > > > > > > > > > > >> find
> > > > > > > > > > > >>>>> an algorithm/solution that addresses
> > > > > > > > > > > >>>>> both use-case well, I hope we can be open to
> tackle
> > > > them
> > > > > > > > > separately so
> > > > > > > > > > > >>>> that
> > > > > > > > > > > >>>>> users can choose the option that best fits their
> > > needs.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> All things else being equal, I think it is
> > preferred
> > > > for
> > > > > > > > > user-facing
> > > > > > > > > > > >> API
> > > > > > > > > > > >>>> to
> > > > > > > > > > > >>>>> be clearly defined and let users should be able
> to
> > > use
> > > > > the
> > > > > > > API
> > > > > > > > > without
> > > > > > > > > > > >>>>> concern of regression.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Maybe we can list pros/cons for the alternative
> > > > > approaches
> > > > > > we
> > > > > > > > > have been
> > > > > > > > > > > >>>>> discussing and see choose the best approach. And
> > > maybe
> > > > we
> > > > > > > will
> > > > > > > > > end up
> > > > > > > > > > > >>>>> finding that use-case
> > > > > > > > > > > >>>>> which needs CheckpointTrigger can be tackled
> > > separately
> > > > > > from
> > > > > > > > the
> > > > > > > > > > > >> use-case
> > > > > > > > > > > >>>>> in FLIP-309.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
> > > > > > > > > > > >>>>> backPressuredTimeMsPerSecond
> > > > > > > > > > > >>>>>> =
> > > > > > > > > > > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> > > > > > > > > > > >>>> numRecordsInPerSecond /
> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the
> > above
> > > > > > > > algorithm.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Do you mean
> > "maxRecordsConsumedWithoutBackpressure
> > > =
> > > > > > > > > > > >>>>>> (numRecordsInPerSecond
> > > > > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> > > > > > > > > > > >>>> metricsUpdateInterval"?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> It looks like there is indeed some mistake in my
> > > > > proposal
> > > > > > > > > above. Yours
> > > > > > > > > > > >>>>> look
> > > > > > > > > > > >>>>>> more correct, it probably
> > > > > > > > > > > >>>>>> still needs some safeguard/special handling if
> > > > > > > > > > > >>>>>> `backPressuredTimeMsPerSecond > 950`
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> The only information it can access is the
> > backlog.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Again no. It can access whatever we want to
> > provide
> > > to
> > > > > it.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Regarding the rest of your concerns. It's a
> matter
> > > of
> > > > > > > tweaking
> > > > > > > > > the
> > > > > > > > > > > >>>>>> parameters and the algorithm itself,
> > > > > > > > > > > >>>>>> and how much safety-net do we want to have.
> > > > Ultimately,
> > > > > > I'm
> > > > > > > > > pretty
> > > > > > > > > > > >> sure
> > > > > > > > > > > >>>>>> that's a (for 95-99% of cases)
> > > > > > > > > > > >>>>>> solvable problem. If not, there is always the
> > hacky
> > > > > > > solution,
> > > > > > > > > that
> > > > > > > > > > > >>>> could
> > > > > > > > > > > >>>>> be
> > > > > > > > > > > >>>>>> even integrated into this above
> > > > > > > > > > > >>>>>> mentioned algorithm as a short circuit to always
> > > reach
> > > > > > > > > > > >> `slow-interval`.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Apart of that, you picked 3 minutes as the
> > > > checkpointing
> > > > > > > > > interval in
> > > > > > > > > > > >>>> your
> > > > > > > > > > > >>>>>> counter example. In most cases
> > > > > > > > > > > >>>>>> any interval above 1 minute would inflict pretty
> > > > > > negligible
> > > > > > > > > overheads,
> > > > > > > > > > > >>>> so
> > > > > > > > > > > >>>>>> all in all, I would doubt there is
> > > > > > > > > > > >>>>>> a significant benefit (in most cases) of
> > increasing
> > > 3
> > > > > > minute
> > > > > > > > > > > >> checkpoint
> > > > > > > > > > > >>>>>> interval to anything more, let alone
> > > > > > > > > > > >>>>>> 30 minutes.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I am not sure we should design the algorithm with
> > the
> > > > > > > > assumption
> > > > > > > > > that
> > > > > > > > > > > >> the
> > > > > > > > > > > >>>>> short checkpointing interval will always be
> higher
> > > > than 1
> > > > > > > > minute
> > > > > > > > > etc.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> I agree the proposed algorithm can solve most
> cases
> > > > where
> > > > > > the
> > > > > > > > > resource
> > > > > > > > > > > >> is
> > > > > > > > > > > >>>>> sufficient and there is always no backlog in
> source
> > > > > > subtasks.
> > > > > > > > On
> > > > > > > > > the
> > > > > > > > > > > >>>> other
> > > > > > > > > > > >>>>> hand, what makes SRE
> > > > > > > > > > > >>>>> life hard is probably the remaining 1-5% cases
> > where
> > > > the
> > > > > > > > traffic
> > > > > > > > > is
> > > > > > > > > > > >> spiky
> > > > > > > > > > > >>>>> and the cluster is reaching its capacity limit.
> The
> > > > > ability
> > > > > > > to
> > > > > > > > > predict
> > > > > > > > > > > >>>> and
> > > > > > > > > > > >>>>> control Flink job's behavior (including
> > checkpointing
> > > > > > > interval)
> > > > > > > > > can
> > > > > > > > > > > >>>>> considerably reduce the burden of manging Flink
> > jobs.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Best,
> > > > > > > > > > > >>>>> Dong
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Best,
> > > > > > > > > > > >>>>>> Piotrek
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <
> > > > lindon...@gmail.com>
> > > > > > > > > napisał(a):
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> Hi Piotr,
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Thanks for the explanations. I have some
> followup
> > > > > > questions
> > > > > > > > > below.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski
> <
> > > > > > > > > pnowoj...@apache.org
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>>> wrote:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>> Hi All,
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Thanks for chipping in the discussion Ahmed!
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Regarding using the REST API. Currently I'm
> > > leaning
> > > > > > > towards
> > > > > > > > > > > >>>>>> implementing
> > > > > > > > > > > >>>>>>>> this feature inside the Flink itself, via some
> > > > > pluggable
> > > > > > > > > interface.
> > > > > > > > > > > >>>>>>>> REST API solution would be tempting, but I
> guess
> > > not
> > > > > > > > everyone
> > > > > > > > > is
> > > > > > > > > > > >>>>> using
> > > > > > > > > > > >>>>>>>> Flink Kubernetes Operator.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> @Dong
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> I am not sure metrics such as isBackPressured
> > are
> > > > > > already
> > > > > > > > > sent to
> > > > > > > > > > > >>>>> JM.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Fetching code path on the JM:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>
> > > > > > > > >
> > > > >
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Example code path accessing Task level metrics
> > via
> > > > JM
> > > > > > > using
> > > > > > > > > the
> > > > > > > > > > > >>>>>>>> `MetricStore`:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Thanks for the code reference. I checked the
> code
> > > > that
> > > > > > > > invoked
> > > > > > > > > these
> > > > > > > > > > > >>>>> two
> > > > > > > > > > > >>>>>>> classes and found the following information:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis
> > > > > currently
> > > > > > > > > invoked
> > > > > > > > > > > >>>> only
> > > > > > > > > > > >>>>>>> when AggregatingJobsMetricsHandler is invoked.
> > > > > > > > > > > >>>>>>> - AggregatingJobsMetricsHandler is only
> > > instantiated
> > > > > and
> > > > > > > > > returned by
> > > > > > > > > > > >>>>>>> WebMonitorEndpoint#initializeHandlers
> > > > > > > > > > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only
> > > used
> > > > by
> > > > > > > > > > > >>>>>> RestServerEndpoint.
> > > > > > > > > > > >>>>>>> And RestServerEndpoint invokes these handlers
> in
> > > > > response
> > > > > > > to
> > > > > > > > > external
> > > > > > > > > > > >>>>>> REST
> > > > > > > > > > > >>>>>>> request.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I understand that JM will get the
> > > > backpressure-related
> > > > > > > > metrics
> > > > > > > > > every
> > > > > > > > > > > >>>>> time
> > > > > > > > > > > >>>>>>> the RestServerEndpoint receives the REST
> request
> > to
> > > > get
> > > > > > > these
> > > > > > > > > > > >>>> metrics.
> > > > > > > > > > > >>>>>> But
> > > > > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already
> > > always
> > > > > > > > > receiving the
> > > > > > > > > > > >>>>> REST
> > > > > > > > > > > >>>>>>> metrics at regular interval (suppose there is
> no
> > > > human
> > > > > > > > manually
> > > > > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it
> > does,
> > > > > what
> > > > > > is
> > > > > > > > the
> > > > > > > > > > > >>>>> interval?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>> For example, let's say every source operator
> > > > subtask
> > > > > > > > reports
> > > > > > > > > this
> > > > > > > > > > > >>>>>>> metric
> > > > > > > > > > > >>>>>>>> to
> > > > > > > > > > > >>>>>>>>> JM once every 10 seconds. There are 100
> source
> > > > > > subtasks.
> > > > > > > > And
> > > > > > > > > each
> > > > > > > > > > > >>>>>>> subtask
> > > > > > > > > > > >>>>>>>>> is backpressured roughly 10% of the total
> time
> > > due
> > > > to
> > > > > > > > traffic
> > > > > > > > > > > >>>>> spikes
> > > > > > > > > > > >>>>>>> (and
> > > > > > > > > > > >>>>>>>>> limited buffer). Then at any given time,
> there
> > > are
> > > > 1
> > > > > -
> > > > > > > > > 0.9^100 =
> > > > > > > > > > > >>>>>>> 99.997%
> > > > > > > > > > > >>>>>>>>> chance that there is at least one subtask
> that
> > is
> > > > > > > > > backpressured.
> > > > > > > > > > > >>>>> Then
> > > > > > > > > > > >>>>>>> we
> > > > > > > > > > > >>>>>>>>> have to wait for at least 10 seconds to check
> > > > again.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond and other related
> > > > metrics
> > > > > > > (like
> > > > > > > > > > > >>>>>>>> busyTimeMsPerSecond) are not subject to that
> > > > problem.
> > > > > > > > > > > >>>>>>>> They are recalculated once every metric
> fetching
> > > > > > interval,
> > > > > > > > > and they
> > > > > > > > > > > >>>>>>> report
> > > > > > > > > > > >>>>>>>> accurately on average the given subtask spent
> > > > > > > > > > > >>>>>> busy/idling/backpressured.
> > > > > > > > > > > >>>>>>>> In your example, backPressuredTimeMsPerSecond
> > > would
> > > > > > report
> > > > > > > > > 100ms/s.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Suppose every subtask is already reporting
> > > > > > > > > > > >>>> backPressuredTimeMsPerSecond
> > > > > > > > > > > >>>>>> to
> > > > > > > > > > > >>>>>>> JM once every 100 ms. If a job has 10 operators
> > > (that
> > > > > are
> > > > > > > not
> > > > > > > > > > > >>>> chained)
> > > > > > > > > > > >>>>>> and
> > > > > > > > > > > >>>>>>> each operator has 100 subtasks, then JM would
> > need
> > > to
> > > > > > > handle
> > > > > > > > > 10000
> > > > > > > > > > > >>>>>> requests
> > > > > > > > > > > >>>>>>> per second to receive metrics from these 1000
> > > > subtasks.
> > > > > > It
> > > > > > > > > seems
> > > > > > > > > > > >>>> like a
> > > > > > > > > > > >>>>>>> non-trivial overhead for medium-to-large sized
> > jobs
> > > > and
> > > > > > can
> > > > > > > > > make JM
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>> performance bottleneck during job execution.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I would be surprised if Flink is already paying
> > > this
> > > > > much
> > > > > > > > > overhead
> > > > > > > > > > > >>>> just
> > > > > > > > > > > >>>>>> for
> > > > > > > > > > > >>>>>>> metrics monitoring. That is the main reason I
> > still
> > > > > doubt
> > > > > > > it
> > > > > > > > > is true.
> > > > > > > > > > > >>>>> Can
> > > > > > > > > > > >>>>>>> you show where this 100 ms is currently
> > configured?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Alternatively, maybe you mean that we should
> add
> > > > extra
> > > > > > code
> > > > > > > > to
> > > > > > > > > invoke
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> REST API at 100 ms interval. Then that means we
> > > need
> > > > to
> > > > > > > > > considerably
> > > > > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where
> > the
> > > > > > overhead
> > > > > > > > > will
> > > > > > > > > > > >>>>> increase
> > > > > > > > > > > >>>>>>> as the number of TM/slots increase, which may
> > pose
> > > > risk
> > > > > > to
> > > > > > > > the
> > > > > > > > > > > >>>>>> scalability
> > > > > > > > > > > >>>>>>> of the proposed design. I am not sure we should
> > do
> > > > > this.
> > > > > > > What
> > > > > > > > > do you
> > > > > > > > > > > >>>>>> think?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> While it will be nice to support additional
> > > > use-cases
> > > > > > > > > > > >>>>>>>>> with one proposal, it is probably also
> > reasonable
> > > > to
> > > > > > make
> > > > > > > > > > > >>>>> incremental
> > > > > > > > > > > >>>>>>>>> progress and support the low-hanging-fruit
> > > use-case
> > > > > > > first.
> > > > > > > > > The
> > > > > > > > > > > >>>>> choice
> > > > > > > > > > > >>>>>>>>> really depends on the complexity and the
> > > importance
> > > > > of
> > > > > > > > > supporting
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>> extra
> > > > > > > > > > > >>>>>>>>> use-cases.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> That would be true, if that was a private
> > > > > implementation
> > > > > > > > > detail or
> > > > > > > > > > > >>>> if
> > > > > > > > > > > >>>>>> the
> > > > > > > > > > > >>>>>>>> low-hanging-fruit-solution would be on the
> > direct
> > > > path
> > > > > > to
> > > > > > > > the
> > > > > > > > > final
> > > > > > > > > > > >>>>>>>> solution.
> > > > > > > > > > > >>>>>>>> That's unfortunately not the case here. This
> > will
> > > > add
> > > > > > > public
> > > > > > > > > facing
> > > > > > > > > > > >>>>>> API,
> > > > > > > > > > > >>>>>>>> that we will later need to maintain, no matter
> > > what
> > > > > the
> > > > > > > > final
> > > > > > > > > > > >>>>> solution
> > > > > > > > > > > >>>>>>> will
> > > > > > > > > > > >>>>>>>> be,
> > > > > > > > > > > >>>>>>>> and at the moment at least I don't see it
> being
> > > > > related
> > > > > > > to a
> > > > > > > > > > > >>>>> "perfect"
> > > > > > > > > > > >>>>>>>> solution.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Sure. Then let's decide the final solution
> first.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>> I guess the point is that the suggested
> > approach,
> > > > > which
> > > > > > > > > > > >>>> dynamically
> > > > > > > > > > > >>>>>>>>> determines the checkpointing interval based
> on
> > > the
> > > > > > > > > backpressure,
> > > > > > > > > > > >>>>> may
> > > > > > > > > > > >>>>>>>> cause
> > > > > > > > > > > >>>>>>>>> regression when the checkpointing interval is
> > > > > > relatively
> > > > > > > > low.
> > > > > > > > > > > >>>> This
> > > > > > > > > > > >>>>>>> makes
> > > > > > > > > > > >>>>>>>> it
> > > > > > > > > > > >>>>>>>>> hard for users to enable this feature in
> > > > production.
> > > > > It
> > > > > > > is
> > > > > > > > > like
> > > > > > > > > > > >>>> an
> > > > > > > > > > > >>>>>>>>> auto-driving system that is not guaranteed to
> > > work
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Yes, creating a more generic solution that
> would
> > > > > require
> > > > > > > > less
> > > > > > > > > > > >>>>>>> configuration
> > > > > > > > > > > >>>>>>>> is usually more difficult then static
> > > > configurations.
> > > > > > > > > > > >>>>>>>> It doesn't mean we shouldn't try to do that.
> > > > > Especially
> > > > > > > that
> > > > > > > > > if my
> > > > > > > > > > > >>>>>>> proposed
> > > > > > > > > > > >>>>>>>> algorithm wouldn't work good enough, there is
> > > > > > > > > > > >>>>>>>> an obvious solution, that any source could
> add a
> > > > > metric,
> > > > > > > > like
> > > > > > > > > let
> > > > > > > > > > > >>>> say
> > > > > > > > > > > >>>>>>>> "processingBacklog: true/false", and the
> > > > > > > `CheckpointTrigger`
> > > > > > > > > > > >>>>>>>> could use this as an override to always switch
> > to
> > > > the
> > > > > > > > > > > >>>>>>>> "slowCheckpointInterval". I don't think we
> need
> > > it,
> > > > > but
> > > > > > > > that's
> > > > > > > > > > > >>>> always
> > > > > > > > > > > >>>>>> an
> > > > > > > > > > > >>>>>>>> option
> > > > > > > > > > > >>>>>>>> that would be basically equivalent to your
> > > original
> > > > > > > > proposal.
> > > > > > > > > Or
> > > > > > > > > > > >>>> even
> > > > > > > > > > > >>>>>>>> source could add "suggestedCheckpointInterval
> :
> > > > int",
> > > > > > and
> > > > > > > > > > > >>>>>>>> `CheckpointTrigger` could use that value if
> > > present
> > > > > as a
> > > > > > > > hint
> > > > > > > > > in
> > > > > > > > > > > >>>> one
> > > > > > > > > > > >>>>>> way
> > > > > > > > > > > >>>>>>> or
> > > > > > > > > > > >>>>>>>> another.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> So far we have talked about the possibility of
> > > using
> > > > > > > > > > > >>>> CheckpointTrigger
> > > > > > > > > > > >>>>>> and
> > > > > > > > > > > >>>>>>> mentioned the CheckpointTrigger
> > > > > > > > > > > >>>>>>> and read metric values.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Can you help answer the following questions so
> > > that I
> > > > > can
> > > > > > > > > understand
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> alternative solution more concretely:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> - What is the interface of this
> > CheckpointTrigger?
> > > > For
> > > > > > > > > example, are
> > > > > > > > > > > >>>> we
> > > > > > > > > > > >>>>>>> going to give CheckpointTrigger a context that
> it
> > > can
> > > > > use
> > > > > > > to
> > > > > > > > > fetch
> > > > > > > > > > > >>>>>>> arbitrary metric values? This can help us
> > > understand
> > > > > what
> > > > > > > > > information
> > > > > > > > > > > >>>>>> this
> > > > > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make
> > the
> > > > > > > checkpoint
> > > > > > > > > > > >>>> decision.
> > > > > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For
> > > > example,
> > > > > > is
> > > > > > > it
> > > > > > > > > going
> > > > > > > > > > > >>>> to
> > > > > > > > > > > >>>>>> run
> > > > > > > > > > > >>>>>>> on the subtask of every source operator? Or is
> it
> > > > going
> > > > > > to
> > > > > > > > run
> > > > > > > > > on the
> > > > > > > > > > > >>>>> JM?
> > > > > > > > > > > >>>>>>> - Are we going to provide a default
> > implementation
> > > of
> > > > > > this
> > > > > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the
> > > > > algorithm
> > > > > > > > > described
> > > > > > > > > > > >>>>> below,
> > > > > > > > > > > >>>>>>> or do we expect each source operator developer
> to
> > > > > > implement
> > > > > > > > > their own
> > > > > > > > > > > >>>>>>> CheckpointTrigger?
> > > > > > > > > > > >>>>>>> - How can users specify the
> > > > > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval?
> > > > > > > > > > > >>>>>>> For example, will we provide APIs on the
> > > > > > CheckpointTrigger
> > > > > > > > that
> > > > > > > > > > > >>>>> end-users
> > > > > > > > > > > >>>>>>> can use to specify the checkpointing interval?
> > What
> > > > > would
> > > > > > > > that
> > > > > > > > > look
> > > > > > > > > > > >>>>> like?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Overall, my gut feel is that the alternative
> > > approach
> > > > > > based
> > > > > > > > on
> > > > > > > > > > > >>>>>>> CheckpointTrigger is more complicated and
> harder
> > to
> > > > > use.
> > > > > > > And
> > > > > > > > it
> > > > > > > > > > > >>>>> probably
> > > > > > > > > > > >>>>>>> also has the issues of "having two places to
> > > > configure
> > > > > > > > > checkpointing
> > > > > > > > > > > >>>>>>> interval" and "giving flexibility for every
> > source
> > > to
> > > > > > > > > implement a
> > > > > > > > > > > >>>>>> different
> > > > > > > > > > > >>>>>>> API" (as mentioned below).
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Maybe we can evaluate it more after knowing the
> > > > answers
> > > > > > to
> > > > > > > > the
> > > > > > > > > above
> > > > > > > > > > > >>>>>>> questions.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> On the other hand, the approach currently
> > > proposed
> > > > in
> > > > > > the
> > > > > > > > > FLIP is
> > > > > > > > > > > >>>>>> much
> > > > > > > > > > > >>>>>>>>> simpler as it does not depend on
> backpressure.
> > > > Users
> > > > > > > > specify
> > > > > > > > > the
> > > > > > > > > > > >>>>>> extra
> > > > > > > > > > > >>>>>>>>> interval requirement on the specific sources
> > > (e.g.
> > > > > > > > > HybridSource,
> > > > > > > > > > > >>>>>> MySQL
> > > > > > > > > > > >>>>>>>> CDC
> > > > > > > > > > > >>>>>>>>> Source) and can easily know the checkpointing
> > > > > interval
> > > > > > > will
> > > > > > > > > be
> > > > > > > > > > > >>>> used
> > > > > > > > > > > >>>>>> on
> > > > > > > > > > > >>>>>>>> the
> > > > > > > > > > > >>>>>>>>> continuous phase of the corresponding source.
> > > This
> > > > is
> > > > > > > > pretty
> > > > > > > > > much
> > > > > > > > > > > >>>>>> same
> > > > > > > > > > > >>>>>>> as
> > > > > > > > > > > >>>>>>>>> how users use the existing
> > > > > > > execution.checkpointing.interval
> > > > > > > > > > > >>>> config.
> > > > > > > > > > > >>>>>> So
> > > > > > > > > > > >>>>>>>>> there is no extra concern of regression
> caused
> > by
> > > > > this
> > > > > > > > > approach.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> To an extent, but as I have already previously
> > > > > > mentioned I
> > > > > > > > > really
> > > > > > > > > > > >>>>>> really
> > > > > > > > > > > >>>>>>> do
> > > > > > > > > > > >>>>>>>> not like idea of:
> > > > > > > > > > > >>>>>>>> - having two places to configure checkpointing
> > > > > interval
> > > > > > > > > (config
> > > > > > > > > > > >>>>> file
> > > > > > > > > > > >>>>>>> and
> > > > > > > > > > > >>>>>>>> in the Source builders)
> > > > > > > > > > > >>>>>>>> - giving flexibility for every source to
> > > implement a
> > > > > > > > different
> > > > > > > > > > > >>>> API
> > > > > > > > > > > >>>>>> for
> > > > > > > > > > > >>>>>>>> that purpose
> > > > > > > > > > > >>>>>>>> - creating a solution that is not generic
> > enough,
> > > so
> > > > > > that
> > > > > > > we
> > > > > > > > > will
> > > > > > > > > > > >>>>>> need
> > > > > > > > > > > >>>>>>> a
> > > > > > > > > > > >>>>>>>> completely different mechanism in the future
> > > anyway
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Yeah, I understand different developers might
> > have
> > > > > > > different
> > > > > > > > > > > >>>>>>> concerns/tastes for these APIs. Ultimately,
> there
> > > > might
> > > > > > not
> > > > > > > > be
> > > > > > > > > a
> > > > > > > > > > > >>>>> perfect
> > > > > > > > > > > >>>>>>> solution and we have to choose based on the
> > > pros/cons
> > > > > of
> > > > > > > > these
> > > > > > > > > > > >>>>> solutions.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I agree with you that, all things being equal,
> it
> > > is
> > > > > > > > > preferable to 1)
> > > > > > > > > > > >>>>>> have
> > > > > > > > > > > >>>>>>> one place to configure checkpointing intervals,
> > 2)
> > > > have
> > > > > > all
> > > > > > > > > source
> > > > > > > > > > > >>>>>>> operators use the same API, and 3) create a
> > > solution
> > > > > that
> > > > > > > is
> > > > > > > > > generic
> > > > > > > > > > > >>>>> and
> > > > > > > > > > > >>>>>>> last lasting. Note that these three goals
> affects
> > > the
> > > > > > > > > usability and
> > > > > > > > > > > >>>>>>> extensibility of the API, but not necessarily
> the
> > > > > > > > > > > >>>> stability/performance
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>> the production job.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> BTW, there are also other preferrable goals.
> For
> > > > > example,
> > > > > > > it
> > > > > > > > > is very
> > > > > > > > > > > >>>>>> useful
> > > > > > > > > > > >>>>>>> for the job's behavior to be predictable and
> > > > > > interpretable
> > > > > > > so
> > > > > > > > > that
> > > > > > > > > > > >>>> SRE
> > > > > > > > > > > >>>>>> can
> > > > > > > > > > > >>>>>>> operator/debug the Flink in an easier way. We
> can
> > > > list
> > > > > > > these
> > > > > > > > > > > >>>> pros/cons
> > > > > > > > > > > >>>>>>> altogether later.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I am wondering if we can first agree on the
> > > priority
> > > > of
> > > > > > > goals
> > > > > > > > > we want
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>>> achieve. IMO, it is a hard-requirement for the
> > > > > > user-facing
> > > > > > > > API
> > > > > > > > > to be
> > > > > > > > > > > >>>>>>> clearly defined and users should be able to use
> > the
> > > > API
> > > > > > > > without
> > > > > > > > > > > >>>> concern
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>> regression. And this requirement is more
> > important
> > > > than
> > > > > > the
> > > > > > > > > other
> > > > > > > > > > > >>>> goals
> > > > > > > > > > > >>>>>>> discussed above because it is related to the
> > > > > > > > > stability/performance of
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> production job. What do you think?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> Sounds good. Looking forward to learning more
> > > > ideas.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> I have thought about this a bit more, and I
> > think
> > > we
> > > > > > don't
> > > > > > > > > need to
> > > > > > > > > > > >>>>>> check
> > > > > > > > > > > >>>>>>>> for the backpressure status, or how much
> > > overloaded
> > > > > all
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > >>>>>> operators
> > > > > > > > > > > >>>>>>>> are.
> > > > > > > > > > > >>>>>>>> We could just check three things for source
> > > > operators:
> > > > > > > > > > > >>>>>>>> 1. pendingRecords (backlog length)
> > > > > > > > > > > >>>>>>>> 2. numRecordsInPerSecond
> > > > > > > > > > > >>>>>>>> 3. backPressuredTimeMsPerSecond
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> // int metricsUpdateInterval = 10s // obtained
> > > from
> > > > > > config
> > > > > > > > > > > >>>>>>>> // Next line calculates how many records can
> we
> > > > > consume
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > >>>>>> backlog,
> > > > > > > > > > > >>>>>>>> assuming
> > > > > > > > > > > >>>>>>>> // that magically the reason behind a
> > backpressure
> > > > > > > vanishes.
> > > > > > > > > We
> > > > > > > > > > > >>>> will
> > > > > > > > > > > >>>>>> use
> > > > > > > > > > > >>>>>>>> this only as
> > > > > > > > > > > >>>>>>>> // a safeguard  against scenarios like for
> > example
> > > > if
> > > > > > > > > backpressure
> > > > > > > > > > > >>>>> was
> > > > > > > > > > > >>>>>>>> caused by some
> > > > > > > > > > > >>>>>>>> // intermittent failure/performance
> degradation.
> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure =
> > > > > > > > > (numRecordsInPerSecond /
> > > > > > > > > > > >>>>> (1000
> > > > > > > > > > > >>>>>>>> - backPressuredTimeMsPerSecond / 1000)) *
> > > > > > > > > metricsUpdateInterval
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I am not sure if there is a typo. Because if
> > > > > > > > > > > >>>>>> backPressuredTimeMsPerSecond =
> > > > > > > > > > > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> > > > > > > > > > > >>>> numRecordsInPerSecond /
> > > > > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the
> > above
> > > > > > > > algorithm.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Do you mean
> > "maxRecordsConsumedWithoutBackpressure
> > > =
> > > > > > > > > > > >>>>>> (numRecordsInPerSecond
> > > > > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> > > > > > > > > > > >>>> metricsUpdateInterval"?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> // we are excluding
> > > > > > maxRecordsConsumedWithoutBackpressure
> > > > > > > > > from the
> > > > > > > > > > > >>>>>>> backlog
> > > > > > > > > > > >>>>>>>> as
> > > > > > > > > > > >>>>>>>> // a safeguard against an intermittent back
> > > pressure
> > > > > > > > > problems, so
> > > > > > > > > > > >>>>> that
> > > > > > > > > > > >>>>>> we
> > > > > > > > > > > >>>>>>>> don't
> > > > > > > > > > > >>>>>>>> // calculate next checkpoint interval far far
> in
> > > the
> > > > > > > future,
> > > > > > > > > while
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>> backpressure
> > > > > > > > > > > >>>>>>>> // goes away before we will recalculate
> metrics
> > > and
> > > > > new
> > > > > > > > > > > >>>> checkpointing
> > > > > > > > > > > >>>>>>>> interval
> > > > > > > > > > > >>>>>>>> timeToConsumeBacklog = (pendingRecords -
> > > > > > > > > > > >>>>>>>> maxRecordsConsumedWithoutBackpressure) /
> > > > > > > > numRecordsInPerSecond
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Then we can use those numbers to calculate
> > desired
> > > > > > > > > checkpointed
> > > > > > > > > > > >>>>>> interval
> > > > > > > > > > > >>>>>>>> for example like this:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> long calculatedCheckpointInterval =
> > > > > > timeToConsumeBacklog /
> > > > > > > > 10;
> > > > > > > > > > > >>>> //this
> > > > > > > > > > > >>>>>> may
> > > > > > > > > > > >>>>>>>> need some refining
> > > > > > > > > > > >>>>>>>> long nextCheckpointInterval =
> > > > > > > > min(max(fastCheckpointInterval,
> > > > > > > > > > > >>>>>>>> calculatedCheckpointInterval),
> > > > > slowCheckpointInterval);
> > > > > > > > > > > >>>>>>>> long nextCheckpointTs = lastCheckpointTs +
> > > > > > > > > nextCheckpointInterval;
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> WDYT?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I think the idea of the above algorithm is to
> > > incline
> > > > > to
> > > > > > > use
> > > > > > > > > the
> > > > > > > > > > > >>>>>>> fastCheckpointInterval unless we are very sure
> > the
> > > > > > backlog
> > > > > > > > > will take
> > > > > > > > > > > >>>> a
> > > > > > > > > > > >>>>>> long
> > > > > > > > > > > >>>>>>> time to process. This can alleviate the concern
> > of
> > > > > > > regression
> > > > > > > > > during
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> continuous_bounded phase since we are more
> likely
> > > to
> > > > > use
> > > > > > > the
> > > > > > > > > > > >>>>>>> fastCheckpointInterval. However, it can cause
> > > > > regression
> > > > > > > > > during the
> > > > > > > > > > > >>>>>> bounded
> > > > > > > > > > > >>>>>>> phase.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I will use a concrete example to explain the
> risk
> > > of
> > > > > > > > > regression:
> > > > > > > > > > > >>>>>>> - The user is using HybridSource to read from
> > HDFS
> > > > > > followed
> > > > > > > > by
> > > > > > > > > Kafka.
> > > > > > > > > > > >>>>> The
> > > > > > > > > > > >>>>>>> data in HDFS is old and there is no need for
> data
> > > > > > freshness
> > > > > > > > > for the
> > > > > > > > > > > >>>>> data
> > > > > > > > > > > >>>>>> in
> > > > > > > > > > > >>>>>>> HDFS.
> > > > > > > > > > > >>>>>>> - The user configures the job as below:
> > > > > > > > > > > >>>>>>> - fastCheckpointInterval = 3 minutes
> > > > > > > > > > > >>>>>>> - slowCheckpointInterval = 30 minutes
> > > > > > > > > > > >>>>>>> - metricsUpdateInterval = 100 ms
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Using the above formulate, we can know that
> once
> > > > > > > > pendingRecords
> > > > > > > > > > > >>>>>>> <= numRecordsInPerSecond * 30-minutes, then
> > > > > > > > > > > >>>>> calculatedCheckpointInterval
> > > > > > > > > > > >>>>>> <=
> > > > > > > > > > > >>>>>>> 3 minutes, meaning that we will use
> > > > > > slowCheckpointInterval
> > > > > > > as
> > > > > > > > > the
> > > > > > > > > > > >>>>>>> checkpointing interval. Then in the last 30
> > minutes
> > > > of
> > > > > > the
> > > > > > > > > bounded
> > > > > > > > > > > >>>>> phase,
> > > > > > > > > > > >>>>>>> the checkpointing frequency will be 10X higher
> > than
> > > > > what
> > > > > > > the
> > > > > > > > > user
> > > > > > > > > > > >>>>> wants.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Also note that the same issue would also
> > > considerably
> > > > > > limit
> > > > > > > > the
> > > > > > > > > > > >>>>> benefits
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>> the algorithm. For example, during the
> continuous
> > > > > phase,
> > > > > > > the
> > > > > > > > > > > >>>> algorithm
> > > > > > > > > > > >>>>>> will
> > > > > > > > > > > >>>>>>> only be better than the approach in FLIP-309
> when
> > > > there
> > > > > > is
> > > > > > > at
> > > > > > > > > least
> > > > > > > > > > > >>>>>>> 30-minutes worth of backlog in the source.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Sure, having a slower checkpointing interval in
> > > this
> > > > > > > extreme
> > > > > > > > > case
> > > > > > > > > > > >>>>> (where
> > > > > > > > > > > >>>>>>> there is 30-minutes backlog in the
> > > > continous-unbounded
> > > > > > > phase)
> > > > > > > > > is
> > > > > > > > > > > >>>> still
> > > > > > > > > > > >>>>>>> useful when this happens. But since this is the
> > > > > un-common
> > > > > > > > > case, and
> > > > > > > > > > > >>>> the
> > > > > > > > > > > >>>>>>> right solution is probably to do capacity
> > planning
> > > to
> > > > > > avoid
> > > > > > > > > this from
> > > > > > > > > > > >>>>>>> happening in the first place, I am not sure it
> is
> > > > worth
> > > > > > > > > optimizing
> > > > > > > > > > > >>>> for
> > > > > > > > > > > >>>>>> this
> > > > > > > > > > > >>>>>>> case at the cost of regression in the bounded
> > phase
> > > > and
> > > > > > the
> > > > > > > > > reduced
> > > > > > > > > > > >>>>>>> operational predictability for users (e.g. what
> > > > > > > checkpointing
> > > > > > > > > > > >>>> interval
> > > > > > > > > > > >>>>>>> should I expect at this stage of the job).
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I think the fundamental issue with this
> algorithm
> > > is
> > > > > that
> > > > > > > it
> > > > > > > > is
> > > > > > > > > > > >>>> applied
> > > > > > > > > > > >>>>>> to
> > > > > > > > > > > >>>>>>> both the bounded phases and the
> > continous_unbounded
> > > > > > phases
> > > > > > > > > without
> > > > > > > > > > > >>>>>> knowing
> > > > > > > > > > > >>>>>>> which phase the job is running at. The only
> > > > information
> > > > > > it
> > > > > > > > can
> > > > > > > > > access
> > > > > > > > > > > >>>>> is
> > > > > > > > > > > >>>>>>> the backlog. But two sources with the same
> amount
> > > of
> > > > > > > backlog
> > > > > > > > > do not
> > > > > > > > > > > >>>>>>> necessarily mean they have the same data
> > freshness
> > > > > > > > requirement.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> In this particular example, users know that the
> > > data
> > > > in
> > > > > > > HDFS
> > > > > > > > > is very
> > > > > > > > > > > >>>>> old
> > > > > > > > > > > >>>>>>> and there is no need for data freshness. Users
> > can
> > > > > > express
> > > > > > > > > signals
> > > > > > > > > > > >>>> via
> > > > > > > > > > > >>>>>> the
> > > > > > > > > > > >>>>>>> per-source API proposed in the FLIP. This is
> why
> > > the
> > > > > > > current
> > > > > > > > > approach
> > > > > > > > > > > >>>>> in
> > > > > > > > > > > >>>>>>> FLIP-309 can be better in this case.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> What do you think?
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Best,
> > > > > > > > > > > >>>>>>> Dong
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Best,
> > > > > > > > > > > >>>>>>>> Piotrek
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to