Hi Dong,
Thanks for the quick reply and the clarification; yeah, that makes sense!
Best Regards,
Ahmed Hamdy

On Fri, 2 Jun 2023 at 02:59, Dong Lin <lindon...@gmail.com> wrote:

> Hi Ahmed,
>
> Thanks for the comments.
>
> I agree with you and Piotr that it would be useful to provide a more
> generic approach to address more use-cases in one proposal. On the other
> hand, I also think it is important to make sure that the alternative (more
> generic) approach can indeed address the extra use-cases reliably as
> expected. Then we can compare the pros/cons of these approaches and make
> the best choice for Flink users.
>
> If I understand your question correctly, you are asking whether it would be
> better to replace upperBoundCheckpointingIntervalForLastSource() with an
> API on the source/operator interface.
>
> The short answer is probably no. This is because the expected users of the
> API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
> are end-users who use Flink API and connector API to develop Flink jobs. We
> probably don't want end-users to directly use the source/operator
> interface, which is generally more complicated and intended to be used by
> developers of source operators.
>
> FLIP-309 currently proposes to add the API
> *SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
> source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
> checkpointing interval. Are you suggesting that we should replace this API
> with a config on the source or operator constructor?
>
> This approach probably works for HybridSource. But I am not sure it works
> for MySQL CDC Source (which is also mentioned in the latest FLIP-309
> motivation section), which is implemented as one source operator rather
> than multiple source operators (as HybridSource does). And we need to
> enable the new checkpointing interval in the middle of this source
> operator's execution.
>
> If I misunderstood your suggestion, can you provide more details regarding
> the proposed API and explain its benefits?
>
> Best,
> Dong
>
>
>
> On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
> > Hi Dong,
> > Thanks for the great proposal.
> > The thread is very intuitive along with suggestions from Jing and Piotr.
> > As much as I like the simplicity of the proposed approach I think a much
> > wider benefit is achieved by taking a more generic approach similar to
> > Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> > solidifies the argument you are discussing
> > >  On the other hand, the approach currently proposed in the FLIP is much
> > simpler as it does not depend on backpressure. Users specify the extra
> > interval requirement on the specific sources (e.g. HybridSource, MySQL
> CDC
> > Source) and can easily know the checkpointing interval will be used on
> the
> > continuous phase of the corresponding source.
> >
> > where the base HybridSource can use a `CheckpointTrigger` that doesn't
> > depend on backpressure.
> >
> >
> >
> >
> > I have a couple of questions for clarification.
> >
> > @Dong
> > Do you think that, in the solution in FLIP-309, instead of using
> > ```
> > /**
> >  * Upper-bound the checkpointing interval when the last source added
> >  * right before this method invocation is reading data.
> >  */
> > public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
> >         HybridSourceBuilder<T, ToEnumT> upperBoundCheckpointingIntervalForLastSource(
> >                 Duration duration) {
> >     ...
> > }
> > ```
> >
> > We can have an upperBoundCheckpointingInterval configured in the Source
> > Interface, or even better in the Operator one.
> > Then we can easily implement the one for HybridSource by relying on
> > delegation to the `currentReader`.
> >
> >
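A minimal sketch of this delegation idea, with all types invented here as stand-ins (none of these are actual Flink interfaces):

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;

// Hypothetical per-source capability; NOT part of the real Flink Source API.
interface IntervalAwareSource {
    // Empty means "no upper bound requested by this source".
    Optional<Duration> upperBoundCheckpointingInterval();
}

// A HybridSource-like wrapper that delegates the question to whichever
// child source is currently active, mirroring the "rely on delegation to
// the currentReader" suggestion above.
class HybridLikeSource implements IntervalAwareSource {
    private final List<IntervalAwareSource> children;
    private int currentIndex = 0;

    HybridLikeSource(List<IntervalAwareSource> children) {
        this.children = children;
    }

    // Called when the hybrid source moves from one child source to the next.
    void switchToNext() {
        currentIndex++;
    }

    @Override
    public Optional<Duration> upperBoundCheckpointingInterval() {
        return children.get(currentIndex).upperBoundCheckpointingInterval();
    }
}
```

In this sketch a bounded backfill source would return Optional.empty() (no cap) while the final continuous source would return its desired bound; Dong's reply above explains why a single-operator source such as the MySQL CDC source does not decompose this way.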
> > @Piotr
> >
> > Regarding the more general approach of adjusting based on generic
> > triggers/backpressure metrics. I saw you mentioned the resemblance with
> > FLIP-271,
> > Do you think it is worth going with the REST API proposal for dynamically
> > configuring the interval, so that the trigger logic could be implemented in
> > Flink or in external systems like the Flink Kubernetes Operator?
> > Wdyt? I think the REST API proposal here sounds more and more
> interesting.
> >
> >
> > Best Regards,
> > Ahmed Hamdy
> >
> >
> > On Wed, 31 May 2023 at 07:59, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the reply. Please see my comments inline.
> > >
> > > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > First of all we don't need to send any extra signal from source (or
> non
> > > > source) operators. All of the operators are already reporting
> > > backpressured
> > > > metrics [1]
> > > > and all of the metrics are already sent to JobManager. We would only
> > need
> > > >
> > >
> > > Hmm... I am not sure metrics such as isBackPressured are already sent
> to
> > > JM. According to the doc
> > > <
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > > >,
> > > this metric is only available on the TaskManager. And I couldn't find
> > > the code that sends these metrics to the JM. Can you help provide a
> > > link to the code and doc that shows this metric is reported to the JM?
> > >
> > > Even if this metric is indeed reported to the JM, we also need to
> > > confirm that
> > > the frequency meets our need. For example, typically metrics are
> updated
> > on
> > > the order of seconds. The default metric reporter interval (as
> specified
> > in
> > > MetricOptions) is 10 seconds, which is probably not sufficient for the
> > > suggested approach to work reliably. This is because the longer the
> > > interval, the more likely that the algorithm will not trigger
> checkpoint
> > > using the short interval even if all subtasks are not-backpressured.
> > >
> > > For example, let's say every source operator subtask reports this
> metric
> > to
> > > JM once every 10 seconds. There are 100 source subtasks. And each
> subtask
> > > is backpressured roughly 10% of the total time due to traffic spikes
> (and
> > > limited buffer). Then at any given time, there are 1 - 0.9^100 =
> 99.997%
> > > chance that there is at least one subtask that is backpressured. Then
> we
> > > have to wait for at least 10 seconds to check again. The expected
> > > checkpointing interval can be very close to 30 minutes in the use-case
> > > mentioned earlier.
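The 1 - 0.9^100 figure above can be sanity-checked with a quick standalone computation (plain Java, illustrative only; this is not Flink code):

```java
class BackpressureProbability {

    // Probability that at least one of n independent subtasks reports
    // isBackPressured == true at a random sampling instant, given a
    // per-subtask backpressure probability p.
    static double atLeastOneBackpressured(int n, double p) {
        return 1.0 - Math.pow(1.0 - p, n);
    }

    public static void main(String[] args) {
        // 100 subtasks, each backpressured ~10% of the time.
        System.out.printf("%.5f%n", atLeastOneBackpressured(100, 0.10));
        // An all-clear sample is extremely rare, so with a 10-second
        // reporting interval the fast interval would almost never be chosen.
    }
}
```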
> > >
> > > > to pass some accessor to the metrics to the `CheckpointTrigger`.
> > >
> > >
> > > > > execution.checkpointing.interval.no-backpressure
> > > >
> > > > Maybe that's the way to go, but as I mentioned before, I could see
> this
> > > > `CheckpointTrigger` to be a pluggable component, that could have been
> > > > configured
> > > > the same way as `MetricReporters` are right now [2]. We could just
> > > provide
> > > > out of the box two plugins, one implementing current checkpoint
> > > triggering
> > > > strategy,
> > > > and the other using backpressure.
> > > >
> > >
> > > Yes, it is possible to add a CheckpointTrigger as a pluggable
> component.
> > I
> > > am open to this idea as long as it provides benefits over the job-level
> > > config (e.g. covers more use-cases, or simpler configuration for the
> > > common case).
> > >
> > > I think we can decide how to let user specify this interval after we
> are
> > > able to address the other issues related to the feasibility and
> > reliability
> > > of the suggested approach.
> > >
> > >
> > > > > I think the root cause of this issue is that the decision of the
> > > > > checkpointing interval really depends on the expected impact of a
> > > > > checkpoint on the throughput.
> > > >
> > > > Yes, I agree. Ideally we probably should adjust the checkpointing
> > > interval
> > > > based on measured latency, for example using latency markers [3], but
> > > that
> > > > would
> > > > require some investigation if latency markers are indeed that costly
> as
> > > > documented and if so optimizing them to solve the performance
> > degradation
> > > > of enabling
> > > > e2e latency tracking.
> > >
> > >
> > > > However, given that the new back pressure monitoring strategy would
> be
> > > > optional AND users could implement their own `CheckpointTrigger` if
> > > really
> > > > needed
> > > > AND I have a feeling that there might be an even better solution
> (more
> > > > about that later).
> > > >
> > >
> > > Overall I guess you are suggesting that 1) we can optimize the overhead
> > of
> > > latency tracking so that we can always turn it on and 2) we can use the
> > > measured latency to dynamically determine checkpointing interval.
> > >
> > > I can understand this intuition. Still, the devil is in the details.
> > After
> > > thinking more about this, I am not sure I can find a good way to make
> it
> > > work. I am happy to discuss pros/cons if you provide more concrete
> > > solutions.
> > >
> > > Note that the goals of the alternative approach include 1) supporting
> > > sources other than HybridSource and 2) reducing the checkpointing
> > > interval when the job is backpressured. These goals are not necessary
> > > for the use-case targeted by FLIP-309. While it would be nice to support
> > > additional use-cases with one proposal, it is probably also reasonable
> > > to make incremental
> > > progress and support the low-hanging-fruit use-case first. The choice
> > > really depends on the complexity and the importance of supporting the
> > extra
> > > use-cases.
> > >
> > > I am hoping that we can still be open to using the approach proposed in
> > > FLIP-309 in case we cannot make the alternative approach work. What do
> > > you think?
> > >
> > >
> > > > > if the checkpointing overhead is
> > > > > close to none, then it is beneficial to the e2e latency to still
> > > > checkpoint
> > > > > at a high frequency even if there exists (intermittent) backpressure.
> > > >
> > > > In that case users could just configure a slow checkpointing interval
> > to
> > > a
> > > > lower value, or just use static checkpoint interval strategy.
> > > >
> > >
> > > I guess the point is that the suggested approach, which dynamically
> > > determines the checkpointing interval based on the backpressure, may
> > cause
> > > regression when the checkpointing interval is relatively low. This
> makes
> > it
> > > hard for users to enable this feature in production. It is like an
> > > auto-driving system that is not guaranteed to work.
> > >
> > > On the other hand, the approach currently proposed in the FLIP is much
> > > simpler as it does not depend on backpressure. Users specify the extra
> > > interval requirement on the specific sources (e.g. HybridSource, MySQL
> > CDC
> > > Source) and can easily know the checkpointing interval will be used on
> > the
> > > continuous phase of the corresponding source. This is pretty much the
> > > same as
> > > how users use the existing execution.checkpointing.interval config. So
> > > there is no extra concern of regression caused by this approach.
> > >
> > >
> > > >
> > > > > With the suggested approach, the e2e latency introduced by Flink is
> > > > roughly
> > > > > 72 seconds. This is because it takes 1 minute for 11MBps phase to
> > end,
> > > > and
> > > > > another 12 seconds for the accumulated backlog to be cleared. And
> > Flink
> > > > can
> > > > > not do checkpoint before the backlog is cleared.
> > > >
> > > > Indeed that's a valid concern. After thinking more about this issue,
> > > maybe
> > > > the proper solution would be to calculate "how much overloaded is the
> > > most
> > > > overloaded subtask".
> > > > In this case, that would be 10% (we are trying to push 110% of the
> > > > available capacity in the current job/cluster). Then we could use
> that
> > > > number as some kind of weighted average.
> > > > We could figure out a function mapping the overload percentage, into
> a
> > > > floating point number from range [0, 1]
> > > >
> > > > f(overload_factor) = weight // weight is from [0, 1]
> > > >
> > > > and then the desired checkpoint interval would be something like
> > > >
> > > > (1 - weight) * fastCheckpointInterval + weight *
> slowCheckpointInterval
> > > >
> > > > In your problematic example, we would like the weight to be pretty
> > small
> > > > (<10%?), so the calculated checkpoint interval would be pretty close
> to
> > > the
> > > > fastCheckpointInterval.
> > > >
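A standalone sketch of this weighting idea (the mapping f below is a placeholder; nothing in the FLIP defines it, and the class is illustrative, not Flink code):

```java
import java.time.Duration;

class WeightedCheckpointInterval {

    // Placeholder for f(overload_factor): any monotone mapping into [0, 1]
    // would fit the description above; here we simply clamp.
    static double weight(double overloadFactor) {
        return Math.max(0.0, Math.min(1.0, overloadFactor));
    }

    // (1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval
    static Duration interval(double overloadFactor, Duration fast, Duration slow) {
        double w = weight(overloadFactor);
        long millis = Math.round((1 - w) * fast.toMillis() + w * slow.toMillis());
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        // 10% overload with fast = 30 s and slow = 30 min.
        System.out.println(interval(0.10, Duration.ofSeconds(30), Duration.ofMinutes(30)));
    }
}
```

Note that with fast = 30 s and slow = 30 min, a weight of 0.1 already yields about 3.5 minutes, roughly 7x the fast interval, so whether the result stays "pretty close" to fastCheckpointInterval depends heavily on the interval spread and on the choice of f.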
> > >
> > > Hmm... I am not sure it will always be pretty close to the
> > > fastCheckpointInterval. We can discuss this once there is a concrete
> > > definition of the algorithm.
> > >
> > > While each source subtask can measure its current throughput, I am not
> > > sure it can measure the "input throughput", which is defined as the
> > > throughput when the subtask (and its downstream operators) has unlimited
> > > processing capacity. Therefore, it seems pretty hard to determine the
> > > "overload_factor" in a timely and accurate manner.
> > >
> > >
> > > > We could calculate the overload factor the same way FLIP-271
> > > > calculates how much we should rescale a given operator [4].
> > > >
> > > > I can think about this more and elaborate/refine this idea tomorrow.
> > > >
> > >
> > > Sounds good. Looking forward to learning more ideas.
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io
> > > > [2]
> > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> > > > [3]
> > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
> > > > [4]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling
> > > >
> > > > Tue, 30 May 2023 at 13:58, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thank you for providing those details.
> > > > >
> > > > > I understand you suggested using the existing "isBackPressured"
> > signal
> > > to
> > > > > determine whether we should use the less frequent checkpointing
> > > > interval. I
> > > > > followed your thoughts and tried to make it work. Below are the
> > issues
> > > > that
> > > > > I am not able to address. Can you see if there is a way to address
> > > these
> > > > > issues?
> > > > >
> > > > > Let's use the following use-case to make the discussion more
> > > > concrete:
> > > > > a) Users want to checkpoint at least once every 30 minutes to
> > > upper-bound
> > > > > the amount of duplicate work after job failover.
> > > > > b) Users want to checkpoint at least once every 30 seconds to
> > > > > upper-bound *extra
> > > > > e2e lag introduced by the Flink job* during the continuous
> processing
> > > > > phase.
> > > > >
> > > > > The suggested approach is designed to do this:
> > > > > - If any of the source subtasks is backpressured, the job will
> > > checkpoint
> > > > > at 30-minutes interval.
> > > > > - If none of the source subtasks is backpressured, the job will
> > > > checkpoint
> > > > > at 30-seconds interval.
> > > > >
> > > > > And we would need to add the following public APIs to implement
> this
> > > > > approach:
> > > > > - Add a job level config, maybe
> > > > > execution.checkpointing.interval.no-backpressure. This is the
> > > > checkpointing
> > > > > interval when none of the source subtasks is backpressured.
> > > > > - Add a public API for source operator subtasks to report their
> > > > > backpressure status to the checkpointing coordinator. The subtask
> > > should
> > > > > invoke this API whenever its backpressure status changed.
> > > > >
> > > > > Now, in order to make the suggested approach work for all users
> (i.e.
> > > no
> > > > > regression), we need to make sure that whenever we use the
> 30-minutes
> > > > > checkpointing interval, the e2e latency will be less than or equal
> to
> > > the
> > > > > case where we use the 30-seconds checkpointing interval.
> > > > >
> > > > > I thought about this in detail, and found the following fabricated
> > > > > scenarios where this approach might cause regression:
> > > > >
> > > > > During the continuous processing phase, the input throughput is
> > > > > 5MBps for 1 minute, and 11MBps for 1 minute, in lock-step. The
> > > > > maximum throughput achievable by this job is 10MBps. For simplicity,
> > > > > suppose the buffer size can hold roughly 1 second's worth of data;
> > > > > then the job is backpressured roughly 1 minute out of every 2
> > > > > minutes.
> > > > >
> > > > > With the suggested approach, the e2e latency introduced by Flink is
> > > > > roughly 72 seconds. This is because it takes 1 minute for the 11MBps
> > > > > phase to end, and another 12 seconds for the accumulated backlog to
> > > > > be cleared. And Flink cannot do a checkpoint before the backlog is
> > > > > cleared.
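The arithmetic behind the 72-second figure, as a standalone check (plain Java, illustrative only; the rates are those of the example above):

```java
class BacklogLatency {

    // Extra e2e latency in seconds: the spike length plus the time needed
    // to drain the backlog accumulated during the spike.
    static double e2eLatencySeconds(double spikeRateMBps, double quietRateMBps,
                                    double capacityMBps, double spikeSeconds) {
        double backlogMB = (spikeRateMBps - capacityMBps) * spikeSeconds;
        double drainSeconds = backlogMB / (capacityMBps - quietRateMBps);
        return spikeSeconds + drainSeconds;
    }

    public static void main(String[] args) {
        // 11 MBps spike for 60 s, 5 MBps quiet phase, 10 MBps max throughput:
        // 60 MB of backlog drains at 5 MBps in 12 s, for ~72 s in total.
        System.out.println(e2eLatencySeconds(11, 5, 10, 60) + " s");
    }
}
```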
> > > > >
> > > > > On the other hand, if we continue to checkpoint at 30-seconds
> > interval,
> > > > the
> > > > > e2e latency introduced by Flink is at most 42 seconds, plus the
> extra
> > > > delay
> > > > > introduced by the checkpoint overhead. The e2e latency will be
> better
> > > > than
> > > > > the suggested approach, if the impact of the checkpoint is less
> than
> > 30
> > > > > seconds.
> > > > >
> > > > > I think the root cause of this issue is that the decision of the
> > > > > checkpointing interval really depends on the expected impact of a
> > > > > checkpoint on the throughput. For example, if the checkpointing
> > > overhead
> > > > is
> > > > > close to none, then it is beneficial to the e2e latency to still
> > > > checkpoint
> > > > > at a high frequency even if there exists (intermittent) backpressure.
> > > > >
> > > > > Here is another fabricated use-case where the suggested approach
> > might
> > > > > cause regression. Let's say user's job is
> > > > > *hybridSource.keyBy(...).transform(operatorA).sinkTo(PaimonSink)*.
> > The
> > > > > parallelism is 2. As we can see, there is all-to-all edge between
> > > source
> > > > > and operatorA. And due to limited resources (e.g. buffer), at any
> > given
> > > > > time, each operatorA subtask can only process data from one of its
> > > > upstream
> > > > > subtasks at a time, meaning that the other upstream subtask will be
> > > > > backpressured. So there might always be at least one source subtask
> > > that
> > > > is
> > > > > backpressured even though the job's throughput can catch up with
> the
> > > > input
> > > > > throughput. However, the suggested approach might end up always
> using
> > > the
> > > > > less frequent checkpointing interval in this case.
> > > > >
> > > > > Suppose we can find a way to address the above issues, another
> issue
> > > with
> > > > > the suggested approach is the extra communication overhead between
> > the
> > > > > source operator subtasks and the checkpointing coordinator. The
> > source
> > > > > subtask needs to send a message to checkpointing coordinator
> whenever
> > > its
> > > > > backpressure status changes. The more frequently we check (e.g.
> once
> > > > every
> > > > > 10 ms), the larger the overhead. And if we check not so frequently
> > > (e.g.
> > > > > once every second), we might be more vulnerable to
> random/occasional
> > > > > backpressure. So there seems to be a tradeoff between the reliability
> > and
> > > > the
> > > > > cost of this approach.
> > > > >
> > > > > Thanks again for the suggestion. I am looking forward to your
> > comments.
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, May 30, 2023 at 4:37 PM Piotr Nowojski <
> pnowoj...@apache.org
> > >
> > > > > wrote:
> > > > >
> > > > > > Hi again,
> > > > > >
> > > > > > Thanks Dong, yes I think your concerns are valid, and that's why
> I
> > > have
> > > > > > previously refined my idea to use one of the backpressure
> measuring
> > > > > metrics
> > > > > > that we already have.
> > > > > > Either simply `isBackPressured == true` check [1], or
> > > > > > `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That
> > > would
> > > > > > address your three first concerns:
> > > > > >   - lack of event time
> > > > > >   - event time unreliability
> > > > > >   - lack of universal threshold value for `pendingRecords`
> > > > > >
> > > > > > In a bit more detail, we probably should check (using [1] or [2])
> > > > either:
> > > > > >   a) if any of the source subtasks is backpressured
> > > > > >   b) if any of the subtasks is backpressured
> > > > > >
> > > > > > In most cases a == b. The only time that's not true is if some
> > > > > windowed
> > > > > > operator in the middle of the job graph started triggering so
> many
> > > > > results
> > > > > > that it became backpressured,
> > > > > > but the backpressure didn't last long enough to propagate to
> > sources.
> > > > For
> > > > > > example that especially might occur if sources are idle. So
> > probably
> > > b)
> > > > > is
> > > > > > a better and more generic option.
> > > > > >
> > > > > > Regarding your last concern, with spiky traffic, I think the
> > > following
> > > > > > algorithm of triggering checkpoints would work pretty well:
> > > > > >
> > > > > > public class BackpressureDetectingCheckpointTrigger {
> > > > > >
> > > > > >   private long lastCheckpointTs = System.currentTimeMillis();
> > > > > >   private long slowCheckpointInterval = ...;
> > > > > >   private long fastCheckpointInterval = ...;
> > > > > >
> > > > > >   // code executed periodically, for example once a second, once
> > > > > >   // every 10ms, or at 1/10th of the fast checkpoint interval
> > > > > >   void maybeTriggerCheckpoint(...) {
> > > > > >
> > > > > >     long nextCheckpointTs = lastCheckpointTs;
> > > > > >     if (isAnySubtaskBackpressured()) {
> > > > > >       nextCheckpointTs += slowCheckpointInterval;
> > > > > >     } else {
> > > > > >       nextCheckpointTs += fastCheckpointInterval;
> > > > > >     }
> > > > > >
> > > > > >     // trigger once the computed deadline has passed
> > > > > >     if (nextCheckpointTs <= System.currentTimeMillis()) {
> > > > > >       triggerCheckpoint();
> > > > > >       lastCheckpointTs = System.currentTimeMillis();
> > > > > >     }
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > This way, if there is a spike of backpressure, it doesn't matter
> > that
> > > > > much.
> > > > > > If the backpressure goes away until the next iteration, the next
> > > check
> > > > > will
> > > > > > trigger a checkpoint according to the
> > > > > > fast interval. The slow checkpoint interval will be used only if
> > the
> > > > > > backpressure persists for the whole duration of the
> > > > > slowCheckpointInterval.
> > > > > >
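The loop above can be rendered as a self-contained simulation (class and method names invented here, backpressure supplied as a stub, and the trigger firing once the computed deadline has passed), which shows that a brief backpressure spike does not by itself push checkpoints out to the slow interval:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

class SimulatedBackpressureTrigger {
    private final long fastIntervalMs;
    private final long slowIntervalMs;
    private final BooleanSupplier anySubtaskBackpressured;
    private long lastCheckpointTs = 0;
    final List<Long> checkpointTimes = new ArrayList<>();

    SimulatedBackpressureTrigger(long fastMs, long slowMs, BooleanSupplier bp) {
        this.fastIntervalMs = fastMs;
        this.slowIntervalMs = slowMs;
        this.anySubtaskBackpressured = bp;
    }

    // Called periodically with the simulated current time in milliseconds.
    void maybeTriggerCheckpoint(long nowMs) {
        long nextCheckpointTs = lastCheckpointTs
                + (anySubtaskBackpressured.getAsBoolean() ? slowIntervalMs : fastIntervalMs);
        if (nextCheckpointTs <= nowMs) {
            checkpointTimes.add(nowMs);
            lastCheckpointTs = nowMs;
        }
    }
}
```

Driving this every 100 ms with fast = 30 s, slow = 30 min, and backpressure present only during a 1-second spike still yields checkpoints at roughly 30 s, 60 s, 90 s, and so on; the slow interval only takes effect if backpressure persists at the moments when fast-interval deadlines pass.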
> > > > > > We could also go a little bit more fancy, and instead of using
> only
> > > > fast
> > > > > or
> > > > > > slow intervals, we could use a continuous spectrum to gradually
> > > adjust
> > > > > the
> > > > > > interval, by replacing the first if/else
> > > > > > check with a weighted average:
> > > > > >
> > > > > >   int maxBackPressureTime = getSubtaskMaxBackPressuredTimeMsPerSecond();
> > > > > >   long nextCheckpointTs = lastCheckpointTs
> > > > > >       + (slowCheckpointInterval * maxBackPressureTime
> > > > > >           + fastCheckpointInterval * (1000 - maxBackPressureTime)) / 1000;
> > > > > >
> > > > > > This would further eliminate some potential jitter and make the
> > > actual
> > > > > > checkpoint interval a bit more predictable.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > >
> > > > > > Tue, 30 May 2023 at 04:40, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Let me correct the typo in the last paragraph as below:
> > > > > > >
> > > > > > > To make the problem even harder, the incoming traffic can be
> > spiky.
> > > > And
> > > > > > the
> > > > > > > overhead of triggering checkpointing can be relatively low, in
> > > which
> > > > > case
> > > > > > > it might be more performant (w.r.t. e2e lag) for the Flink job
> to
> > > > > > > checkpoint at the more frequent interval in the continuous
> phase
> > in
> > > > > face
> > > > > > of
> > > > > > > a spike in the number of pending records buffered in the source
> > > > > operator.
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 30, 2023 at 9:17 AM Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Piotrek,
> > > > > > > >
> > > > > > > > Thanks for providing more details of the alternative
> approach!
> > > > > > > >
> > > > > > > > If I understand your proposal correctly, here are the
> > > requirements
> > > > > for
> > > > > > it
> > > > > > > > to work without incurring any regression:
> > > > > > > >
> > > > > > > > 1) The source needs a way to determine whether there exists
> > > > > > backpressure.
> > > > > > > > 2) If there is backpressure, then it means e2e latency is
> > already
> > > > > high
> > > > > > > > and there should be no harm to use the less frequent
> > > checkpointing
> > > > > > > interval.
> > > > > > > > 3) The configuration of the "less frequent checkpointing
> > > interval"
> > > > > > needs
> > > > > > > > to be a job-level config so that it works for sources other
> > than
> > > > > > > > HybridSource.
> > > > > > > >
> > > > > > > > I would say that if we can find a way for the source to
> > determine
> > > > the
> > > > > > > > "existence of backpressure" and meet the requirement 2), it
> > would
> > > > > > indeed
> > > > > > > be
> > > > > > > > a much more elegant approach that solves more use-cases.
> > > > > > > >
> > > > > > > > The devil is in the details. I am not sure how to determine
> the
> > > > > > > "existence
> > > > > > > > of backpressure". Let me explain my thoughts and maybe you
> can
> > > help
> > > > > > > > provide the answers.
> > > > > > > >
> > > > > > > > To make the discussion more concrete, let's say the input
> > records
> > > > do
> > > > > > not
> > > > > > > > have event timestamps. Users want to checkpoint at least once
> > > every
> > > > > 30
> > > > > > > > minutes to upper-bound the amount of duplicate work after job
> > > > > failover.
> > > > > > > And
> > > > > > > > users want to checkpoint at least once every 30 seconds to
> > > > > upper-bound
> > > > > > > *extra
> > > > > > > > e2e lag introduced by the Flink job* during the continuous
> > > > processing
> > > > > > > > phase.
> > > > > > > >
> > > > > > > > Since the input records do not have event timestamps, we can
> > not
> > > > rely
> > > > > > on
> > > > > > > > metrics such as currentFetchEventTimeLag [1] to determine the
> > > > > absolute
> > > > > > > e2e
> > > > > > > > lag, because currentFetchEventTimeLag depends on the
> existence
> > of
> > > > > event
> > > > > > > > timestamps.
> > > > > > > >
> > > > > > > > Also note that, even if the input records have event
> timestamps
> > > and
> > > > > we
> > > > > > > can
> > > > > > > > measure currentFetchEventTimeLag, we still need a threshold
> to
> > > > > > determine
> > > > > > > > whether the value of currentFetchEventTimeLag is too high.
> One
> > > idea
> > > > > > might
> > > > > > > > be to use the user-specified "less frequent checkpointing
> > > interval"
> > > > > as
> > > > > > > > this threshold, which in this case is 30 seconds. But this
> > > approach
> > > > > can
> > > > > > > > also cause regression. For example, let's say the records go
> > > > through
> > > > > > > > several Kafka/MirrorMaker pipelines after it is generated and
> > > > before
> > > > > it
> > > > > > > is
> > > > > > > > received by Flink, causing its currentFetchEventTimeLag to be
> > > > always
> > > > > > > higher
> > > > > > > > than 30 seconds. Then Flink will end up always using the
> "less
> > > > > frequent
> > > > > > > > checkpointing interval" in the continuous phase, which in
> this
> > > case
> > > > > is
> > > > > > 30
> > > > > > > > minutes.
> > > > > > > >
> > > > > > > > Other options to determine the "existence of backpressure"
> > > > > > > > include using
> > > > > > > > the absolute number of records in the source storage system
> > that
> > > > are
> > > > > > > > waiting to be fetched (e.g. pendingRecords [1]), or using the
> > > > > absolute
> > > > > > > > number of buffered records in the source output queue.
> > However, I
> > > > > find
> > > > > > it
> > > > > > > > hard to reliably determine "e2e latency is already high"
> based
> > on
> > > > the
> > > > > > > > absolute number of records. What threshold should we choose
> to
> > > > > > determine
> > > > > > > > that the number of pending records is too many (and it is
> safe
> > to
> > > > > > > increase
> > > > > > > > the checkpointing interval)?
> > > > > > > >
> > > > > > > > To make the problem even harder, the incoming traffic can be
> > > spiky.
> > > > > And
> > > > > > > > the overhead of triggering checkpointing can be relative low,
> > in
> > > > > which
> > > > > > > case
> > > > > > > > it might be more performance (w.r.t. e2e lag) for the Flink
> job
> > > to
> > > > > > > > checkpoint at the higher interval in the continuous phase in
> > face
> > > > of
> > > > > a
> > > > > > > > spike in the number of pending records buffered in the source
> > > > > operator.
> > > > > > > >
> > > > > > > > The problems described above are the main reasons that I can
> > not
> > > > > find a
> > > > > > > > way to make the alternative approach work. Any thoughts?
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski <
> > > > > pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi
> > > > > > > >>
> > > > > > > >> @Jing
> > > > > > > >>
> > > > > > > >> > Your proposal to dynamically adjust the checkpoint
> intervals
> > > is
> > > > > > > elegant!
> > > > > > > >> It
> > > > > > > >> > makes sense to build it as a generic feature in Flink.
> > Looking
> > > > > > forward
> > > > > > > >> to
> > > > > > > >> > it. However, for some use cases, e.g. when users were
> aware
> > > of
> > > > > the
> > > > > > > >> bounded
> > > > > > > >> > sources (in the HybridSource) and care more about the
> > > > throughput,
> > > > > > the
> > > > > > > >> > dynamic adjustment might not be required. Just let those
> > > bounded
> > > > > > > sources
> > > > > > > >> > always have larger checkpoint intervals even when there is
> > no
> > > > back
> > > > > > > >> > pressure. Because no one cares about latency in this case,
> > > let's
> > > > > > turn
> > > > > > > >> off
> > > > > > > >> > the dynamic adjustment, reduce the checkpoint frequency,
> > have
> > > > > better
> > > > > > > >> > throughput, and save unnecessary source consumption. Did I
> > > miss
> > > > > > > anything
> > > > > > > >> > here?
> > > > > > > >>
> > > > > > > >> But why do we need to have two separate mechanisms, if the
> > > dynamic
> > > > > > > >> adjustment based on the backpressure/backlog would
> > > > > > > >> achieve basically the same goal as your proposal and would
> > solve
> > > > > both
> > > > > > of
> > > > > > > >> the problems? Having two independent solutions
> > > > > > > >> in the same codebase, in the docs, that are achieving
> > basically
> > > > the
> > > > > > same
> > > > > > > >> thing is far from ideal. It would both increase the
> > > > > > > >> complexity of the system and confuse potential users.
> > > > > > > >>
> > > > > > > >> Moreover, as I have already mentioned before, I don't like
> the
> > > > > current
> > > > > > > >> proposal as it's focusing ONLY on the HybridSource,
> > > > > > > >> which can lead to even worse problems in the future, where
> many
> > > > > > different
> > > > > > > >> sources would have each a completely custom
> > > > > > > >> solution to solve the same/similar problems, complicating
> the
> > > > system
> > > > > > and
> > > > > > > >> confusing the users even more.
> > > > > > > >>
> > > > > > > >> @Dong,
> > > > > > > >>
> > > > > > > >> > For now I am not able to come up with a good way to
> support
> > > > this.
> > > > > I
> > > > > > am
> > > > > > > >> happy to discuss the
> > > > > > > >> > pros/cons if you can provide more detail (e.g. API design)
> > > > > regarding
> > > > > > > how
> > > > > > > >> to support this approach
> > > > > > > >>
> > > > > > > >> I have already described such proposal:
> > > > > > > >>
> > > > > > > >> > Piotr:
> > > > > > > >> > I don't know, maybe instead of adding this logic to
> operator
> > > > > > > >> coordinators, `CheckpointCoordinator` should have a
> pluggable
> > > > > > > >> `CheckpointTrigger`,
> > > > > > > >> > that the user could configure like a `MetricReporter`. The
> > > > default
> > > > > > one
> > > > > > > >> would be just periodically triggering checkpoints. Maybe
> > > > > > > >> > `BacklogDynamicCheckpointTrigger` could look at
> metrics[1],
> > > > check
> > > > > if
> > > > > > > >> `pendingRecords` for some source has exceeded the configured
> > > > > > > >> > threshold and based on that adjust the checkpointing
> > interval
> > > > > > > >> accordingly? This would at least address some of my
> concerns.
> > > > > > > >>
> > > > > > > >> plus
> > > > > > > >>
> > > > > > > >> > Piotr:
> > > > > > > >> >  Either way, I would like to refine my earlier idea, and
> > > instead
> > > > > of
> > > > > > > >> using
> > > > > > > >> metrics like `pendingRecords`, I think we could switch
> between
> > > > fast
> > > > > > and
> > > > > > > >> > slow checkpointing intervals based on the information if
> the
> > > job
> > > > > is
> > > > > > > >> backpressured or not. My thinking is as follows:
> > > > > > > >> >
> > > > > > > >> > As a user, I would like to have my regular fast
> > checkpointing
> > > > > > interval
> > > > > > > >> for low latency, but the moment my system is not keeping up,
> > if
> > > > the
> > > > > > > >> backpressure
> > > > > > > >> > builds up, or simply we have a huge backlog to reprocess,
> > > > latency
> > > > > > > >> doesn't
> > > > > > > >> matter anymore. Only throughput matters. So I would like the
> > > > > > > checkpointing
> > > > > > > >> to slow down.
> > > > > > > >> >
> > > > > > > >> > I think this should cover pretty well most of the cases,
> > what
> > > do
> > > > > you
> > > > > > > >> think? If this backpressured based behaviour is still not
> > > enough,
> > > > I
> > > > > > > would
> > > > > > > >> still say
> > > > > > > >> > that we should provide plugable checkpoint triggering
> > > > controllers
> > > > > > that
> > > > > > > >> would work based on metrics.
> > > > > > > >>
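[Editor's note: the pluggable-trigger idea quoted above could look roughly like this. A minimal, self-contained sketch in plain Java; the `CheckpointTrigger` interface, the backpressure supplier, and the interval values are all assumptions for illustration, not actual Flink API:]

```java
import java.util.function.BooleanSupplier;

/** Hypothetical pluggable trigger: decides the next checkpoint interval. */
interface CheckpointTrigger {
    long nextIntervalMillis();
}

/** Switches between fast and slow intervals based on a backpressure signal. */
final class BackpressureDetectingCheckpointTrigger implements CheckpointTrigger {
    private final long fastIntervalMillis;
    private final long slowIntervalMillis;
    private final BooleanSupplier isBackpressured;

    BackpressureDetectingCheckpointTrigger(
            long fastIntervalMillis, long slowIntervalMillis, BooleanSupplier isBackpressured) {
        this.fastIntervalMillis = fastIntervalMillis;
        this.slowIntervalMillis = slowIntervalMillis;
        this.isBackpressured = isBackpressured;
    }

    @Override
    public long nextIntervalMillis() {
        // Under backpressure only throughput matters, so slow checkpointing down.
        return isBackpressured.getAsBoolean() ? slowIntervalMillis : fastIntervalMillis;
    }
}
```

[A `BacklogDynamicCheckpointTrigger` would have the same shape, with the supplier replaced by a check of a metric such as `pendingRecords` against a configured threshold.]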
> > > > > > > >> > change the checkpointing interval based on the "backlog
> > > signal",
> > > > > > > >>
> > > > > > > >> What's wrong with the job being backpressured? If the job is
> > > > > > > >> backpressured, we
> > > > > > > >> don't care about individual records latency, only about
> > > increasing
> > > > > > > >> the throughput to get out of the backpressure situation
> ASAP.
> > > > > > > >>
> > > > > > > >> > In the mentioned use-case, users want to have two
> different
> > > > > > > >> checkpointing
> > > > > > > >> > intervals at different phases of the HybridSource. We
> should
> > > > > provide
> > > > > > > an
> > > > > > > >> API
> > > > > > > >> > for users to express the extra checkpointing interval in
> > > > addition
> > > > > to
> > > > > > > the
> > > > > > > >> > existing execution.checkpointing.interval. What would be
> the
> > > > > > > definition
> > > > > > > >> of
> > > > > > > >> > that API with this alternative approach?
> > > > > > > >>
> > > > > > > >> I think my proposal with `BacklogDynamicCheckpointTrigger`
> or
> > > > > > > >> `BackpressureDetectingCheckpointTrigger` would solve your
> > > > motivating
> > > > > > use
> > > > > > > >> case
> > > > > > > >> just as well.
> > > > > > > >>
> > > > > > > >> 1. In the catch up phase (reading the bounded source):
> > > > > > > >>   a) if we are under backpressure (common case), system
> would
> > > > > fallback
> > > > > > > to
> > > > > > > >> the less frequent checkpointing interval
> > > > > > > >>   b) if there is no backpressure (I hope a rare case, there
> > is a
> > > > > > > backlog,
> > > > > > > >> but the source is too slow), the Flink cluster has spare
> > > > > > > >> resources to actually run the more frequent checkpointing
> > > > > > > >> interval. No harm should be done.
> > > But
> > > > > > > >> arguably
> > > > > > > >> using a less frequent checkpointing interval here should be
> > more
> > > > > > > >> desirable.
> > > > > > > >>
> > > > > > > >> 2. In the continuous processing phase (unbounded source)
> > > > > > > >>   a) if we are under backpressure, as I mentioned above, no
> > one
> > > > > cares
> > > > > > > >> about
> > > > > > > >> checkpointing interval and the frequency of committing
> records
> > > to
> > > > > the
> > > > > > > >>       output, as e2e latency is already high due to the
> > backlog
> > > in
> > > > > the
> > > > > > > >> sources
> > > > > > > >>   b) if there is no backpressure, that's the only case where
> > the
> > > > > user
> > > > > > > >> actually cares about the frequency of committing records to
> > the
> > > > > > output,
> > > > > > > we
> > > > > > > >> are
> > > > > > > >>       using the more frequent checkpointing interval.
> > > > > > > >>
> > > > > > > >> 1b) is, I think, mostly harmless, and could be solved with some
> > > > > > > >> extra effort.
> > > > > > > >> 2a) and 2b) are not solved by your proposal
> > > > > > > >> 2a) and 2b) are applicable to any source, not just
> > HybridSource,
> > > > > which
> > > > > > > is
> > > > > > > >> also not covered by your proposal.
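[Editor's note: the four cases above collapse into a small decision function. A sketch follows; the class and method names are hypothetical, and case 1b is resolved toward the slow interval as argued above:]

```java
/** Maps (phase, backpressure) to a checkpointing interval, per cases 1a-2b above. */
final class IntervalChoice {
    static long chooseIntervalMillis(
            boolean catchUpPhase, boolean backpressured, long fastMillis, long slowMillis) {
        if (backpressured) {
            // Cases 1a and 2a: only throughput matters, so checkpoint less often.
            return slowMillis;
        }
        if (catchUpPhase) {
            // Case 1b: no harm either way, but the slow interval is arguably preferable.
            return slowMillis;
        }
        // Case 2b: the only case where commit frequency (latency) actually matters.
        return fastMillis;
    }
}
```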
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Piotrek
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Thu, 25 May 2023 at 17:29, Jing Ge <j...@ververica.com.invalid>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Dong, Hi Piotr,
> > > > > > > >> >
> > > > > > > >> > Thanks for the clarification.
> > > > > > > >> >
> > > > > > > >> > @Dong
> > > > > > > >> >
> > > > > > > >> > According to the code examples in the FLIP, I thought we
> are
> > > > > > focusing
> > > > > > > on
> > > > > > > >> > the HybridSource scenario. With the current HybridSource
> > > > > > > >> implementation, we
> > > > > > > >> > don't even need to know the boundedness of sources in the
> > > > > > > HybridSource,
> > > > > > > >> > since all sources except the last one must be bounded[1],
> > i.e.
> > > > > only
> > > > > > > the
> > > > > > > >> > last source is unbounded. This makes it much easier to set
> > > > > different
> > > > > > > >> > intervals to sources with different boundedness.
> > > > > > > >> >
> > > > > > > >> > Boundedness in Flink is a top level concept. I think it
> > should
> > > > be
> > > > > ok
> > > > > > > to
> > > > > > > >> > introduce a top level config for the top level concept. I
> am
> > > not
> > > > > > > >> familiar
> > > > > > > >> > with MySQL CDC. For those specific cases, you are right,
> > your
> > > > > > proposal
> > > > > > > >> can
> > > > > > > >> > provide the feature with minimal changes, like I mentioned
> > > > > > previously,
> > > > > > > >> it
> > > > > > > >> > is a thoughtful design.  +1
> > > > > > > >> >
> > > > > > > >> > @Piotr
> > > > > > > >> >
> > > > > > > >> > > For example join (windowed/temporal) of two tables
> backed
> > > by a
> > > > > > > hybrid
> > > > > > > >> > > source? I could easily see a scenario where one table
> with
> > > > > little
> > > > > > > data
> > > > > > > >> > > catches up much more quickly.
> > > > > > > >> >
> > > > > > > >> > I am confused. I thought we were talking about
> HybridSource
> > > > which
> > > > > > > >> "solves
> > > > > > > >> > the problem of sequentially reading input from
> heterogeneous
> > > > > sources
> > > > > > > to
> > > > > > > >> > produce a single input stream."[2]
> > > > > > > >> > I could not find any join within a HybridSource. So you might
> > > > > > > >> > mean something else with the join example, and it should be
> > > > > > > >> > out of scope, if I am not mistaken.
> > > > > > > >> >
> > > > > > > >> > > About the (un)boundness of the input stream. I'm not
> sure
> > if
> > > > > that
> > > > > > > >> should
> > > > > > > >> > > actually matter. Actually the same issue, with too frequent
> > > > > > > >> > > checkpointing during a catch-up period or when Flink is
> > > > > > > >> > > overloaded, could affect jobs
> > > > > > > >> > > that are purely unbounded, like continuously reading
> from
> > > > Kafka.
> > > > > > > Even
> > > > > > > >> > more,
> > > > > > > >> > > nothing prevents users from actually storing bounded
> data
> > > in a
> > > > > > Kafka
> > > > > > > >> > topic.
> > > > > > > >> > > Either way, I would like to refine my earlier idea, and
> > > > instead
> > > > > of
> > > > > > > >> using
> > > > > > > >> > > metrics like `pendingRecords`, I think we could switch
> > > between
> > > > > > fast
> > > > > > > >> and
> > > > > > > >> > > slow checkpointing intervals based on the information if
> > the
> > > > job
> > > > > > is
> > > > > > > >> > > backpressured or not. My thinking is as follows:
> > > > > > > >> >
> > > > > > > >> > This is again a very different use case as HybridSource.
> > Users
> > > > do
> > > > > > > allow
> > > > > > > >> > storing bounded data in a Kafka and if it is not used as
> the
> > > > last
> > > > > > > >> source in
> > > > > > > >> > a HybridSource, it is a bounded source and can still
> benefit
> > > > from
> > > > > > > larger
> > > > > > > >> > checkpoint interval wrt the high throughput (Kafka or any
> > > other
> > > > > > > storage
> > > > > > > >> > does not matter). BTW, the larger checkpoint interval for
> > > > bounded
> > > > > > > >> source is
> > > > > > > >> > optional: users can use it but are not required to, if they
> > > > > > > >> > don't care about the throughput with bounded data.
> > > > > > > >> >
> > > > > > > >> > Your proposal to dynamically adjust the checkpoint
> intervals
> > > is
> > > > > > > >> elegant! It
> > > > > > > >> > makes sense to build it as a generic feature in Flink.
> > Looking
> > > > > > forward
> > > > > > > >> to
> > > > > > > >> > it. However, for some user cases, e.g. when users were
> aware
> > > of
> > > > > the
> > > > > > > >> bounded
> > > > > > > >> > sources (in the HybridSource) and care more about the
> > > > throughput,
> > > > > > the
> > > > > > > >> > dynamic adjustment might not be required. Just let those
> > > bounded
> > > > > > > sources
> > > > > > > >> > always have larger checkpoint intervals even when there is
> > no
> > > > back
> > > > > > > >> > pressure. Because no one cares about latency in this case,
> > > let's
> > > > > > turn
> > > > > > > >> off
> > > > > > > >> > the dynamic adjustment, reduce the checkpoint frequency,
> > have
> > > > > better
> > > > > > > >> > throughput, and save unnecessary source consumption. Did I
> > > miss
> > > > > > > anything
> > > > > > > >> > here?
> > > > > > > >> >
> > > > > > > >> > Best regards,
> > > > > > > >> > Jing
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > [1]
> > > > > > > >> > https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
> > > > > > > >> > [2]
> > > > > > > >> > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Thu, May 25, 2023 at 3:03 PM Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi Piotr,
> > > > > > > >> > >
> > > > > > > >> > > Thanks for the discussion. Please see my comments
> inline.
> > > > > > > >> > >
> > > > > > > >> > > On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski <
> > > > > > > pnowoj...@apache.org>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi all,
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks for the discussion.
> > > > > > > >> > > >
> > > > > > > >> > > > @Dong
> > > > > > > >> > > >
> > > > > > > >> > > > > In the target use-case, we would like HybridSource to
> > > > > > > >> > > > > trigger checkpoints more frequently when it is reading
> > > > > > > >> > > > > the Kafka source (than when it is reading the HDFS
> > > > > > > >> > > > > source). We would need to set a flag for the checkpoint
> > > > > > > >> > > > > trigger to know which source the HybridSource is reading
> > > > > > > >> > > > > from.
> > > > > > > >> > > >
> > > > > > > >> > > > Is this really your actual goal? Should users care if
> > some
> > > > > table
> > > > > > > >> > defined
> > > > > > > >> > > in
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > My actual goal is to address the use-case described in the
> > > > > > > >> > > motivation section. More specifically, my goal is to provide
> > > > > > > >> > > an API that users can use to express their needed
> > > > > > > >> > > checkpointing interval at different phases of the job, so
> > > > > > > >> > > that Flink can achieve the maximum throughput while also
> > > > > > > >> > > meeting users' need for data freshness and failover time.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > > a meta store is backed by HybridSource or not? I think
> > the
> > > > > > actual
> > > > > > > >> goal
> > > > > > > >> > is
> > > > > > > >> > > > this:
> > > > > > > >> > >
> > > > > > > >> > > As a user I would like to have a self adjusting
> mechanism
> > > for
> > > > > > > >> > checkpointing
> > > > > > > >> > > > intervals, so that during the catch up phase my job
> > > focuses
> > > > on
> > > > > > > >> > throughput
> > > > > > > >> > > > to catch up ASAP, while during normal processing
> > (without
> > > a
> > > > > > large
> > > > > > > >> > > backlog)
> > > > > > > >> > > > Flink is trying to minimize e2e latency.
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > Sure. It will be great to have a way to support this
> > > > > > self-adjusting
> > > > > > > >> > > mechanism. For now I am not able
> > > > > > > >> > > to come up with a good way to support this. I am happy
> to
> > > > > discuss
> > > > > > > the
> > > > > > > >> > > pros/cons if you can provide
> > > > > > > >> > > more detail (e.g. API design) regarding how to support
> > this
> > > > > > > approach.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > Am I right here?
> > > > > > > >> > > >
> > > > > > > >> > > > > there won't exist any "*conflicting* desired
> > checkpoint
> > > > > > trigger"
> > > > > > > >> by
> > > > > > > >> > > > definition
> > > > > > > >> > > >
> > > > > > > >> > > > Ok, arguably there won't be a conflict, but the
> decision
> > > to
> > > > > pick
> > > > > > > >> > minimum
> > > > > > > >> > > > out of the upper bounds might be sub-optimal.
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > As of today, users need checkpoints in order to address two
> > > > > > > >> > > goals. One goal is to upper-bound data staleness when there
> > > > > > > >> > > is a sink with exactly-once semantics (e.g. Paimon), since
> > > > > > > >> > > those sinks can only output data when a checkpoint is
> > > > > > > >> > > triggered. The other goal is to upper-bound the amount of
> > > > > > > >> > > duplicate work needed after failover.
> > > > > > > >> > >
> > > > > > > >> > > In both cases, users need to upper-bound the
> checkpointing
> > > > > > interval.
> > > > > > > >> This
> > > > > > > >> > > makes it more intuitive
> > > > > > > >> > > for the config to only express the checkpointing
> interval
> > > > > > > upper-bound.
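[Editor's note: the "pick minimum out of the upper bounds" rule discussed here can be stated in a few lines. A sketch follows; the names are hypothetical, not FLIP-309 API:]

```java
import java.util.List;

/** Picks the effective checkpointing interval under FLIP-309-style upper bounds. */
final class EffectiveInterval {
    /**
     * The effective interval is the configured execution.checkpointing.interval,
     * further reduced by any upper bounds requested by sources.
     */
    static long effectiveIntervalMillis(long configuredMillis, List<Long> requestedUpperBounds) {
        long result = configuredMillis;
        for (long bound : requestedUpperBounds) {
            result = Math.min(result, bound);
        }
        return result;
    }
}
```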
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > > Overall, I am not sure we always want to have a
> longer
> > > > > > > >> checkpointing
> > > > > > > >> > > > > interval. That really depends on the specific
> use-case
> > > and
> > > > > the
> > > > > > > job
> > > > > > > >> > > graph.
> > > > > > > >> > > >
> > > > > > > >> > > > Yes, that's why I proposed something a little bit more
> > > > > generic.
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > I am not sure I fully understand the alternative
> proposal
> > > that
> > > > > is
> > > > > > > >> meant
> > > > > > > >> > to
> > > > > > > >> > > be more generic. So it is hard for me to evaluate the
> > > > pros/cons.
> > > > > > > >> > >
> > > > > > > >> > > I understand that you preferred for the source operator
> to
> > > use
> > > > > the
> > > > > > > >> REST
> > > > > > > >> > API
> > > > > > > >> > > to trigger checkpoints. This sounds
> > > > > > > >> > > like a downside since using REST API is not as easy as
> > using
> > > > the
> > > > > > > >> > > programming API proposed in the FLIP.
> > > > > > > >> > >
> > > > > > > >> > > Can you help explain the generic approach more
> concretely,
> > > > such
> > > > > as
> > > > > > > the
> > > > > > > >> > APIs
> > > > > > > >> > > you would suggest introducing? That would
> > > > > > > >> > > allow me to evaluate the pros/cons and hopefully pick
> the
> > > best
> > > > > > > option.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > > I believe there can be use-case where
> > > > > > > >> > > > > the proposed API is not useful, in which case users
> > can
> > > > > choose
> > > > > > > >> not to
> > > > > > > >> > > use
> > > > > > > >> > > > > the API without incurring any performance
> regression.
> > > > > > > >> > > >
> > > > > > > >> > > > I'm not saying that this proposal is not useful. Just
> > that
> > > > we
> > > > > > > might
> > > > > > > >> be
> > > > > > > >> > > able
> > > > > > > >> > > > to solve this problem in a more flexible manner. If we
> > > > > > introduce a
> > > > > > > >> > > > partially working solution now at the source level,
> and
> > > > later
> > > > > we
> > > > > > > >> will
> > > > > > > >> > > still
> > > > > > > >> > > > need a different solution on another level to cover
> > other
> > > > use
> > > > > > > cases,
> > > > > > > >> > that
> > > > > > > >> > > > would clog the API and confuse users.
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > Can you explain why this is "partially working"? Is it
> > > because
> > > > > > there
> > > > > > > >> are
> > > > > > > >> > > use-cases that should
> > > > > > > >> > > be addressed but not already covered by the proposed
> > > approach?
> > > > > > > >> > >
> > > > > > > >> > > If so, can you help explain the use-case that would be
> > > useful
> > > > to
> > > > > > > >> address?
> > > > > > > >> > > With concrete
> > > > > > > >> > > use-cases in mind, we can pick the API with minimal
> change
> > > to
> > > > > > > address
> > > > > > > >> > these
> > > > > > > >> > > use-cases.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > @Jing
> > > > > > > >> > > >
> > > > > > > >> > > > > @Piotr
> > > > > > > >> > > > > Just out of curiosity, do you know any real use
> cases
> > > > where
> > > > > > > >> real-time
> > > > > > > >> > > > data is processed before the backlog?
> > > > > > > >> > > >
> > > > > > > >> > > > For example join (windowed/temporal) of two tables
> > backed
> > > > by a
> > > > > > > >> hybrid
> > > > > > > >> > > > source? I could easily see a scenario where one table
> > with
> > > > > > little
> > > > > > > >> data
> > > > > > > >> > > > catches up much more quickly.
> > > > > > > >> > > >
> > > > > > > >> > > > @Jing and @Dong
> > > > > > > >> > > >
> > > > > > > >> > > > About the (un)boundness of the input stream. I'm not
> > sure
> > > if
> > > > > > that
> > > > > > > >> > should
> > > > > > > >> > > > actually matter. Actually the same issue, with too
> > > > > > > >> > > > frequent checkpointing
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > Indeed, I agree with you on this point and prefer not to
> > > have
> > > > > this
> > > > > > > >> > proposal
> > > > > > > >> > > depend on the (un)boundness.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > > during a catch up period or when Flink is overloaded,
> > > could
> > > > > > affect
> > > > > > > >> jobs
> > > > > > > >> > > > that are purely unbounded, like continuously reading
> > from
> > > > > Kafka.
> > > > > > > >> Even
> > > > > > > >> > > more,
> > > > > > > >> > > > nothing prevents users from actually storing bounded
> > data
> > > > in a
> > > > > > > Kafka
> > > > > > > >> > > topic.
> > > > > > > >> > > > Either way, I would like to refine my earlier idea,
> and
> > > > > instead
> > > > > > of
> > > > > > > >> > using
> > > > > > > >> > > > metrics like `pendingRecords`, I think we could switch
> > > > between
> > > > > > > fast
> > > > > > > >> and
> > > > > > > >> > > > slow checkpointing intervals based on the information
> if
> > > the
> > > > > job
> > > > > > > is
> > > > > > > >> > > > backpressured or not. My thinking is as follows:
> > > > > > > >> > > >
> > > > > > > >> > > > As a user, I would like to have my regular fast
> > > > checkpointing
> > > > > > > >> interval
> > > > > > > >> > > for
> > > > > > > >> > > > low latency, but the moment my system is not keeping
> up,
> > > if
> > > > > the
> > > > > > > >> > > > backpressure builds up, or simply we have a huge
> backlog
> > > to
> > > > > > > >> reprocess,
> > > > > > > >> > > > latency doesn't matter anymore. Only throughput
> matters.
> > > So
> > > > I
> > > > > > > would
> > > > > > > >> > like
> > > > > > > >> > > > the checkpointing to slow down.
> > > > > > > >> > > >
> > > > > > > >> > > > I think this should cover pretty well most of the
> cases,
> > > > what
> > > > > do
> > > > > > > you
> > > > > > > >> > > think?
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > > Thank you for all the comments. I like this idea; we
> > > > > > > >> > > actually considered it before proposing this FLIP.
> > > > > > > >> > >
> > > > > > > >> > > In order to make this idea work, we need to come up with a
> > > > > > > >> > > good algorithm
> > > > > > > >> > > that can dynamically change the checkpointing interval
> > based
> > > > on
> > > > > > the
> > > > > > > >> > > "backlog signal", without causing regression w.r.t.
> > failover
> > > > > time
> > > > > > > and
> > > > > > > >> > data
> > > > > > > >> > > freshness. I find it hard to come up with this algorithm
> > due
> > > > to
> > > > > > > >> > > insufficient "backlog signal".
> > > > > > > >> > >
> > > > > > > >> > > For the use-case mentioned in the motivation section,
> the
> > > data
> > > > > in
> > > > > > > the
> > > > > > > >> > > source does not have event timestamps to help determine
> > the
> > > > > amount
> > > > > > > of
> > > > > > > >> > > backlog. So the only source-of-truth for determining
> > backlog
> > > > is
> > > > > > the
> > > > > > > >> > amount
> > > > > > > >> > > of data buffered in operators. But the buffer size is
> > > > typically
> > > > > > > >> chosen to
> > > > > > > >> > > be proportional to round-trip-time and throughput.
> Having
> > a
> > > > full
> > > > > > > >> buffer
> > > > > > > >> > > does not necessarily mean that the data is lagging
> behind.
> > > And
> > > > > > > >> increasing
> > > > > > > >> > > the checkpointing interval with insufficient "backlog
> > > signal"
> > > > > can
> > > > > > > >> have a
> > > > > > > >> > > negative impact on data freshness and failover time.
> > > > > > > >> > >
> > > > > > > >> > > In order to make this idea work, we would need to *prove*
> > > > > > > >> > > that the algorithm would not negatively hurt data freshness
> > > > > > > >> > > and failover time when it decides to increase checkpointing
> > > > > > > >> > > intervals. For now I could not come up with such an
> > > > > > > >> > > algorithm.
> > > > > > > >> > >
> > > > > > > >> > > If this backpressured based behaviour is still not
> > enough, I
> > > > > would
> > > > > > > >> still
> > > > > > > >> > > > say that we should provide plugable checkpoint
> > triggering
> > > > > > > >> controllers
> > > > > > > >> > > that
> > > > > > > >> > > > would work based on metrics.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > I am not sure how to address the use-case mentioned in
> the
> > > > > > > motivation
> > > > > > > >> > > section, with the pluggable checkpoint trigger +
> metrics.
> > > Can
> > > > > you
> > > > > > > help
> > > > > > > >> > > provide the definition of these APIs and kindly explain
> > how
> > > > that
> > > > > > > >> works to
> > > > > > > >> > > address the mentioned use-case.
> > > > > > > >> > >
> > > > > > > >> > > In the mentioned use-case, users want to have two
> > different
> > > > > > > >> checkpointing
> > > > > > > >> > > intervals at different phases of the HybridSource. We
> > should
> > > > > > provide
> > > > > > > >> an
> > > > > > > >> > API
> > > > > > > >> > > for users to express the extra checkpointing interval in
> > > > > addition
> > > > > > to
> > > > > > > >> the
> > > > > > > >> > > existing execution.checkpointing.interval. What would be
> > the
> > > > > > > >> definition
> > > > > > > >> > of
> > > > > > > >> > > that API with this alternative approach?
> > > > > > > >> > >
> > > > > > > >> > > Best,
> > > > > > > >> > > Dong
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > >
> > > > > > > >> > > > Best,
> > > > > > > >> > > > Piotrek
> > > > > > > >> > > >
> > > > > > > >> > > > On Thu, 25 May 2023 at 07:47, Dong Lin <lindon...@gmail.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hi Jing,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks for your comments!
> > > > > > > >> > > > >
> > > > > > > >> > > > > Regarding the idea of using the existing
> "boundedness"
> > > > > > attribute
> > > > > > > >> of
> > > > > > > >> > > > > sources, that is indeed something that we might find
> > > > > intuitive
> > > > > > > >> > > > initially. I
> > > > > > > >> > > > > have thought about this idea, but could not find a
> > good
> > > > way
> > > > > to
> > > > > > > >> make
> > > > > > > >> > it
> > > > > > > >> > > > > work. I will try to explain my thoughts and see if
> we
> > > can
> > > > > > find a
> > > > > > > >> > better
> > > > > > > >> > > > > solution.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Here is my understanding of the idea mentioned
> above:
> > > > > provide
> > > > > > a
> > > > > > > >> job
> > > > > > > >> > > level
> > > > > > > >> > > > > config execution.checkpoint.interval.bounded. Flink
> > will
> > > > use
> > > > > > > this
> > > > > > > >> as
> > > > > > > >> > > the
> > > > > > > >> > > > > checkpointing interval whenever there exists at
> least
> > > one
> > > > > > > running
> > > > > > > >> > > source
> > > > > > > >> > > > > which claims it is under the "bounded" stage.
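[Editor's note: under that rule, interval selection would reduce to something like the following sketch. The config name and the per-source "bounded stage" flag are exactly the hypothetical pieces under discussion here:]

```java
import java.util.List;

/** Sketch of the hypothetical execution.checkpoint.interval.bounded rule. */
final class BoundedStageRule {
    /**
     * Use the "bounded" interval whenever at least one running source claims
     * it is in a bounded stage; otherwise use the regular interval.
     */
    static long currentIntervalMillis(
            long regularMillis, long boundedStageMillis, List<Boolean> sourceInBoundedStage) {
        for (boolean bounded : sourceInBoundedStage) {
            if (bounded) {
                return boundedStageMillis;
            }
        }
        return regularMillis;
    }
}
```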
> > > > > > > >> > > > >
> > > > > > > >> > > > > Note that we can not simply re-use the existing
> > > > > "boundedness"
> > > > > > > >> > attribute
> > > > > > > >> > > > of
> > > > > > > >> > > > > source operators. The reason is that for sources
> such
> > as
> > > > > MySQL
> > > > > > > >> CDC,
> > > > > > > >> > its
> > > > > > > >> > > > > boundedness can be "continuous_unbounded" because it
> > can
> > > > run
> > > > > > > >> > > > continuously.
> > > > > > > >> > > > > But MySQL CDC has two phases internally, where the
> > > source
> > > > > > needs
> > > > > > > to
> > > > > > > >> > > first
> > > > > > > >> > > > > read a snapshot (with bounded amount of data) and
> then
> > > > read
> > > > > a
> > > > > > > >> binlog
> > > > > > > >> > > > (with
> > > > > > > >> > > > > unbounded amount of data).
> > > > > > > >> > > > >
> > > > > > > >> > > > > As a result, in order to support optimization for
> > > > > > > >> > > > > sources like MySQL CDC, we
> > > > > > > >> > > > > need to expose an API for the source operator to
> > declare
> > > > > > whether
> > > > > > > >> it
> > > > > > > >> > is
> > > > > > > >> > > > > running at a bounded or continuous_unbounded stage.
> > > *This
> > > > > > > >> introduces
> > > > > > > >> > > the
> > > > > > > >> > > > > need to define a new concept named "bounded stage".*
> > > > > > > >> > > > >
> > > > > > > >> > > > > Then, we will need to *introduce a new contract
> > between
> > > > > source
> > > > > > > >> > > operators
> > > > > > > >> > > > > and the Flink runtime*, saying that if there is a
> > source
> > > > > that
> > > > > > > >> claims
> > > > > > > >> > it
> > > > > > > >> > > > is
> > > > > > > >> > > > > running at the bounded stage, then Flink will use
> the
> > "
> > > > > > > >> > > > > execution.checkpoint.interval.bounded" as the
> > > > checkpointing
> > > > > > > >> interval.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Here are the the concerns I have with this approach:
> > > > > > > >> > > > >
> > > > > > > >> > > > > - The execution.checkpoint.interval.bounded is a
> > > top-level
> > > > > > > config,
> > > > > > > >> > > > meaning
> > > > > > > >> > > > > that every Flink user needs to read about its
> > semantics.
> > > > In
> > > > > > > >> > comparison,
> > > > > > > >> > > > the
> > > > > > > >> > > > > proposed approach only requires users of specific
> > > sources
> > > > > > (e.g.
> > > > > > > >> > > > > HybridSource, MySQL CDC) to know the new
> > source-specific
> > > > > > config.
> > > > > > > >> > > > >
> > > > > > > >> > > > > - It introduces a new top-level concept in Flink to
> > > > describe
> > > > > > the
> > > > > > > >> > > internal
> > > > > > > >> > > > > stages of specific sources (e.g. MySQL CDC). In
> > > > comparison,
> > > > > > the
> > > > > > > >> > > proposed
> > > > > > > >> > > > > approach only requires users of specific sources
> (e.g.
> > > > > > > >> HybridSource,
> > > > > > > >> > > > MySQL
> > > > > > > >> > > > > CDC) to know this concept, which not only makes the
> > > > > > explanation
> > > > > > > >> much
> > > > > > > >> > > > > simpler (since they are already using the specific
> > > > sources),
> > > > > > but
> > > > > > > >> also
> > > > > > > >> > > > > limits the scope of this new concept (only these
> users
> > > > need
> > > > > to
> > > > > > > >> know
> > > > > > > >> > > this
> > > > > > > >> > > > > concept).
> > > > > > > >> > > > >
> > > > > > > >> > > > > - It is harder to understand the existing config
> > > > > > > >> > > > > execution.checkpoint.interval, because we need to explain
> > > > > > > >> > > > > that it is only used when
> > > there
> > > > > is
> > > > > > no
> > > > > > > >> > source
> > > > > > > >> > > > > with "bounded stage", introducing more if-else for
> > this
> > > > > > config.
> > > > > > > In
> > > > > > > >> > > > > comparison, with the proposed approach, the
> semantics
> > of
> > > > > > > >> > > > > execution.checkpoint.interval is simpler without
> > > if/else,
> > > > as
> > > > > > it
> > > > > > > >> will
> > > > > > > >> > > > always
> > > > > > > >> > > > > be applied regardless of which sources users are using.
> > > > > > > >> > > > >
> > > > > > > >> > > > > I am happy to discuss if there are better
> approaches.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks,
> > > > > > > >> > > > > Dong
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, May 24, 2023 at 8:23 AM Jing Ge
> > > > > > > >> <j...@ververica.com.invalid>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hi Yunfeng, Hi Dong
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks for the informative discussion! It is a
> > > > > > > >> > > > > > rational requirement to set different checkpoint
> > > > > > > >> > > > > > intervals for different sources in a HybridSource. The
> > > > > > > >> > > > > > tiny downside of this proposal, at least for me,
> is
> > > > that I
> > > > > > > have
> > > > > > > >> to
> > > > > > > >> > > > > > understand the upper-bound definition of the
> > interval
> > > > and
> > > > > > the
> > > > > > > >> > > built-in
> > > > > > > >> > > > > rule
> > > > > > > >> > > > > > for Flink to choose the minimum value between it
> and
> > > the
> > > > > > > default
> > > > > > > >> > > > interval
> > > > > > > >> > > > > > setting. However, afaiac, the intention of this
> > > built-in
> > > > > > rule
> > > > > > > >> is to
> > > > > > > >> > > > > > minimize changes in Flink to support the requested
> > > > > > > >> > > > > > feature
> > > > > > which
> > > > > > > >> is a
> > > > > > > >> > > > very
> > > > > > > >> > > > > > thoughtful move. Thanks for taking care of it. +1
> > for
> > > > the
> > > > > > > >> Proposal.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Another very rough idea arose in my mind while I was
> > > > > > > >> > > > > > reading the FLIP.
> > > > > > > >> > > > > > I didn't do a deep dive with related source code
> > yet,
> > > so
> > > > > > > please
> > > > > > > >> > > correct
> > > > > > > >> > > > > me
> > > > > > > >> > > > > > if I am wrong. The use case shows that two
> different
> > > > > > > checkpoint
> > > > > > > >> > > > intervals
> > > > > > > >> > > > > > should be set for bounded (historical) stream and
> > > > > > > >> > > > > > unbounded (fresh real-time) stream sources. It is a trade-off between
> throughput
> > > and
> > > > > > > >> latency,
> > > > > > > >> > > i.e.
> > > > > > > >> > > > > > bounded stream with large checkpoint interval for
> > > better
> > > > > > > >> throughput
> > > > > > > >> > > and
> > > > > > > >> > > > > > unbounded stream with small checkpoint interval
> for
> > > > lower
> > > > > > > >> latency
> > > > > > > >> > (in
> > > > > > > >> > > > > case
> > > > > > > >> > > > > > of failover). As we can see, the different interval
> > > > > > > >> > > > > > setting depends on the boundedness of streams. Since the Source
> API
> > > > > already
> > > > > > > has
> > > > > > > >> its
> > > > > > > >> > > own
> > > > > > > >> > > > > > boundedness flag[1], is it possible to define two
> > > > interval
> > > > > > > >> > > > configurations
> > > > > > > >> > > > > > and let Flink automatically set the related one to
> > the
> > > > > > source
> > > > > > > >> based
> > > > > > > >> > > on
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > known boundedness? The interval for bounded stream
> > > could
> > > > > be
> > > > > > > like
> > > > > > > >> > > > > > execution.checkpoint.interval.bounded(naming could
> > be
> > > > > > > >> > reconsidered),
> > > > > > > >> > > > and
> > > > > > > >> > > > > > the other one for unbounded stream, we could use
> the
> > > > > > existing
> > > > > > > >> one
> > > > > > > >> > > > > > execution.checkpoint.interval by default, or
> > > introduce a
> > > > > new
> > > > > > > one
> > > > > > > >> > like
> > > > > > > >> > > > > > execution.checkpoint.interval.unbounded. In this
> > way,
> > > no
> > > > > API
> > > > > > > >> change
> > > > > > > >> > > is
> > > > > > > >> > > > > > required.
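Jing's idea might look roughly like this (an illustrative Python sketch; `Boundedness` mirrors the flag in the linked Source interface, while the config keys are the tentative names from this email, not existing Flink options):

```python
from enum import Enum


class Boundedness(Enum):
    # Mirrors org.apache.flink.api.connector.source.Boundedness
    BOUNDED = 1
    CONTINUOUS_UNBOUNDED = 2


def interval_for_source(boundedness, config):
    """Select the checkpoint interval from the source's boundedness flag,
    falling back to the generic interval when no bounded-specific value is set."""
    if boundedness is Boundedness.BOUNDED:
        bounded = config.get("execution.checkpoint.interval.bounded")
        if bounded is not None:
            return bounded
    return config["execution.checkpoint.interval"]
```

With this selection rule Flink would pick the interval automatically per source, so no new user-facing API would be needed, only the extra config key.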
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > @Piotr
> > > > > > > >> > > > > > Just out of curiosity, do you know any real use
> > cases
> > > > > where
> > > > > > > >> > real-time
> > > > > > > >> > > > > data
> > > > > > > >> > > > > > is processed before the backlog? Semantically, the
> > > > backlog
> > > > > > > >> contains
> > > > > > > >> > > > > > historical data that has to be processed before
> the
> > > > > > real-time
> > > > > > > >> data
> > > > > > > >> > is
> > > > > > > >> > > > > > allowed to be processed. Otherwise, up-to-date
> data
> > > will
> > > > > be
> > > > > > > >> > > overwritten
> > > > > > > >> > > > > by
> > > > > > > >> > > > > > out-of-date data, which leads to unexpected results
> > > > > > > >> > > > > > in real business scenarios.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Best regards,
> > > > > > > >> > > > > > Jing
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > [1]
> https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > On Tue, May 23, 2023 at 5:53 PM Dong Lin <
> > > > > > lindon...@gmail.com
> > > > > > > >
> > > > > > > >> > > wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > > Hi Piotr,
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Thanks for the comments. Let me try to
> understand
> > > your
> > > > > > > >> concerns
> > > > > > > >> > and
> > > > > > > >> > > > > > > hopefully address the concerns.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > >> What would happen if there are two (or more)
> > > > operator
> > > > > > > >> > > coordinators
> > > > > > > >> > > > > > with
> > > > > > > >> > > > > > > conflicting desired checkpoint trigger behaviour
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > With the proposed change, there won't exist any
> > > > > > > "*conflicting*
> > > > > > > >> > > > desired
> > > > > > > >> > > > > > > checkpoint trigger" by definition. Both
> job-level
> > > > config
> > > > > > and
> > > > > > > >> the
> > > > > > > >> > > > > proposed
> > > > > > > >> > > > > > > API upperBoundCheckpointingInterval() mean the
> > > > > > upper-bound
> > > > > > > of
> > > > > > > >> > the
> > > > > > > >> > > > > > > checkpointing interval. If there are different
> > > > > > upper-bounds
> > > > > > > >> > > proposed
> > > > > > > >> > > > by
> > > > > > > >> > > > > > > different source operators and the job-level
> > config,
> > > > > Flink
> > > > > > > >> will
> > > > > > > >> > try
> > > > > > > >> > > > to
> > > > > > > >> > > > > > > periodically trigger checkpoints at the interval
> > > > > > > >> corresponding to
> > > > > > > >> > > the
> > > > > > > >> > > > > > > minimum of all these proposed upper-bounds.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > >> If one source is processing a backlog and the
> > > other
> > > > > is
> > > > > > > >> already
> > > > > > > >> > > > > > > processing real time data..
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Overall, I am not sure we always want to have a
> > > longer
> > > > > > > >> > > checkpointing
> > > > > > > >> > > > > > > interval. That really depends on the specific
> > > use-case
> > > > > and
> > > > > > > the
> > > > > > > >> > job
> > > > > > > >> > > > > graph.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > The proposed API change provides a mechanism for
> > > > > > > >> > > > > > > operators and users to specify different checkpoint
> > > > > > > >> > > > > > > intervals during different periods of the job.
> > > > > > > >> > > Users
> > > > > > > >> > > > > > have
> > > > > > > >> > > > > > > the option to use the new API to get better
> > > > performance
> > > > > in
> > > > > > > the
> > > > > > > >> > > > use-case
> > > > > > > >> > > > > > > specified in the motivation section. I believe
> > > > > > > >> > > > > > > there can be use-cases where
> > > > > > > >> > > > > > > the proposed API is not useful, in which case
> > users
> > > > can
> > > > > > > choose
> > > > > > > >> > not
> > > > > > > >> > > to
> > > > > > > >> > > > > use
> > > > > > > >> > > > > > > the API without incurring any performance
> > > regression.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > >> it might be a bit confusing and not user
> > friendly
> > > > to
> > > > > > have
> > > > > > > >> > > multiple
> > > > > > > >> > > > > > > places that can override the checkpointing
> > behaviour
> > > > in
> > > > > a
> > > > > > > >> > different
> > > > > > > >> > > > way
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Admittedly, adding more APIs always incurs more
> > > > > complexity.
> > > > > > > But
> > > > > > > >> > > > > sometimes
> > > > > > > >> > > > > > we
> > > > > > > >> > > > > > > have to incur this complexity to address new
> > > > use-cases.
> > > > > > > Maybe
> > > > > > > >> we
> > > > > > > >> > > can
> > > > > > > >> > > > > see
> > > > > > > >> > > > > > if
> > > > > > > >> > > > > > > there are more user-friendly way to address this
> > > > > use-case.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > >> already implemented and is simple from the
> > > > > perspective
> > > > > > of
> > > > > > > >> > Flink
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Do you mean that the HybridSource operator
> should
> > > > invoke
> > > > > > the
> > > > > > > >> REST
> > > > > > > >> > > API
> > > > > > > >> > > > > to
> > > > > > > >> > > > > > > trigger checkpoints? The downside of this
> approach
> > > is
> > > > > that
> > > > > > > it
> > > > > > > >> > makes
> > > > > > > >> > > > it
> > > > > > > >> > > > > > hard
> > > > > > > >> > > > > > > for developers of source operators (e.g. MySQL
> > CDC,
> > > > > > > >> HybridSource)
> > > > > > > >> > > to
> > > > > > > >> > > > > > > address the target use-case. AFAIK, there is no
> > > > existing
> > > > > > > case
> > > > > > > >> > where
> > > > > > > >> > > > we
> > > > > > > >> > > > > > > require operator developers to use REST API to
> do
> > > > their
> > > > > > job.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Can you help explain the benefit of using REST
> API
> > > > over
> > > > > > > using
> > > > > > > >> the
> > > > > > > >> > > > > > proposed
> > > > > > > >> > > > > > > API?
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Note that this approach also seems to have the
> > same
> > > > > > downside
> > > > > > > >> > > > mentioned
> > > > > > > >> > > > > > > above: "multiple places that can override the
> > > > > > checkpointing
> > > > > > > >> > > > > behaviour". I
> > > > > > > >> > > > > > > am not sure there can be a solution to address
> the
> > > > > target
> > > > > > > >> > use-case
> > > > > > > >> > > > > > without
> > > > > > > >> > > > > > > having multiple places that can affect the
> > > > checkpointing
> > > > > > > >> > behavior.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > >> check if `pendingRecords` for some source has
> > > > > exceeded
> > > > > > > the
> > > > > > > >> > > > > configured
> > > > > > > >> > > > > > > threshold and based on that adjust the
> > checkpointing
> > > > > > > interval
> > > > > > > >> > > > > accordingly
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I am not sure this approach can address the
> target
> > > > > > use-case
> > > > > > > >> in a
> > > > > > > >> > > > better
> > > > > > > >> > > > > > > way. In the target use-case, we would like the
> > > > > > > >> > > > > > > HybridSource to trigger checkpoints more frequently
> > > > > > > >> > > > > > > when it is reading the Kafka Source (than when it
> > > > > > > >> > > > > > > is reading the HDFS source). We would need to
> set
> > a
> > > > flag
> > > > > > for
> > > > > > > >> the
> > > > > > > >> > > > > > checkpoint
> > > > > > > >> > > > > > > trigger to know which source the HybridSource is
> > > > reading
> > > > > > > from.
> > > > > > > >> > But
> > > > > > > >> > > > IMO
> > > > > > > >> > > > > > the
> > > > > > > >> > > > > > > approach is less intuitive and more complex than
> > > > having
> > > > > > the
> > > > > > > >> > > > > HybridSource
> > > > > > > >> > > > > > > invoke upperBoundCheckpointingInterval()
> directly
> > > once
> > > > > it
> > > > > > is
> > > > > > > >> > > reading
> > > > > > > >> > > > > > Kafka
> > > > > > > >> > > > > > > Source.
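A rough model of the behavior Dong describes (illustrative Python stand-ins; `upper_bound_checkpointing_interval` only models the proposed `SplitEnumeratorContext#upperBoundCheckpointingInterval` API, which is still under discussion, and all class names here are hypothetical):

```python
class FakeEnumeratorContext:
    """Stand-in for an enumerator context: keeps the tightest checkpointing
    interval upper bound proposed so far (Flink would take the minimum of
    the job-level interval and all proposed upper bounds)."""

    def __init__(self, job_interval):
        self.effective_interval = job_interval

    def upper_bound_checkpointing_interval(self, upper_bound):
        self.effective_interval = min(self.effective_interval, upper_bound)


class FakeHybridEnumerator:
    """Stand-in for a HybridSource enumerator that tightens the interval
    only once it switches to its last (real-time, e.g. Kafka) source."""

    def __init__(self, context, last_source_upper_bound):
        self.context = context
        self.last_source_upper_bound = last_source_upper_bound

    def switch_to_source(self, index, num_sources):
        # While reading earlier (bounded, e.g. HDFS) sources, keep the
        # job-level interval; tighten it only for the final source.
        if index == num_sources - 1:
            self.context.upper_bound_checkpointing_interval(
                self.last_source_upper_bound)
```

The point of the sketch is that no external flag is needed: the enumerator itself knows when the switch happens and declares the tighter bound at that moment.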
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Maybe I did not understand the alternative approach
> > > > > > > >> > > > > > > correctly. I am happy to discuss more on this topic.
> > > > > > > >> > > > > > > WDYT?
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Best,
> > > > > > > >> > > > > > > Dong
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski
> <
> > > > > > > >> > > > pnowoj...@apache.org>
> > > > > > > >> > > > > > > wrote:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Hi,
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Thanks for the proposal. However, are you sure
> > > > > > > >> > > > > > > > that the OperatorCoordinator is the right place
> > > > > > > >> > > > > > > > for such logic?
> > > > > > > >> > What
> > > > > > > >> > > > > would
> > > > > > > >> > > > > > > > happen if there are two (or more) operator
> > > > > coordinators
> > > > > > > with
> > > > > > > >> > > > > > conflicting
> > > > > > > >> > > > > > > > desired checkpoint trigger behaviour? If one
> > > source
> > > > is
> > > > > > > >> > > processing a
> > > > > > > >> > > > > > > backlog
> > > > > > > >> > > > > > > > and the other is already processing real time
> > > data,
> > > > I
> > > > > > > would
> > > > > > > >> > > assume
> > > > > > > >> > > > > that
> > > > > > > >> > > > > > > in
> > > > > > > >> > > > > > > > most use cases you would like to still have
> the
> > > > longer
> > > > > > > >> > > > checkpointing
> > > > > > > >> > > > > > > > interval, not the shorter one. Also apart from
> > > that,
> > > > > it
> > > > > > > >> might
> > > > > > > >> > be
> > > > > > > >> > > a
> > > > > > > >> > > > > bit
> > > > > > > >> > > > > > > > confusing and not user friendly to have
> multiple
> > > > > places
> > > > > > > that
> > > > > > > >> > can
> > > > > > > >> > > > > > override
> > > > > > > >> > > > > > > > the checkpointing behaviour in a different
> way.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > FYI in the past, we had some discussions about
> > > > similar
> > > > > > > >> requests
> > > > > > > >> > > and
> > > > > > > >> > > > > > back
> > > > > > > >> > > > > > > > then we chose to keep the system simpler, and
> > > > exposed
> > > > > a
> > > > > > > more
> > > > > > > >> > > > generic
> > > > > > > >> > > > > > REST
> > > > > > > >> > > > > > > > API checkpoint triggering mechanism. I know
> that
> > > > > having
> > > > > > to
> > > > > > > >> > > > implement
> > > > > > > >> > > > > > such
> > > > > > > >> > > > > > > > logic outside of Flink and having to call REST
> > > calls
> > > > > to
> > > > > > > >> trigger
> > > > > > > >> > > > > > > checkpoints
> > > > > > > >> > > > > > > > might not be ideal, but that's already
> > implemented
> > > > and
> > > > > > is
> > > > > > > >> > simple
> > > > > > > >> > > > from
> > > > > > > >> > > > > > the
> > > > > > > >> > > > > > > > perspective of Flink.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > I don't know, maybe instead of adding this
> logic
> > > to
> > > > > > > operator
> > > > > > > >> > > > > > > coordinators,
> > > > > > > >> > > > > > > > `CheckpointCoordinator` should have a
> pluggable
> > > > > > > >> > > > `CheckpointTrigger`,
> > > > > > > >> > > > > > that
> > > > > > > >> > > > > > > > the user could configure like a
> > `MetricReporter`.
> > > > The
> > > > > > > >> default
> > > > > > > >> > one
> > > > > > > >> > > > > would
> > > > > > > >> > > > > > > be
> > > > > > > >> > > > > > > > just periodically triggering checkpoints.
> Maybe
> > > > > > > >> > > > > > > > `BacklogDynamicCheckpointTrigger` could look
> at
> > > > > > > metrics[1],
> > > > > > > >> > check
> > > > > > > >> > > > if
> > > > > > > >> > > > > > > > `pendingRecords` for some source has exceeded
> > the
> > > > > > > configured
> > > > > > > >> > > > > threshold
> > > > > > > >> > > > > > > and
> > > > > > > >> > > > > > > > based on that adjust the checkpointing
> interval
> > > > > > > accordingly?
> > > > > > > >> > This
> > > > > > > >> > > > > would
> > > > > > > >> > > > > > > at
> > > > > > > >> > > > > > > > least address some of my concerns.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > WDYT?
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > Best,
> > > > > > > >> > > > > > > > Piotrek
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > [1]
> > > > > > > >> > > > > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > > > > > >> > > > > > > >
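Piotr's pluggable-trigger idea could be sketched as follows (illustrative Python; `pendingRecords` is the FLIP-33 connector metric referenced above, while the function and parameter names are hypothetical):

```python
def pick_checkpoint_interval(pending_records, threshold,
                             backlog_interval, realtime_interval):
    """While a source still reports a large backlog, use the long interval
    (favoring throughput); once it has caught up, use the short one
    (favoring low latency after failover)."""
    if pending_records > threshold:
        return backlog_interval
    return realtime_interval
```

A pluggable `CheckpointTrigger` could evaluate such a rule on each trigger cycle, so the interval adapts without any source-side API.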
> > > > > > > >> > > > > > > > wt., 9 maj 2023 o 19:11 Yunfeng Zhou <
> > > > > > > >> > > flink.zhouyunf...@gmail.com>
> > > > > > > >> > > > > > > > napisał(a):
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >> Hi all,
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> Dong(cc'ed) and I are opening this thread to
> > > > discuss
> > > > > > our
> > > > > > > >> > > proposal
> > > > > > > >> > > > to
> > > > > > > >> > > > > > > >> support dynamically triggering checkpoints
> from
> > > > > > > operators,
> > > > > > > >> > which
> > > > > > > >> > > > has
> > > > > > > >> > > > > > > >> been documented in FLIP-309
> > > > > > > >> > > > > > > >> <
> > > > > > > >> > > > > > > >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> > > > > > > >> > > > > > > >> >.
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> With the help of the ability proposed in this
> > > FLIP,
> > > > > > users
> > > > > > > >> > could
> > > > > > > >> > > > > > > >> improve the performance of their Flink job in
> > > cases
> > > > > > like
> > > > > > > >> when
> > > > > > > >> > > the
> > > > > > > >> > > > > job
> > > > > > > >> > > > > > > >> needs to process both historical batch data and
> > > > > > > >> > > > > > > >> real-time streaming data, by adjusting the
> > > > > > > >> > > > > > > >> checkpoint triggering in different phases of a
> > > > > > > >> > > > > > > >> HybridSource or CDC source.
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> This proposal would be a fundamental
> component
> > in
> > > > the
> > > > > > > >> effort
> > > > > > > >> > to
> > > > > > > >> > > > > > > >> further unify Flink's batch and stream
> > processing
> > > > > > > ability.
> > > > > > > >> > > Please
> > > > > > > >> > > > > feel
> > > > > > > >> > > > > > > >> free to reply to this email thread and share
> > with
> > > > us
> > > > > > your
> > > > > > > >> > > > opinions.
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> Best regards.
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> Dong and Yunfeng
> > > > > > > >> > > > > > > >>