Hi Jing,

Thanks for your comments!

Regarding the idea of using the existing "boundedness" attribute of
sources, that is indeed something that we might find intuitive initially. I
have thought about this idea, but could not find a good way to make it
work. I will try to explain my thoughts and see if we can find a better
solution.

Here is my understanding of the idea mentioned above: provide a job level
config execution.checkpoint.interval.bounded. Flink will use this as the
checkpointing interval whenever there exists at least one running source
which claims it is under the "bounded" stage.

Note that we cannot simply reuse the existing "boundedness" attribute of
source operators. The reason is that a source such as MySQL CDC declares its
boundedness as "continuous_unbounded" because it can run continuously.
But MySQL CDC has two phases internally: the source first needs to
read a snapshot (a bounded amount of data) and then read a binlog (an
unbounded amount of data).
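
To make the mismatch concrete, here is a minimal sketch in plain Java. The
Boundedness enum below is only a local stand-in that mirrors the two values
of Flink's Boundedness enum, and TwoPhaseSource, Phase and finishSnapshot()
are hypothetical names made up for illustration; this is not the MySQL CDC
connector's code.

```java
/** Local stand-in mirroring the two values of Flink's Boundedness enum. */
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

/** Hypothetical internal phases of a CDC-style source. */
enum Phase { SNAPSHOT, BINLOG }

/** Hypothetical two-phase source; not the actual MySQL CDC implementation. */
final class TwoPhaseSource {
    private Phase phase = Phase.SNAPSHOT;

    /** Static attribute: the source as a whole never finishes. */
    Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /** Internal phase, which the boundedness attribute does not reveal. */
    Phase currentPhase() {
        return phase;
    }

    /** Called once the bounded snapshot has been fully read. */
    void finishSnapshot() {
        phase = Phase.BINLOG;
    }

    public static void main(String[] args) {
        TwoPhaseSource source = new TwoPhaseSource();
        System.out.println(source.getBoundedness() + " / " + source.currentPhase());
        source.finishSnapshot();
        System.out.println(source.getBoundedness() + " / " + source.currentPhase());
    }
}
```

The boundedness value stays the same across both phases, so the runtime
cannot tell from it alone when the snapshot phase has ended.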

As a result, in order to support optimization for sources like MySQL CDC, we
need to expose an API for the source operator to declare whether it is
running at a bounded or continuous_unbounded stage. *This introduces the
need to define a new concept named "bounded stage".*

Then, we will need to *introduce a new contract between source operators
and the Flink runtime*, saying that if there is a source that claims it is
running at the bounded stage, then Flink will use
"execution.checkpoint.interval.bounded" as the checkpointing interval.
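
A rough sketch of what such a contract could look like is below. All names
here (BoundedStageIntervalRule, StageAwareSource, isInBoundedStage) are made
up for illustration and are not existing Flink APIs; the sketch only encodes
the rule as I understand it from the idea above.

```java
import java.time.Duration;
import java.util.List;

/** Sketch of the alternative contract; every name here is hypothetical. */
final class BoundedStageIntervalRule {

    /** What a source would have to report under the new "bounded stage" concept. */
    interface StageAwareSource {
        boolean isInBoundedStage();
    }

    /**
     * Returns the bounded interval if at least one running source claims to be
     * in the bounded stage, and the regular interval otherwise.
     */
    static Duration effectiveInterval(
            Duration interval,
            Duration boundedInterval,
            List<StageAwareSource> runningSources) {
        for (StageAwareSource source : runningSources) {
            if (source.isInBoundedStage()) {
                return boundedInterval;
            }
        }
        return interval;
    }

    public static void main(String[] args) {
        StageAwareSource readingSnapshot = () -> true;
        StageAwareSource readingBinlog = () -> false;
        System.out.println(effectiveInterval(
                Duration.ofSeconds(30),
                Duration.ofMinutes(10),
                List.of(readingSnapshot, readingBinlog)));
    }
}
```

Note that the regular interval is only used on the fall-through path, which
is the extra if/else the existing config would have to document.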

Here are the concerns I have with this approach:

- The execution.checkpoint.interval.bounded is a top-level config, meaning
that every Flink user needs to read about its semantics. In comparison, the
proposed approach only requires users of specific sources (e.g.
HybridSource, MySQL CDC) to know the new source-specific config.

- It introduces a new top-level concept in Flink to describe the internal
stages of specific sources (e.g. MySQL CDC). In comparison, the proposed
approach only requires users of specific sources (e.g. HybridSource, MySQL
CDC) to know this concept, which not only makes the explanation much
simpler (since they are already using the specific sources), but also
limits the scope of this new concept (only these users need to know this
concept).

- It makes the existing config execution.checkpoint.interval harder to
understand, because we need to explain that it is only used when no source
is in the "bounded stage", which adds more if/else to this config. In
comparison, with the proposed approach, the semantics of
execution.checkpoint.interval are simpler without if/else, as it will always
be applied regardless of which sources users are using.
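
For comparison, the rule in the proposed approach (as described earlier in
this thread: the job-level config and every value passed to
upperBoundCheckpointingInterval() are upper bounds, and Flink uses the
minimum of them) can be sketched as follows. The class and method names are
hypothetical and this is not the actual runtime code.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;

/** Sketch of the "minimum of all upper bounds" rule; names are hypothetical. */
final class UpperBoundIntervalRule {

    /**
     * The job-level interval always takes part in the result. Upper bounds
     * proposed by sources can only make the effective interval shorter.
     */
    static Duration effectiveInterval(
            Duration jobLevelInterval, Collection<Duration> sourceUpperBounds) {
        Duration result = jobLevelInterval;
        for (Duration upperBound : sourceUpperBounds) {
            if (upperBound.compareTo(result) < 0) {
                result = upperBound;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Duration jobLevel = Duration.ofMinutes(10);
        // No source has proposed an upper bound yet, e.g. while reading HDFS.
        System.out.println(effectiveInterval(jobLevel, List.of()));
        // A source proposes a shorter bound, e.g. once it starts reading Kafka.
        System.out.println(effectiveInterval(jobLevel, List.of(Duration.ofSeconds(30))));
    }
}
```

There is no branch on the kind or stage of a source here: the job-level
value is an input in every case.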

I am happy to discuss if there are better approaches.

Thanks,
Dong


On Wed, May 24, 2023 at 8:23 AM Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Yunfeng, Hi Dong
>
> Thanks for the informative discussion! It is a rational requirement to set
> different checkpoint intervals for different sources in a hybridSource. The
> tiny downside of this proposal, at least for me, is that I have to
> understand the upper-bound definition of the interval and the built-in rule
> for Flink to choose the minimum value between it and the default interval
> setting. However, afaiac, the intention of this built-in rule is to
> minimize changes in Flink to support the requested feature, which is a very
> thoughtful move. Thanks for taking care of it. +1 for the Proposal.
>
> Another very rough idea came to mind while I was reading the FLIP.
> I didn't do a deep dive with related source code yet, so please correct me
> if I am wrong. The use case shows that two different checkpoint intervals
> should be set for bounded(historical) stream and unbounded(fresh real-time)
> stream sources. It is a trade-off between throughput and latency, i.e.
> bounded stream with large checkpoint interval for better throughput and
> unbounded stream with small checkpoint interval for lower latency (in case
> of failover). As we can see, the different interval settings depend
> on the boundedness of the streams. Since the Source API already has its own
> boundedness flag[1], is it possible to define two interval configurations
> and let Flink automatically set the related one to the source based on the
> known boundedness? The interval for bounded stream could be like
> execution.checkpoint.interval.bounded(naming could be reconsidered), and
> the other one for unbounded stream, we could use the existing one
> execution.checkpoint.interval by default, or introduce a new one like
> execution.checkpoint.interval.unbounded. In this way, no API change is
> required.
>
> @Piotr
> Just out of curiosity, do you know any real use cases where real-time data
> is processed before the backlog? Semantically, the backlog contains
> historical data that has to be processed before the real-time data is
> allowed to be processed. Otherwise, up-to-date data will be overwritten by
> out-of-date data, which leads to unexpected results in real business
> scenarios.
>
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
>
> On Tue, May 23, 2023 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Thanks for the comments. Let me try to understand your concerns and
> > hopefully address the concerns.
> >
> > >> What would happen if there are two (or more) operator coordinators
> > with conflicting desired checkpoint trigger behaviour
> >
> > With the proposed change, there won't exist any "*conflicting* desired
> > checkpoint trigger" by definition. Both the job-level config and the proposed
> > API upperBoundCheckpointingInterval() specify an upper bound on the
> > checkpointing interval. If there are different upper-bounds proposed by
> > different source operators and the job-level config, Flink will try to
> > periodically trigger checkpoints at the interval corresponding to the
> > minimum of all these proposed upper-bounds.
> >
> > >> If one source is processing a backlog and the other is already
> > processing real time data..
> >
> > Overall, I am not sure we always want to have a longer checkpointing
> > interval. That really depends on the specific use-case and the job graph.
> >
> > The proposed API change provides a mechanism for operators and users to
> > specify different checkpoint intervals at different periods of the job.
> > Users have the option to use the new API to get better performance in the
> > use-case specified in the motivation section. I believe there can be
> > use-cases where the proposed API is not useful, in which case users can
> > choose not to use the API without incurring any performance regression.
> >
> > >> it might be a bit confusing and not user friendly to have multiple
> > places that can override the checkpointing behaviour in a different way
> >
> > Admittedly, adding more APIs always incurs more complexity. But sometimes we
> > have to incur this complexity to address new use-cases. Maybe we can see if
> > there is a more user-friendly way to address this use-case.
> >
> > >> already implemented and is simple from the perspective of Flink
> >
> > Do you mean that the HybridSource operator should invoke the REST API to
> > trigger checkpoints? The downside of this approach is that it makes it hard
> > for developers of source operators (e.g. MySQL CDC, HybridSource) to
> > address the target use-case. AFAIK, there is no existing case where we
> > require operator developers to use the REST API to do their job.
> >
> > Can you help explain the benefit of using the REST API over using the
> > proposed API?
> >
> > Note that this approach also seems to have the same downside mentioned
> > above: "multiple places that can override the checkpointing behaviour". I
> > am not sure there can be a solution to address the target use-case without
> > having multiple places that can affect the checkpointing behavior.
> >
> > >> check if `pendingRecords` for some source has exceeded the configured
> > threshold and based on that adjust the checkpointing interval accordingly
> >
> > I am not sure this approach can address the target use-case in a better
> > way. In the target use-case, we would like the HybridSource to trigger
> > checkpoints more frequently when it is reading the Kafka source (than when
> > it is reading the HDFS source). We would need to set a flag for the
> > checkpoint trigger to know which source the HybridSource is reading from.
> > But IMO the approach is less intuitive and more complex than having the
> > HybridSource invoke upperBoundCheckpointingInterval() directly once it is
> > reading the Kafka source.
> >
> > Maybe I did not understand the alternative approach correctly. I am happy
> > to discuss more on this topic. WDYT?
> >
> >
> > Best,
> > Dong
> >
> > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > Thanks for the proposal. However, are you sure that the
> > > OperatorCoordinator is the right place to place such logic? What would
> > > happen if there are two (or more) operator coordinators with conflicting
> > > desired checkpoint trigger behaviour? If one source is processing a
> > > backlog and the other is already processing real time data, I would
> > > assume that in most use cases you would like to still have the longer
> > > checkpointing interval, not the shorter one. Also apart from that, it
> > > might be a bit confusing and not user friendly to have multiple places
> > > that can override the checkpointing behaviour in a different way.
> > >
> > > FYI, in the past we had some discussions about similar requests, and back
> > > then we chose to keep the system simpler and exposed a more generic REST
> > > API checkpoint triggering mechanism. I know that having to implement such
> > > logic outside of Flink and having to make REST calls to trigger
> > > checkpoints might not be ideal, but that's already implemented and is
> > > simple from the perspective of Flink.
> > >
> > > I don't know, maybe instead of adding this logic to operator
> > > coordinators, `CheckpointCoordinator` should have a pluggable
> > > `CheckpointTrigger`, that the user could configure like a
> > > `MetricReporter`. The default one would be just periodically triggering
> > > checkpoints. Maybe `BacklogDynamicCheckpointTrigger` could look at
> > > metrics[1], check if `pendingRecords` for some source has exceeded the
> > > configured threshold and based on that adjust the checkpointing interval
> > > accordingly? This would at least address some of my concerns.
> > >
> > > WDYT?
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > >
> > > On Tue, May 9, 2023 at 19:11 Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > >> support dynamically triggering checkpoints from operators, which has
> > >> been documented in FLIP-309
> > >> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.
> > >>
> > >> With the help of the ability proposed in this FLIP, users could
> > >> improve the performance of their Flink job in cases like when the job
> > >> needs to process both historical batch data and real-time streaming
> > >> data, by adjusting the checkpoint triggerings in different phases of a
> > >> HybridSource or CDC source.
> > >>
> > >> This proposal would be a fundamental component in the effort to
> > >> further unify Flink's batch and stream processing ability. Please feel
> > >> free to reply to this email thread and share with us your opinions.
> > >>
> > >> Best regards.
> > >>
> > >> Dong and Yunfeng
> > >>
> > >
> >
>
