Hi,

Thanks for the proposal. However, are you sure that the OperatorCoordinator is the right place for such logic? What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint triggering behaviour? If one source is processing a backlog and the other is already processing real-time data, I would assume that in most use cases you would still want the longer checkpointing interval, not the shorter one. Apart from that, it might be confusing and not user-friendly to have multiple places that can each override the checkpointing behaviour in a different way.
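To make the conflict concrete, here is a minimal sketch of one way such requests could be reconciled, assuming each source reports the checkpointing interval it would like and the longest one wins. `CheckpointIntervalResolver` and `resolve` are hypothetical names for illustration only; nothing like this exists in Flink today.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;

// Hypothetical helper, not part of Flink: reconciles the checkpointing
// intervals requested by several sources into a single effective interval.
public class CheckpointIntervalResolver {

    // Assumption: the longest requested interval wins, so a source that is
    // still processing a backlog is not forced into frequent checkpoints by
    // a source that has already caught up. Falls back to the configured
    // default when no source has requested anything.
    public static Duration resolve(Collection<Duration> requested, Duration defaultInterval) {
        return requested.stream()
                .max(Duration::compareTo)
                .orElse(defaultInterval);
    }

    public static void main(String[] args) {
        Duration backlogSource = Duration.ofMinutes(10);  // still catching up
        Duration realTimeSource = Duration.ofSeconds(30); // already at real time
        Duration effective = resolve(List.of(backlogSource, realTimeSource), Duration.ofMinutes(1));
        System.out.println("Effective interval: " + effective);
    }
}
```

Even this small rule is a policy decision, and with several coordinators each acting on its own there would be no single place to define it.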

FYI, in the past we had some discussions about similar requests, and back then we chose to keep the system simpler and exposed a more generic mechanism for triggering checkpoints via the REST API. I know that having to implement such logic outside of Flink and having to make REST calls to trigger checkpoints might not be ideal, but that is already implemented and is simple from the perspective of Flink.

I don't know, maybe instead of adding this logic to operator coordinators, `CheckpointCoordinator` should have a pluggable `CheckpointTrigger` that the user could configure like a `MetricReporter`. The default one would just trigger checkpoints periodically. Maybe a `BacklogDynamicCheckpointTrigger` could look at metrics [1], check whether `pendingRecords` for some source has exceeded the configured threshold, and adjust the checkpointing interval accordingly? This would at least address some of my concerns.

WDYT?

Best,
Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Tue, 9 May 2023 at 19:11, Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> support dynamically triggering checkpoints from operators, which has
> been documented in FLIP-309
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.
>
> With the help of the ability proposed in this FLIP, users could
> improve the performance of their Flink job in cases like when the job
> needs to process both historical batch data and real-time streaming
> data, by adjusting the checkpoint triggerings in different phases of a
> HybridSource or CDC source.
>
> This proposal would be a fundamental component in the effort to
> further unify Flink's batch and stream processing ability. Please feel
> free to reply to this email thread and share with us your opinions.
>
> Best regards.
>
> Dong and Yunfeng