Hi Piotr,

Thanks for the comments. Let me try to understand and hopefully address
your concerns.

>> What would happen if there are two (or more) operator coordinators with
conflicting desired checkpoint trigger behaviour

With the proposed change, there won't be any "*conflicting* desired
checkpoint trigger" by definition. Both the job-level config and the
proposed API upperBoundCheckpointingInterval() specify an upper bound on
the checkpointing interval. If different source operators and the
job-level config propose different upper bounds, Flink will try to
periodically trigger checkpoints at the interval corresponding to the
minimum of all the proposed upper bounds.
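
For example, here is a minimal sketch of the intended resolution
semantics (the method and variable names below are just illustrative,
not part of the proposed API):

    import java.util.List;

    // Illustrative only: the effective periodic checkpointing interval
    // is the minimum of the job-level interval and every upper bound
    // proposed by an operator coordinator, so there is nothing for the
    // proposals to "conflict" over.
    static long resolveEffectiveIntervalMs(
            long jobLevelIntervalMs, List<Long> proposedUpperBoundsMs) {
        long effective = jobLevelIntervalMs;
        for (long upperBound : proposedUpperBoundsMs) {
            effective = Math.min(effective, upperBound);
        }
        return effective;
    }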

>> If one source is processing a backlog and the other is already
processing real time data..

Overall, I am not sure we always want the longer checkpointing interval
in that case. That really depends on the specific use-case and the job
graph.

The proposed API provides a mechanism for operators and users to specify
different checkpoint intervals during different phases of the job. Users
have the option to use the new API to get better performance in the
use-case described in the motivation section. I believe there can be
use-cases where the proposed API is not useful, in which case users can
simply not use the API, without incurring any performance regression.

>> it might be a bit confusing and not user friendly to have multiple
places that can override the checkpointing behaviour in a different way

Admittedly, adding more APIs always incurs more complexity. But sometimes
we have to accept this complexity to address new use-cases. Maybe we can
see if there is a more user-friendly way to address this use-case.

>> already implemented and is simple from the perspective of Flink

Do you mean that the HybridSource operator should invoke the REST API to
trigger checkpoints? The downside of this approach is that it makes it
hard for developers of source operators (e.g. MySQL CDC, HybridSource) to
address the target use-case. AFAIK, there is no existing case where we
require operator developers to use the REST API to do their job.

Can you help explain the benefit of using the REST API over the proposed
API?

Note that this approach also seems to have the same downside mentioned
above: "multiple places that can override the checkpointing behaviour". I
am not sure there can be a solution to address the target use-case without
having multiple places that can affect the checkpointing behavior.

>> check if `pendingRecords` for some source has exceeded the configured
threshold and based on that adjust the checkpointing interval accordingly

I am not sure this approach can address the target use-case in a better
way. In the target use-case, we would like the HybridSource to trigger
checkpoints more frequently when it is reading the Kafka source than when
it is reading the HDFS source. With a metrics-based trigger, we would
need to set a flag for the checkpoint trigger to know which source the
HybridSource is currently reading from. IMO that approach is less
intuitive and more complex than having the HybridSource invoke
upperBoundCheckpointingInterval() directly once it starts reading the
Kafka source, as sketched below.
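
To make the comparison concrete, here is a rough sketch of what I have
in mind, assuming the coordinator context exposes the proposed method
and that it takes a Duration (the onSourceSwitch hook, the
coordinatorContext field and the 30-second bound are all hypothetical,
just for illustration):

    import java.time.Duration;
    import org.apache.flink.connector.kafka.source.KafkaSource;

    // Hypothetical hook inside the HybridSource's coordinator that runs
    // whenever the source switches to the next underlying source.
    void onSourceSwitch(Object nextSource) {
        if (nextSource instanceof KafkaSource) {
            // Now reading real-time data: propose a tight upper bound on
            // the checkpointing interval. While the bounded HDFS source
            // is being read, no bound is proposed and the job-level
            // interval applies.
            coordinatorContext.upperBoundCheckpointingInterval(
                    Duration.ofSeconds(30));
        }
    }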

Maybe I did not understand the alternative approach correctly. I am happy
to discuss this topic more. WDYT?


Best,
Dong

On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi,
>
> Thanks for the proposal. However, are you sure that the
> OperatorCoordinator is the right place to place such logic? What would
> happen if there are two (or more) operator coordinators with conflicting
> desired checkpoint trigger behaviour? If one source is processing a backlog
> and the other is already processing real time data, I would assume that in
> most use cases you would like to still have the longer checkpointing
> interval, not the shorter one. Also apart from that, it might be a bit
> confusing and not user friendly to have multiple places that can override
> the checkpointing behaviour in a different way.
>
> FYI in the past, we had some discussions about similar requests and back
> then we chose to keep the system simpler, and exposed a more generic REST
> API checkpoint triggering mechanism. I know that having to implement such
> logic outside of Flink and having to call REST calls to trigger checkpoints
> might not be ideal, but that's already implemented and is simple from the
> perspective of Flink.
>
> I don't know, maybe instead of adding this logic to operator coordinators,
> `CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that
> the user could configure like a `MetricReporter`. The default one would be
> just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured threshold and
> based on that adjust the checkpointing interval accordingly? This would at
> least address some of my concerns.
>
> WDYT?
>
> Best,
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>
> wt., 9 maj 2023 o 19:11 Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> napisał(a):
>
>> Hi all,
>>
>> Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> support dynamically triggering checkpoints from operators, which has
>> been documented in FLIP-309
>> <
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
>> >.
>>
>> With the help of the ability proposed in this FLIP, users could
>> improve the performance of their Flink job in cases like when the job
>> needs to process both historical batch data and real-time streaming
>> data, by adjusting the checkpoint triggerings in different phases of a
>> HybridSource or CDC source.
>>
>> This proposal would be a fundamental component in the effort to
>> further unify Flink's batch and stream processing ability. Please feel
>> free to reply to this email thread and share with us your opinions.
>>
>> Best regards.
>>
>> Dong and Yunfeng
>>
>
