Hi,

Thanks for the proposal. However, are you sure that the OperatorCoordinator
is the right place for such logic? What would happen if there are two
(or more) operator coordinators with conflicting desired checkpoint trigger
behaviour? If one source is processing a backlog and the other is already
processing real time data, I would assume that in most use cases you would
like to still have the longer checkpointing interval, not the shorter one.
Apart from that, it might be a bit confusing and not user-friendly to
have multiple places that can override the checkpointing behaviour in a
different way.

FYI, in the past we had some discussions about similar requests, and back
then we chose to keep the system simpler, and exposed a more generic REST
API checkpoint triggering mechanism. I know that having to implement such
logic outside of Flink and having to make REST calls to trigger checkpoints
might not be ideal, but that's already implemented and is simple from the
perspective of Flink.
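
For reference, a minimal sketch of what such an external trigger could send. It assumes the `POST /jobs/:jobid/checkpoints` endpoint with an empty JSON body; please verify the path and body against the REST API docs of your Flink version. The class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Flink.
final class CheckpointRestRequests {

    // Builds (but does not send) a request asking the JobManager to trigger a checkpoint.
    // Assumption: the endpoint is POST <restBaseUrl>/jobs/<jobId>/checkpoints.
    static HttpRequest triggerCheckpoint(String restBaseUrl, String jobId) {
        URI uri = URI.create(restBaseUrl + "/jobs/" + jobId + "/checkpoints");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                // Assumption: an empty JSON object selects the default checkpoint type.
                .POST(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
    }
}
```

The request would then be sent with `java.net.http.HttpClient#send` by whatever external process decides when a checkpoint is due.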

I don't know, maybe instead of adding this logic to operator coordinators,
`CheckpointCoordinator` should have a pluggable `CheckpointTrigger` that
the user could configure like a `MetricReporter`. The default one would be
just periodically triggering checkpoints. Maybe
`BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
`pendingRecords` for some source has exceeded the configured threshold and
based on that adjust the checkpointing interval accordingly? This would at
least address some of my concerns.
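
A rough sketch of the idea, only to make it concrete. Everything here is hypothetical: the `CheckpointTrigger` interface, its `nextInterval` method, and passing `pendingRecords` per source as a plain map are assumptions, not existing Flink APIs. Note that with several sources, a backlog in any one of them selects the longer interval, which matches the behaviour I would expect from the conflicting-sources case above.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical plug-in point; not an existing Flink interface.
interface CheckpointTrigger {
    // Returns how long to wait before triggering the next checkpoint,
    // given the latest pendingRecords value reported by each source.
    Duration nextInterval(Map<String, Long> pendingRecordsBySource);
}

// Default behaviour: a fixed periodic interval that ignores metrics.
final class PeriodicCheckpointTrigger implements CheckpointTrigger {
    private final Duration interval;

    PeriodicCheckpointTrigger(Duration interval) {
        this.interval = interval;
    }

    @Override
    public Duration nextInterval(Map<String, Long> pendingRecordsBySource) {
        return interval;
    }
}

// Uses the longer interval while any source is above the backlog threshold.
final class BacklogDynamicCheckpointTrigger implements CheckpointTrigger {
    private final long pendingRecordsThreshold;
    private final Duration regularInterval;
    private final Duration backlogInterval;

    BacklogDynamicCheckpointTrigger(
            long pendingRecordsThreshold, Duration regularInterval, Duration backlogInterval) {
        this.pendingRecordsThreshold = pendingRecordsThreshold;
        this.regularInterval = regularInterval;
        this.backlogInterval = backlogInterval;
    }

    @Override
    public Duration nextInterval(Map<String, Long> pendingRecordsBySource) {
        boolean anyBacklog = pendingRecordsBySource.values().stream()
                .anyMatch(pending -> pending > pendingRecordsThreshold);
        return anyBacklog ? backlogInterval : regularInterval;
    }
}
```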

WDYT?

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Tue, 9 May 2023 at 19:11, Yunfeng Zhou <flink.zhouyunf...@gmail.com>
wrote:

> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> support dynamically triggering checkpoints from operators, which has
> been documented in FLIP-309
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.
>
> With the help of the ability proposed in this FLIP, users could
> improve the performance of their Flink job in cases like when the job
> needs to process both historical batch data and real-time streaming
> data, by adjusting the checkpoint triggerings in different phases of a
> HybridSource or CDC source.
>
> This proposal would be a fundamental component in the effort to
> further unify Flink's batch and stream processing ability. Please feel
> free to reply to this email thread and share with us your opinions.
>
> Best regards.
>
> Dong and Yunfeng
>