Hi Dong, Thanks for the quick reply and the clarification, yeah that makes sense! Best Regards, Ahmed Hamdy
On Fri, 2 Jun 2023 at 02:59, Dong Lin <lindon...@gmail.com> wrote: > Hi Ahmed, > > Thanks for the comments. > > I agree with you and Piotr that it would be useful to provide a more > generic approach to address more use-case in one proposal. On the other > hand, I also think it is important to make sure that the alternative (more > generic) approach can indeed address the extra use-cases reliably as > expected. Then we can compare the pros/cons of these approaches and make > the best choice for Flink users. > > If I understand your question correctly, you are asking whether it would be > better to replace upperBoundCheckpointingIntervalForLastSource() with an > API on the source/operator interface. > > The short answer is probably no. This is because the expected users of the > API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*() > are end-users who use Flink API and connector API to develop Flink job. We > probably don't want end-users to directly use the source/operator > interface, which is generally more complicated and intended to be used by > developers of source operators. > > FLIP-309 currently proposes to add the API > *SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of > source operators (e.g. HybridSource, MySQL CDC source) to upper-bound > checkpointing interval. Are you suggesting that we should replace this API > with a config on the source or operator constructor? > > This approach probably works for HybridSource. But I am not sure it works > for MySQL CDC Source (which is also mentioned in the latest FLIP-309 > motivation section), which is implemented as one source operator rather > than multiple source operators (which HybridSource does). And we need to > enable the new checkpointing interval in the middle of this source > operator's execution. > > If I misunderstood your suggestion, can you provide more details regarding > the proposed API and explain its benefits? 
> > Best, > Dong > > > > On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote: > > > Hi Dong, > > Thanks for the great proposal. > > The thread is very intuitive along with suggestions from Jing and Piotr. > > As much as I like the simplicity of the proposed approach I think a much > > wider benefit is achieved by taking a more generic approach similar to > > Piotr's suggestion of having a `CheckpointTrigger`. I think this even > > solidifies the argument you are discussing > > > On the other hand, the approach currently proposed in the FLIP is much > > simpler as it does not depend on backpressure. Users specify the extra > > interval requirement on the specific sources (e.g. HybridSource, MySQL > CDC > > Source) and can easily know the checkpointing interval will be used on > the > > continuous phase of the corresponding source. > > > > where the base HybridSource can use a `CheckpointTrigger` that doesn't > > depend on backpressure. > > > > > > > > > > I have a couple of questions for clarification. > > > > @Dong > > Do you think in the solution in FLIP 309, instead of using > > ``` > > /** > > * Upper-bound the checkpointing interval when the last source > > added right before this > > * method invocation is reading data. > > */ > > public <ToEnumT extends SplitEnumerator, NextSourceT extends > > Source<T, > > ?, ?>> > > HybridSourceBuilder<T, ToEnumT> > > upperBoundCheckpointingIntervalForLastSource( > > Duration duration) { > > ... > > } > > ``` > > > > We can have an upperBoundCheckpointingInterval configured in the Source > > Interface, or even better in the Operator one. > > then we can easily implement the one for HybridSource by relying on > > delegation to the `currentReader`. > > > > > > @Piotr > > > > Regarding the more general approach of adjusting based on generic > > triggers/backpressure metrics. 
I saw you mentioned the resemblance with > > FLIP-271, > > Do you think it is worth going with the REST API proposal for dynamically > > configuring the interval hence the trigger logic could be implemented on > > Flink or external systems like Flink Kubernetes Operator? > > Wdyt? I think the REST API proposal here sounds more and more > interesting. > > > > > > Best Regards, > > Ahmed Hamdy > > > > > > On Wed, 31 May 2023 at 07:59, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hi Piotr, > > > > > > Thanks for the reply. Please see my comments inline. > > > > > > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org> > > > wrote: > > > > > > > Hi Dong, > > > > > > > > First of all we don't need to send any extra signal from source (or > non > > > > source) operators. All of the operators are already reporting > > > backpressured > > > > metrics [1] > > > > and all of the metrics are already sent to JobManager. We would only > > need > > > > > > > > > > Hmm... I am not sure metrics such as isBackPressured are already sent > to > > > JM. According to the doc > > > < > > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io > > > >, > > > this metric is only available on TaskManager. And I could't find the > code > > > that sends these metrics to JM. Can you help provide link to the code > and > > > doc that shows this metric is reported to JM. > > > > > > Suppose this metric is indeed reported to JM, we also need to confirm > > that > > > the frequency meets our need. For example, typically metrics are > updated > > on > > > the order of seconds. The default metric reporter interval (as > specified > > in > > > MetricOptions) is 10 seconds, which is probably not sufficient for the > > > suggested approach to work reliably. This is because the longer the > > > interval, the more likely that the algorithm will not trigger > checkpoint > > > using the short interval even if all subtasks are not-backpressured. 
> > > > > > For example, let's say every source operator subtask reports this > metric > > to > > > JM once every 10 seconds. There are 100 source subtasks. And each > subtask > > > is backpressured roughly 10% of the total time due to traffic spikes > (and > > > limited buffer). Then at any given time, there are 1 - 0.9^100 = > 99.997% > > > chance that there is at least one subtask that is backpressured. Then > we > > > have to wait for at least 10 seconds to check again. The expected > > > checkpointing interval can be very close to 30 minutes in the use-case > > > mentioned earlier. > > > > > > to pass some accessor to the metrics to the `CheckpointTrigger`. > > > > > > > > > > > execution.checkpointing.interval.no-backpressure > > > > > > > > Maybe that's the way to go, but as I mentioned before, I could see > this > > > > `CheckpointTrigger` to be a pluggable component, that could have been > > > > configured > > > > the same way as `MetricReporters` are right now [2]. We could just > > > provide > > > > out of the box two plugins, one implementing current checkpoint > > > triggering > > > > strategy, > > > > and the other using backpressure. > > > > > > > > > > Yes, it is possible to add a CheckpointTrigger as a pluggable > component. > > I > > > am open to this idea as long as it provides benefits over the job-level > > > config (e.g. covers more use-case, or simpler configuration for > > > common-case). > > > > > > I think we can decide how to let user specify this interval after we > are > > > able to address the other issues related to the feasibility and > > reliability > > > of the suggested approach. > > > > > > > > > > > I think the root cause of this issue is that the decision of the > > > > > checkpointing interval really depends on the expected impact of a > > > > > checkpoint on the throughput. > > > > > > > > Yes, I agree. 
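The 1 - 0.9^100 = 99.997% arithmetic earlier in this message is easy to verify; a minimal standalone check (class and method names here are just for illustration):

```java
public class BackpressureOdds {
    // Probability that at least one of `subtasks` independent subtasks is
    // backpressured at a sampling instant, given that each one is
    // backpressured a fraction `p` of the time.
    static double atLeastOneBackpressured(int subtasks, double p) {
        return 1.0 - Math.pow(1.0 - p, subtasks);
    }

    public static void main(String[] args) {
        // 100 source subtasks, each backpressured 10% of the time:
        System.out.println(atLeastOneBackpressured(100, 0.1)); // ~0.99997
    }
}
```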
Ideally we probably should adjust the checkpointing > > > interval > > > > based on measured latency, for example using latency markers [3], but > > > that > > > > would > > > > require some investigation if latency markers are indeed that costly as > > > > documented and if so optimizing them to solve the performance > > degradation > > > > of enabling > > > > e2e latency tracking. > > > > > > > > > > However, given that the new back pressure monitoring strategy would > be > > > > optional AND users could implement their own `CheckpointTrigger` if > > > really > > > > needed > > > > AND I have a feeling that there might be an even better solution > (more > > > > about that later). > > > > > > > > > > Overall I guess you are suggesting that 1) we can optimize the overhead > > of > > > latency tracking so that we can always turn it on and 2) we can use the > > > measured latency to dynamically determine checkpointing interval. > > > > > > I can understand this intuition. Still, the devil is in the details. > > After > > > thinking more about this, I am not sure I can find a good way to make > it > > > work. I am happy to discuss pros/cons if you provide more concrete > > > solutions. > > > > > > Note that goals of the alternative approach include 1) support sources > > > other than HybridSource and 2) reduce checkpointing interval when the job > > > is backpressured. These goals are not necessary to achieve the use-case > > > targeted by FLIP-309. While it will be nice to support additional > use-cases > > > with one proposal, it is probably also reasonable to make incremental > > > progress and support the low-hanging-fruit use-case first. The choice > > > really depends on the complexity and the importance of supporting the > > extra > > > use-cases. > > > > > > I am hoping that we can still be open to using the approach proposed in > > > FLIP-309 in case we can not make the alternative approach work. What do you > > > think? 
> > > > > > > > > > > if the checkpointing overhead is > > > > > close to none, then it is beneficial to the e2e latency to still > > > > checkpoint > > > > > a high frequency even if there exists (intermittent) backpressure. > > > > > > > > In that case users could just configure a slow checkpointing interval > > to > > > a > > > > lower value, or just use static checkpoint interval strategy. > > > > > > > > > > I guess the point is that the suggested approach, which dynamically > > > determines the checkpointing interval based on the backpressure, may > > cause > > > regression when the checkpointing interval is relatively low. This > makes > > it > > > hard for users to enable this feature in production. It is like an > > > auto-driving system that is not guaranteed to work > > > > > > On the other hand, the approach currently proposed in the FLIP is much > > > simpler as it does not depend on backpressure. Users specify the extra > > > interval requirement on the specific sources (e.g. HybridSource, MySQL > > CDC > > > Source) and can easily know the checkpointing interval will be used on > > the > > > continuous phase of the corresponding source. This is pretty much same > as > > > how users use the existing execution.checkpointing.interval config. So > > > there is no extra concern of regression caused by this approach. > > > > > > > > > > > > > > > With the suggested approach, the e2e latency introduced by Flink is > > > > roughly > > > > > 72 seconds. This is because it takes 1 minute for 11MBps phase to > > end, > > > > and > > > > > another 12 seconds for the accumulated backlog to be cleared. And > > Flink > > > > can > > > > > not do checkpoint before the backlog is cleared. > > > > > > > > Indeed that's a valid concern. After thinking more about this issue, > > > maybe > > > > the proper solution would be to calculate "how much overloaded is the > > > most > > > > overloaded subtask". 
> > > > In this case, that would be 10% (we are trying to push 110% of the > > > > available capacity in the current job/cluster). Then we could use > that > > > > number as some kind of weighted average. > > > > We could figure out a function mapping the overload percentage, into > a > > > > floating point number from range [0, 1] > > > > > > > > f(overload_factor) = weight // weight is from [0, 1] > > > > > > > > and then the desired checkpoint interval would be something like > > > > > > > > (1 - weight) * fastCheckpointInterval + weight * > slowCheckpointInterval > > > > > > > > In your problematic example, we would like the weight to be pretty > > small > > > > (<10%?), so the calculated checkpoint interval would be pretty close > to > > > the > > > > fastCheckpointInterval. > > > > > > > > > > Hmm... I am not sure it will always be pretty close to the > > > fastCheckpointInterval. We can discuss when there is concrete > definition > > of > > > this algorithm. > > > > > > While each source subtask can measure its current throughput, I am not > > sure > > > it can measure the "input throughput", which is defined as the > throughput > > > when the subtask (and it downstream operators) as the unlimited > > processing > > > capacity. Therefore, it seems pretty hard to determine the > > > "overload_factor" timely and accurately. > > > > > > > > > > The overload factor we could calculate the same way as FLIP-271 is > > > > calculating how much should we rescale given operator [4]. > > > > > > > > I can think about this more and elaborate/refine this idea tomorrow. > > > > > > > > > > Sounds good. Looking forward to learning more ideas. 
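The weighted-average idea above can be written out as code. This is only a sketch: the thread leaves the mapping f(overload_factor) unspecified, so the clamping used for `weight` below is an assumption, not something proposed in the discussion.

```java
public class WeightedCheckpointInterval {
    // Placeholder for f(overload_factor) -> weight in [0, 1]; the actual
    // mapping is left open in the discussion, so simple clamping is used here.
    static double weight(double overloadFactor) {
        return Math.max(0.0, Math.min(1.0, overloadFactor));
    }

    // (1 - weight) * fastCheckpointInterval + weight * slowCheckpointInterval
    static long desiredIntervalMs(double overloadFactor, long fastMs, long slowMs) {
        double w = weight(overloadFactor);
        return Math.round((1.0 - w) * fastMs + w * slowMs);
    }

    public static void main(String[] args) {
        // 10% overloaded, fast = 30 s, slow = 30 min:
        System.out.println(desiredIntervalMs(0.10, 30_000L, 1_800_000L) + " ms");
    }
}
```

Note that with a 10% overload factor and a 30 min slow interval, the linear blend already lands well above the 30 s fast interval, which is relevant to the question of whether the result stays "pretty close" to fastCheckpointInterval.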
> > > > > > Best, > > > Dong > > > > > > > > > > > > > > Best, > > > > Piotrek > > > > > > > > > > > > [1] > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#io > > > > [2] > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/ > > > > [3] > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking > > > > [4] > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling > > > > > > > > wt., 30 maj 2023 o 13:58 Dong Lin <lindon...@gmail.com> napisał(a): > > > > > > > > > Hi Piotr, > > > > > > > > > > Thank you for providing those details. > > > > > > > > > > I understand you suggested using the existing "isBackPressured" > > signal > > > to > > > > > determine whether we should use the less frequent checkpointing > > > > interval. I > > > > > followed your thoughts and tried to make it work. Below are the > > issues > > > > that > > > > > I am not able to address. Can you see if there is a way to address > > > these > > > > > issues? > > > > > > > > > > Let's will use the following use-case to make the discussion more > > > > concrete: > > > > > a) Users want to checkpoint at least once every 30 minutes to > > > upper-bound > > > > > the amount of duplicate work after job failover. > > > > > b) Users want to checkpoint at least once every 30 seconds to > > > > > upper-bound *extra > > > > > e2e lag introduced by the Flink job* during the continuous > processing > > > > > phase. > > > > > > > > > > The suggested approach is designed to do this: > > > > > - If any of the source subtasks is backpressured, the job will > > > checkpoint > > > > > at 30-minutes interval. > > > > > - If none of the source subtasks is backpressured, the job will > > > > checkpoint > > > > > at 30-seconds interval. 
> > > > > > > > > > And we would need to add the following public APIs to implement > this > > > > > approach: > > > > > - Add a job level config, maybe > > > > > execution.checkpointing.interval.no-backpressure. This is the > > > > checkpointing > > > > > interval when none of the source subtasks is backpressured. > > > > > - Add a public API for source operator subtasks to report their > > > > > backpressure status to the checkpointing coordinator. The subtask > > > should > > > > > invoke this API whenever its backpressure status changed. > > > > > > > > > > Now, in order to make the suggested approach work for all users > (i.e. > > > no > > > > > regression), we need to make sure that whenever we use the > 30-minutes > > > > > checkpointing interval, the e2e latency will be less than or equal > to > > > the > > > > > case where we use the 30-seconds checkpointing interval. > > > > > > > > > > I thought about this in detail, and found the following fabricated > > > > > scenarios where this approach might cause regression: > > > > > > > > > > During the continuous processing phase, the input throughput is > 5MBps > > > > for 1 > > > > > minute, and 11MBps for 1 minutes, in lock-steps. The maximum > > throughput > > > > > achievable by this job is 10Mbps. For simplicity, suppose the > buffer > > > size > > > > > can hold roughly 1 second worth-of-data, then the job is > > > > > backpressured roughly 1 minutes out of every 2 minutes. > > > > > > > > > > With the suggested approach, the e2e latency introduced by Flink is > > > > roughly > > > > > 72 seconds. This is because it takes 1 minute for 11MBps phase to > > end, > > > > and > > > > > another 12 seconds for the accumulated backlog to be cleared. And > > Flink > > > > can > > > > > not do checkpoint before the backlog is cleared. 
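The arithmetic behind the 72-second figure in the scenario above, spelled out (the numbers come from the example; the method name is mine):

```java
public class BacklogDrainTime {
    // Seconds needed to drain the backlog built up during a traffic spike,
    // once the input rate drops back below the processing capacity.
    static double drainSeconds(double spikeRateMBps, double lowRateMBps,
                               double capacityMBps, double spikeDurationSec) {
        double backlogMB = (spikeRateMBps - capacityMBps) * spikeDurationSec;
        return backlogMB / (capacityMBps - lowRateMBps);
    }

    public static void main(String[] args) {
        // 11 MBps for 60 s against 10 MBps capacity, then input drops to 5 MBps:
        double drain = drainSeconds(11, 5, 10, 60);
        // e2e latency ~= spike duration + drain time = 60 + 12 = 72 s
        System.out.println(60 + drain);
    }
}
```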
> > > > > > > > > > On the other hand, if we continue to checkpoint at 30-seconds > > interval, > > > > the > > > > > e2e latency introduced by Flink is at most 42 seconds, plus the > extra > > > > delay > > > > > introduced by the checkpoint overhead. The e2e latency will be > better > > > > than > > > > > the suggested approach, if the impact of the checkpoint is less > than > > 30 > > > > > seconds. > > > > > > > > > > I think the root cause of this issue is that the decision of the > > > > > checkpointing interval really depends on the expected impact of a > > > > > checkpoint on the throughput. For example, if the checkpointing > > > overhead > > > > is > > > > > close to none, then it is beneficial to the e2e latency to still > > > > checkpoint > > > > > a high frequency even if there exists (intermittent) backpressure. > > > > > > > > > > Here is another fabricated use-case where the suggested approach > > might > > > > > cause regression. Let's say user's job is > > > > > *hybridSource.keyBy(...).transform(operatorA).sinkTo(PaimonSink)*. > > The > > > > > parallelism is 2. As we can see, there is all-to-all edge between > > > source > > > > > and operatorA. And due to limited resources (e.g. buffer), at any > > given > > > > > time, each operatorA subtask can only process data from one of its > > > > upstream > > > > > subtask at a time, meaning that the other upstream subtask will be > > > > > backpressured. So there might always be at least one source subtask > > > that > > > > is > > > > > backpressured even though the job's throughput can catch up with > the > > > > input > > > > > throughput. However, the suggested approach might end up always > using > > > the > > > > > less frequent checkpointing interval in this case. 
> > > > > > > > > > Suppose we can find a way to address the above issues, another > issue > > > with > > > > > the suggested approach is the extra communication overhead between > > the > > > > > source operator subtasks and the checkpointing coordinator. The > > source > > > > > subtask needs to send a message to checkpointing coordinator > whenever > > > its > > > > > backpressure status changes. The more frequently we check (e.g. > once > > > > every > > > > > 10 ms), the larger the overhead. And if we check not so frequently > > > (e.g. > > > > > once every second), we might be more vulnerable to > random/occasional > > > > > backpressure. So there seems to be tradeoff between the reliability > > and > > > > the > > > > > cost of this approach. > > > > > > > > > > Thanks again for the suggestion. I am looking forward to your > > comments. > > > > > > > > > > Best, > > > > > Dong > > > > > > > > > > > > > > > On Tue, May 30, 2023 at 4:37 PM Piotr Nowojski < > pnowoj...@apache.org > > > > > > > > wrote: > > > > > > > > > > > Hi again, > > > > > > > > > > > > Thanks Dong, yes I think your concerns are valid, and that's why > I > > > have > > > > > > previously refined my idea to use one of the backpressure > measuring > > > > > metrics > > > > > > that we already have. > > > > > > Either simply `isBackPressured == true` check [1], or > > > > > > `backPressuredTimeMsPerSecond >= N` (where `N ~= 990`) [2]. That > > > would > > > > > > address your three first concerns: > > > > > > - lack of event time > > > > > > - event time unreliability > > > > > > - lack of universal threshold value for `pendingRecords` > > > > > > > > > > > > In a bit more detail, we probably should check (using [1] or [2]) > > > > either: > > > > > > a) if any of the source subtasks is backpressured > > > > > > b) if any of the subtasks is backpressured > > > > > > > > > > > > In most cases a == b. 
The only time when that's not true, if some > > > > > windowed > > > > > > operator in the middle of the job graph started triggering so > many > > > > > results > > > > > > that it became backpressured, > > > > > > but the backpressure didn't last long enough to propagate to > > sources. > > > > For > > > > > > example that especially might occur if sources are idle. So > > probably > > > b) > > > > > is > > > > > > a better and more generic option. > > > > > > > > > > > > Regarding your last concern, with spiky traffic, I think the > > > following > > > > > > algorithm of triggering checkpoints would work pretty well: > > > > > > > > > > > > public class BackpressureDetectingCheckpointTrigger { > > > > > > > > > > > > private long lastCheckpointTs = System.currentTimeMillis(); > > > > > > private long slowCheckpointInterval = ...; > > > > > > private long fastCheckpointInterval = ...; > > > > > > > > > > > > //code executed periodically, for example once a second, once > every > > > > 10ms, > > > > > > or at the 1/10th of the fast checkpoint interval > > > > > > void maybeTriggerCheckpoint(...) { > > > > > > > > > > > > long nextCheckpointTs = lastCheckpointTs; > > > > > > if (isAnySubtaskBackpressured()) { > > > > > > nextCheckpointTs += slowCheckpointInterval; > > > > > > } > > > > > > else { > > > > > > nextCheckpointTs += fastCheckpointInterval; > > > > > > } > > > > > > > > > > > > if (nextCheckpointTs <= System.currentTimeMillis()) { > > > > > > triggerCheckpoint(); > > > > > > lastCheckpointTs = System.currentTimeMillis(); > > > > > > } > > > > > > } > > > > > > } > > > > > > > > > > > > This way, if there is a spike of backpressure, it doesn't matter > > that > > > > > much. > > > > > > If the backpressure goes away until the next iteration, the next > > > check > > > > > will > > > > > > trigger a checkpoint according to the > > > > > > fast interval. 
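The sketch above can be restated as a self-contained class, with the clock and backpressure signal injected so the deadline logic can be exercised outside of Flink. The constructor shape and supplier-based wiring are my own; note the checkpoint should fire once `lastCheckpointTs + interval <= now`.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class BackpressureDetectingCheckpointTrigger {
    private final LongSupplier clockMs;                   // injected for testability
    private final BooleanSupplier anySubtaskBackpressured;
    private final long fastCheckpointIntervalMs;
    private final long slowCheckpointIntervalMs;
    private long lastCheckpointTs;

    public BackpressureDetectingCheckpointTrigger(LongSupplier clockMs,
            BooleanSupplier anySubtaskBackpressured, long fastMs, long slowMs) {
        this.clockMs = clockMs;
        this.anySubtaskBackpressured = anySubtaskBackpressured;
        this.fastCheckpointIntervalMs = fastMs;
        this.slowCheckpointIntervalMs = slowMs;
        this.lastCheckpointTs = clockMs.getAsLong();
    }

    // Called periodically (e.g. at 1/10th of the fast interval);
    // returns true when a checkpoint should be triggered now.
    public boolean maybeTriggerCheckpoint() {
        long interval = anySubtaskBackpressured.getAsBoolean()
                ? slowCheckpointIntervalMs
                : fastCheckpointIntervalMs;
        long now = clockMs.getAsLong();
        if (lastCheckpointTs + interval <= now) {
            lastCheckpointTs = now;
            return true;
        }
        return false;
    }
}
```

With fast = 30 s and slow = 30 min, a backpressure spike that clears before the next evaluation does not push the next checkpoint out to the slow interval.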
The slow checkpoint interval will be used only if > > the > > > > > > backpressure persists for the whole duration of the > > > > > slowCheckpointInterval. > > > > > > > > > > > > We could also go a little bit more fancy, and instead of using > only > > > > fast > > > > > or > > > > > > slow intervals, we could use a continuous spectrum to gradually > > > adjust > > > > > the > > > > > > interval, by replacing the first if/else > > > > > > check with a weighted average: > > > > > > > > > > > > int maxBackPressureTime = > > > > getSubtaskMaxBackPressuredTimeMsPerSecond(); > > > > > > long nextCheckpointTs = lastCheckpointTs + > > (slowCheckpointInterval * > > > > > > maxBackPressureTime + fastCheckpointInterval * (1000 - > > > > > > maxBackPressureTime)) / 1000; > > > > > > > > > > > > This would further eliminate some potential jitter and make the > > > actual > > > > > > checkpoint interval a bit more predictable. > > > > > > > > > > > > Best, > > > > > > Piotrek > > > > > > > > > > > > wt., 30 maj 2023 o 04:40 Dong Lin <lindon...@gmail.com> > > napisał(a): > > > > > > > > > > > > > Let me correct the typo in the last paragraph as below: > > > > > > > > > > > > > > To make the problem even harder, the incoming traffic can be > > spiky. > > > > And > > > > > > the > > > > > > > overhead of triggering checkpointing can be relatively low, in > > > which > > > > > case > > > > > > > it might be more performant (w.r.t. e2e lag) for the Flink job > to > > > > > > > checkpoint at the more frequent interval in the continuous > phase > > in > > > > > face > > > > > > of > > > > > > > a spike in the number of pending records buffered in the source > > > > > operator. > > > > > > > > > > > > > > > > > > > > > On Tue, May 30, 2023 at 9:17 AM Dong Lin <lindon...@gmail.com> > > > > wrote: > > > > > > > > > > > > > > > Hi Piotrek, > > > > > > > > > > > > > > > > Thanks for providing more details of the alternative > approach! 
> > > > > > > > > > > > > > > > If I understand your proposal correctly, here are the > > > requirements > > > > > for > > > > > > it > > > > > > > > to work without incurring any regression: > > > > > > > > > > > > > > > > 1) The source needs a way to determine whether there exists > > > > > > backpressure. > > > > > > > > 2) If there is backpressure, then it means e2e latency is > > already > > > > > high > > > > > > > > and there should be no harm to use the less frequent > > > checkpointing > > > > > > > interval. > > > > > > > > 3) The configuration of the "less frequent checkpointing > > > interval" > > > > > > needs > > > > > > > > to be a job-level config so that it works for sources other > > than > > > > > > > > HybridSource. > > > > > > > > > > > > > > > > I would say that if we can find a way for the source to > > determine > > > > the > > > > > > > > "existence of backpressure" and meet the requirement 2), it > > would > > > > > > indeed > > > > > > > be > > > > > > > > a much more elegant approach that solves more use-cases. > > > > > > > > > > > > > > > > The devil is in the details. I am not sure how to determine > the > > > > > > > "existence > > > > > > > > of backpressure". Let me explain my thoughts and maybe you > can > > > help > > > > > > > > provide the answers. > > > > > > > > > > > > > > > > To make the discussion more concrete, let's say the input > > records > > > > do > > > > > > not > > > > > > > > have event timestamps. Users want to checkpoint at least once > > > every > > > > > 30 > > > > > > > > minutes to upper-bound the amount of duplicate work after job > > > > > failover. > > > > > > > And > > > > > > > > users want to checkpoint at least once every 30 seconds to > > > > > upper-bound > > > > > > > *extra > > > > > > > > e2e lag introduced by the Flink job* during the continuous > > > > processing > > > > > > > > phase. 
> > > > > > > > > > > > > > > > Since the input records do not have event timestamps, we can > > not > > > > rely > > > > > > on > > > > > > > > metrics such as currentFetchEventTimeLag [1] to determine the > > > > > absolute > > > > > > > e2e > > > > > > > > lag, because currentFetchEventTimeLag depends on the > existence > > of > > > > > event > > > > > > > > timestamps. > > > > > > > > > > > > > > > > Also note that, even if the input records have event > timestamps > > > and > > > > > we > > > > > > > can > > > > > > > > measure currentFetchEventTimeLag, we still need a threshold > to > > > > > > determine > > > > > > > > whether the value of currentFetchEventTimeLag is too high. > One > > > idea > > > > > > might > > > > > > > > be to use the user-specified "less frequent checkpointing > > > interval" > > > > > as > > > > > > > > this threshold, which in this case is 30 seconds. But this > > > approach > > > > > can > > > > > > > > also cause regression. For example, let's say the records go > > > > through > > > > > > > > several Kafka/MirrorMaker pipelines after it is generated and > > > > before > > > > > it > > > > > > > is > > > > > > > > received by Flink, causing its currentFetchEventTimeLag to be > > > > always > > > > > > > higher > > > > > > > > than 30 seconds. Then Flink will end up always using the > "less > > > > > frequent > > > > > > > > checkpointing interval" in the continuous phase, which in > this > > > case > > > > > is > > > > > > 30 > > > > > > > > minutes. > > > > > > > > > > > > > > > > Other options to determine the "existence of backpressure" > > > includes > > > > > > using > > > > > > > > the absolute number of records in the source storage system > > that > > > > are > > > > > > > > waiting to be fetched (e.g. pendingRecords [1]), or using the > > > > > absolute > > > > > > > > number of buffered records in the source output queue. 
However, I find it hard to reliably determine "e2e latency is already high" based on the absolute number of records. What threshold should we choose to determine that the number of pending records is too many (and it is safe to increase the checkpointing interval)?

To make the problem even harder, the incoming traffic can be spiky. And the overhead of triggering checkpointing can be relatively low, in which case it might be more performant (w.r.t. e2e lag) for the Flink job to checkpoint at the higher interval in the continuous phase in the face of a spike in the number of pending records buffered in the source operator.

The problems described above are the main reasons that I can not find a way to make the alternative approach work. Any thoughts?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Mon, May 29, 2023 at 11:23 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi

@Jing

> Your proposal to dynamically adjust the checkpoint intervals is elegant!
> It makes sense to build it as a generic feature in Flink. Looking forward
> to it. However, for some user cases, e.g. when users were aware of the
> bounded sources (in the HybridSource) and care more about the throughput,
> the dynamic adjustment might not be required. Just let those bounded
> sources always have larger checkpoint intervals even when there is no back
> pressure. Because no one cares about latency in this case, let's turn off
> the dynamic adjustment, reduce the checkpoint frequency, have better
> throughput, and save unnecessary source consumption. Did I miss anything
> here?

But why do we need to have two separate mechanisms, if the dynamic adjustment based on the backpressure/backlog would achieve basically the same goal as your proposal and would solve both of the problems? Having two independent solutions in the same codebase, in the docs, that are achieving basically the same thing is far from ideal. It would both increase the complexity of the system and confuse potential users.

Moreover, as I have already mentioned before, I don't like the current proposal as it's focusing ONLY on the HybridSource, which can lead to even worse problems in the future, where many different sources would each have a completely custom solution to the same/similar problems, complicating the system and confusing the users even more.

@Dong,

> For now I am not able to come up with a good way to support this. I am
> happy to discuss the pros/cons if you can provide more detail (e.g. API
> design) regarding how to support this approach

I have already described such a proposal:

> Piotr: I don't know, maybe instead of adding this logic to operator
> coordinators, `CheckpointCoordinator` should have a pluggable
> `CheckpointTrigger`, that the user could configure like a `MetricReporter`.
> The default one would be just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured threshold and
> based on that adjust the checkpointing interval accordingly? This would at
> least address some of my concerns.

plus

> Piotr: Either way, I would like to refine my earlier idea, and instead of
> using metrics like `pendingRecords`, I think we could switch between fast
> and slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
>
> As a user, I would like to have my regular fast checkpointing interval for
> low latency, but the moment my system is not keeping up, if the
> backpressure builds up, or simply we have a huge backlog to reprocess,
> latency doesn't matter anymore. Only throughput matters. So I would like
> the checkpointing to slow down.
>
> I think this should cover pretty well most of the cases, what do you
> think? If this backpressure-based behaviour is still not enough, I would
> still say that we should provide pluggable checkpoint triggering
> controllers that would work based on metrics.

> change the checkpointing interval based on the "backlog signal"

What's wrong with the job being backpressured? If the job is backpressured, we don't care about individual records' latency, only about increasing the throughput to get out of the backpressure situation ASAP.

> In the mentioned use-case, users want to have two different checkpointing
> intervals at different phases of the HybridSource. We should provide an
> API for users to express the extra checkpointing interval in addition to
> the existing execution.checkpointing.interval. What would be the
> definition of that API with this alternative approach?

I think my proposal with `BacklogDynamicCheckpointTrigger` or `BackpressureDetectingCheckpointTrigger` would solve your motivating use case just as well.

1. In the catch-up phase (reading the bounded source):
  a) if we are under backpressure (the common case), the system would fall back to the less frequent checkpointing interval
  b) if there is no backpressure (I hope a rare case: there is a backlog, but the source is too slow), the Flink cluster has spare resources to actually run the more frequent checkpointing interval, so no harm should be done. But arguably using a less frequent checkpointing interval here would be more desirable.

2. In the continuous processing phase (unbounded source):
  a) if we are under backpressure, as I mentioned above, no one cares about the checkpointing interval and the frequency of committing records to the output, as e2e latency is already high due to the backlog in the sources
  b) if there is no backpressure, that's the only case where the user actually cares about the frequency of committing records to the output, and we are using the more frequent checkpointing interval.
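For illustration, the switching rule described here could be sketched roughly as follows. Note that Flink has no pluggable `CheckpointTrigger` today; the class and method names below are hypothetical stand-ins for the proposal, not real Flink API:

```java
import java.time.Duration;

// Hypothetical sketch of the proposed "BackpressureDetectingCheckpointTrigger".
// Nothing here is Flink API: it only illustrates the decision rule of
// switching between a fast and a slow checkpointing interval based on
// whether the job is currently backpressured (or has a large backlog).
public class BackpressureDetectingCheckpointTrigger {

    private final Duration fastInterval; // steady state: low e2e latency
    private final Duration slowInterval; // catch-up: favor throughput

    public BackpressureDetectingCheckpointTrigger(Duration fastInterval, Duration slowInterval) {
        this.fastInterval = fastInterval;
        this.slowInterval = slowInterval;
    }

    /** Interval to use when scheduling the next checkpoint. */
    public Duration nextInterval(boolean backpressured) {
        // Under backpressure, e2e latency is dominated by the backlog anyway,
        // so checkpoint less often; otherwise keep the fast interval.
        return backpressured ? slowInterval : fastInterval;
    }

    public static void main(String[] args) {
        BackpressureDetectingCheckpointTrigger trigger =
                new BackpressureDetectingCheckpointTrigger(Duration.ofSeconds(10), Duration.ofMinutes(5));
        System.out.println(trigger.nextInterval(true));  // slow interval while catching up
        System.out.println(trigger.nextInterval(false)); // fast interval in steady state
    }
}
```

The backpressure flag itself would presumably come from the runtime's existing backpressure monitoring; how that signal reaches the trigger is exactly the open design question of this thread.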
1b) is mostly harmless, I think, and could be solved with some extra effort.
2a) and 2b) are not solved by your proposal.
2a) and 2b) are applicable to any source, not just HybridSource, which is also not covered by your proposal.

Best,
Piotrek

On Thu, 25 May 2023 at 17:29, Jing Ge <j...@ververica.com.invalid> wrote:

Hi Dong, Hi Piotr,

Thanks for the clarification.

@Dong

According to the code examples in the FLIP, I thought we were focusing on the HybridSource scenario. With the current HybridSource implementation, we don't even need to know the boundedness of the sources in the HybridSource, since all sources except the last one must be bounded[1], i.e. only the last source is unbounded. This makes it much easier to set different intervals for sources with different boundedness.

Boundedness in Flink is a top-level concept. I think it should be ok to introduce a top-level config for a top-level concept. I am not familiar with MySQL CDC. For those specific cases, you are right, your proposal can provide the feature with minimal changes; like I mentioned previously, it is a thoughtful design. +1

@Piotr

> For example join (windowed/temporal) of two tables backed by a hybrid
> source? I could easily see a scenario where one table with little data
> catches up much more quickly.

I am confused. I thought we were talking about HybridSource, which "solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream."[2] I could not find any join within a HybridSource. So you might mean something else with the join example, and it should be out of scope, if I am not mistaken.

> About the (un)boundness of the input stream. I'm not sure if that should
> actually matter. Actually the same issue, with too frequent checkpointing
> during a catch-up period or when Flink is overloaded, could affect jobs
> that are purely unbounded, like continuously reading from Kafka. Even
> more, nothing prevents users from actually storing bounded data in a Kafka
> topic. Either way, I would like to refine my earlier idea, and instead of
> using metrics like `pendingRecords`, I think we could switch between fast
> and slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:

This is again a very different use case from HybridSource. Users are indeed allowed to store bounded data in Kafka, and if such a source is not used as the last source in a HybridSource, it is a bounded source and can still benefit from a larger checkpoint interval w.r.t. higher throughput (Kafka or any other storage does not matter). BTW, the larger checkpoint interval for a bounded source is optional; users can use it but are not required to, if they don't care about the throughput of the bounded data.

Your proposal to dynamically adjust the checkpoint intervals is elegant! It makes sense to build it as a generic feature in Flink. Looking forward to it. However, for some use cases, e.g. when users are aware of the bounded sources (in the HybridSource) and care more about the throughput, the dynamic adjustment might not be required. Just let those bounded sources always have larger checkpoint intervals even when there is no back pressure. Because no one cares about latency in this case, let's turn off the dynamic adjustment, reduce the checkpoint frequency, have better throughput, and save unnecessary source consumption. Did I miss anything here?

Best regards,
Jing

[1] https://github.com/apache/flink/blob/6b6df3db466d6a030d5a38ec786ac3297cb41c38/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java#L244
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#hybrid-source

On Thu, May 25, 2023 at 3:03 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for the discussion. Please see my comments inline.

On Thu, May 25, 2023 at 6:34 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi all,
>
> Thanks for the discussion.
>
> @Dong
>
> > In the target use-case, we would like the HybridSource to trigger
> > checkpoints more frequently when it is reading the Kafka source (than
> > when it is reading the HDFS source). We would need to set a flag for the
> > checkpoint trigger to know which source the HybridSource is reading
> > from.
>
> Is this really your actual goal? Should users care if some table defined in

My actual goal is to address the use-case described in the motivation section. More specifically, my goal is to provide an API that users can use to express their needed checkpointing interval at different phases of the job, so that Flink can achieve the maximum throughput while also meeting users' needs for data freshness and failover time.

> a meta store is backed by HybridSource or not?
> I think the actual goal is this:
>
> As a user I would like to have a self-adjusting mechanism for
> checkpointing intervals, so that during the catch-up phase my job focuses
> on throughput to catch up ASAP, while during normal processing (without a
> large backlog) Flink is trying to minimize e2e latency.

Sure. It will be great to have a way to support this self-adjusting mechanism. For now I am not able to come up with a good way to support this. I am happy to discuss the pros/cons if you can provide more detail (e.g. API design) regarding how to support this approach.

> Am I right here?
>
> > there won't exist any "*conflicting* desired checkpoint trigger" by
> > definition
>
> Ok, arguably there won't be a conflict, but the decision to pick the
> minimum out of the upper bounds might be sub-optimal.

As of today, users need checkpoints in order to address two goals. One goal is to upper-bound data staleness when there is a sink with exactly-once semantics (e.g. Paimon), since those sinks can only output data when a checkpoint is triggered. The other goal is to upper-bound the amount of duplicate work needed after failover.

In both cases, users need to upper-bound the checkpointing interval. This makes it more intuitive for the config to only express the checkpointing interval upper-bound.

> > Overall, I am not sure we always want to have a longer checkpointing
> > interval. That really depends on the specific use-case and the job
> > graph.
>
> Yes, that's why I proposed something a little bit more generic.

I am not sure I fully understand the alternative proposal that is meant to be more generic, so it is hard for me to evaluate the pros/cons.

I understand that you preferred for the source operator to use the REST API to trigger checkpoints. This sounds like a downside, since using the REST API is not as easy as using the programming API proposed in the FLIP.

Can you help explain the generic approach more concretely, such as the APIs you would suggest introducing? That would allow me to evaluate the pros/cons and hopefully pick the best option.

> > I believe there can be use-cases where the proposed API is not useful,
> > in which case users can choose not to use the API without incurring any
> > performance regression.
>
> I'm not saying that this proposal is not useful. Just that we might be
> able to solve this problem in a more flexible manner. If we introduce a
> partially working solution now at the source level, and later we will
> still need a different solution on another level to cover other use
> cases, that would clog the API and confuse users.

Can you explain why this is "partially working"? Is it because there are use-cases that should be addressed but are not covered by the proposed approach?

If so, can you help explain the use-cases that would be useful to address? With concrete use-cases in mind, we can pick the API with minimal change to address these use-cases.
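To make the upper-bound semantics described above concrete, here is a minimal sketch (assumed names, not Flink code) of how the effective interval would be derived: the configured job-level interval is the baseline, and upper bounds declared by currently-running sources can only tighten it:

```java
import java.time.Duration;
import java.util.List;

// Illustrative sketch (not Flink code) of the "minimum of the upper bounds"
// rule: the job-level interval applies unless a currently-running source has
// declared a smaller upper bound on the checkpointing interval.
public class EffectiveCheckpointInterval {

    public static Duration effective(Duration configuredInterval, List<Duration> activeUpperBounds) {
        Duration result = configuredInterval;
        for (Duration bound : activeUpperBounds) {
            if (bound.compareTo(result) < 0) {
                result = bound; // an upper bound can only shorten the interval
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 min job default; the last source of a HybridSource asks for <= 30 s.
        System.out.println(effective(Duration.ofMinutes(10), List.of(Duration.ofSeconds(30))));
        // No active upper bounds: the job default stays in effect.
        System.out.println(effective(Duration.ofMinutes(10), List.of()));
    }
}
```

This also shows why there can be no "conflicting" triggers: every declared value is an upper bound, so taking the minimum always satisfies all of them.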
> @Jing
>
> > @Piotr
> > Just out of curiosity, do you know any real use cases where real-time
> > data is processed before the backlog?
>
> For example join (windowed/temporal) of two tables backed by a hybrid
> source? I could easily see a scenario where one table with little data
> catches up much more quickly.
>
> @Jing and @Dong
>
> About the (un)boundness of the input stream. I'm not sure if that should
> actually matter. Actually the same issue, with too frequent checkpointing

Indeed, I agree with you on this point and prefer not to have this proposal depend on the (un)boundness.

> during a catch-up period or when Flink is overloaded, could affect jobs
> that are purely unbounded, like continuously reading from Kafka. Even
> more, nothing prevents users from actually storing bounded data in a Kafka
> topic. Either way, I would like to refine my earlier idea, and instead of
> using metrics like `pendingRecords`, I think we could switch between fast
> and slow checkpointing intervals based on the information if the job is
> backpressured or not. My thinking is as follows:
>
> As a user, I would like to have my regular fast checkpointing interval for
> low latency, but the moment my system is not keeping up, if the
> backpressure builds up, or simply we have a huge backlog to reprocess,
> latency doesn't matter anymore. Only throughput matters. So I would like
> the checkpointing to slow down.
>
> I think this should cover pretty well most of the cases, what do you
> think?

Thank you for all the comments and this idea. I like this idea. We actually thought about it before proposing this FLIP.

In order to make this idea work, we need to come up with a good algorithm that can dynamically change the checkpointing interval based on the "backlog signal", without causing regression w.r.t. failover time and data freshness. I find it hard to come up with this algorithm due to an insufficient "backlog signal".

For the use-case mentioned in the motivation section, the data in the source does not have event timestamps to help determine the amount of backlog. So the only source-of-truth for determining backlog is the amount of data buffered in operators. But the buffer size is typically chosen to be proportional to round-trip time and throughput. Having a full buffer does not necessarily mean that the data is lagging behind. And increasing the checkpointing interval with an insufficient "backlog signal" can have a negative impact on data freshness and failover time.

In order to make this idea work, we would need to *prove* that the algorithm would not negatively affect data freshness and failover time when it decides to increase checkpointing intervals. For now I could not come up with such an algorithm.

> If this backpressure-based behaviour is still not enough, I would still
> say that we should provide pluggable checkpoint triggering controllers
> that would work based on metrics.

I am not sure how to address the use-case mentioned in the motivation section with the pluggable checkpoint trigger + metrics. Can you help provide the definition of these APIs and kindly explain how that works to address the mentioned use-case?

In the mentioned use-case, users want to have two different checkpointing intervals at different phases of the HybridSource. We should provide an API for users to express the extra checkpointing interval in addition to the existing execution.checkpointing.interval. What would be the definition of that API with this alternative approach?

Best,
Dong

> Best,
> Piotrek

On Thu, 25 May 2023 at 07:47, Dong Lin <lindon...@gmail.com> wrote:

Hi Jing,

Thanks for your comments!

Regarding the idea of using the existing "boundedness" attribute of sources, that is indeed something that we might find intuitive initially.
I have thought about this idea, but could not find a good way to make it work. I will try to explain my thoughts and see if we can find a better solution.

Here is my understanding of the idea mentioned above: provide a job-level config execution.checkpoint.interval.bounded. Flink will use this as the checkpointing interval whenever there exists at least one running source which claims it is in the "bounded" stage.

Note that we can not simply re-use the existing "boundedness" attribute of source operators. The reason is that for sources such as MySQL CDC, the boundedness can be "continuous_unbounded" because the source can run continuously. But MySQL CDC has two phases internally, where the source needs to first read a snapshot (with a bounded amount of data) and then read a binlog (with an unbounded amount of data).
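A minimal sketch of how such a bounded-stage rule might behave follows. The config key is the one from this discussion; the rest is a stand-in written for illustration, not a real Flink mechanism:

```java
import java.time.Duration;
import java.util.List;

// Toy model of the "execution.checkpoint.interval.bounded" idea: the bounded
// interval takes effect while at least one running source reports that it is
// in its bounded stage (e.g. MySQL CDC reading the snapshot); otherwise the
// regular interval applies. Purely illustrative, not Flink's implementation.
public class BoundedStageIntervalSelector {

    public static Duration select(Duration defaultInterval,
                                  Duration boundedStageInterval,
                                  List<Boolean> sourceInBoundedStage) {
        boolean anyBounded = sourceInBoundedStage.stream().anyMatch(Boolean::booleanValue);
        return anyBounded ? boundedStageInterval : defaultInterval;
    }

    public static void main(String[] args) {
        // One source still in its bounded (snapshot) stage: use the larger interval.
        System.out.println(select(Duration.ofSeconds(30), Duration.ofMinutes(10), List.of(true, false)));
        // All sources past their bounded stage: back to the default interval.
        System.out.println(select(Duration.ofSeconds(30), Duration.ofMinutes(10), List.of(false, false)));
    }
}
```

Note that the per-source boolean here is precisely the new "bounded stage" declaration discussed below, which is distinct from the static boundedness attribute.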
As a result, in order to support this optimization for sources like MySQL CDC, we need to expose an API for the source operator to declare whether it is running in a bounded or continuous_unbounded stage. *This introduces the need to define a new concept named "bounded stage".*

Then, we will need to *introduce a new contract between source operators and the Flink runtime*, saying that if there is a source that claims it is running in the bounded stage, then Flink will use "execution.checkpoint.interval.bounded" as the checkpointing interval.

Here are the concerns I have with this approach:

- The execution.checkpoint.interval.bounded is a top-level config, meaning that every Flink user needs to read about its semantics. In comparison, the proposed approach only requires users of specific sources (e.g. HybridSource, MySQL CDC) to know the new source-specific config.

- It introduces a new top-level concept in Flink to describe the internal stages of specific sources (e.g. MySQL CDC). In comparison, the proposed approach only requires users of specific sources (e.g. HybridSource, MySQL CDC) to know this concept, which not only makes the explanation much simpler (since they are already using the specific sources), but also limits the scope of this new concept (only these users need to know it).

- It makes the existing config execution.checkpoint.interval harder to understand, because we need to explain that it is only used when there is no source in a "bounded stage", introducing more if-else for this config. In comparison, with the proposed approach, the semantics of execution.checkpoint.interval are simpler, without if/else, as it will always be applied regardless of which sources users are using.

I am happy to discuss if there are better approaches.

Thanks,
Dong

On Wed, May 24, 2023 at 8:23 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Yunfeng, Hi Dong

Thanks for the informative discussion!
It is a rational requirement to set different checkpoint intervals for different sources in a hybridSource. The tiny downside of this proposal, at least for me, is that I have to understand the upper-bound definition of the interval and the built-in rule for Flink to choose the minimum value between it and the default interval setting. However, afaiac, the intention of this built-in rule is to minimize changes in Flink to support the requested feature, which is a very thoughtful move. Thanks for taking care of it. +1 for the proposal.
> >
> > Another very rough idea came to my mind while I was reading the FLIP. I didn't do a deep dive into the related source code yet, so please correct me if I am wrong. The use case shows that two different checkpoint intervals should be set for bounded (historical) stream and unbounded (fresh real-time) stream sources. It is a trade-off between throughput and latency, i.e.
bounded stream with a large checkpoint interval for better throughput, and unbounded stream with a small checkpoint interval for lower latency (in case of failover). As we can see, the different interval setting depends on the boundedness of the streams. Since the Source API already has its own boundedness flag[1], is it possible to define two interval configurations and let Flink automatically apply the related one to each source based on its known boundedness? The interval for a bounded stream could be something like execution.checkpoint.interval.bounded (naming could be reconsidered), and for an unbounded stream we could use the existing execution.checkpoint.interval by default, or introduce a new one like execution.checkpoint.interval.unbounded. In this way, no API change is required.
> >
> > @Piotr
> > Just out of curiosity, do you know any real use cases where real-time data is processed before the backlog?
Semantically, the backlog contains historical data that has to be processed before the real-time data is allowed to be processed. Otherwise, up-to-date data would be overwritten by out-of-date data, which leads to unexpected results in real business scenarios.
> >
> > Best regards,
> > Jing
> >
> > [1]
> > https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41
> >
> > On Tue, May 23, 2023 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the comments. Let me try to understand your concerns and hopefully address them.
> > > >> What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint trigger behaviour
> > >
> > > With the proposed change, there won't exist any "*conflicting* desired checkpoint trigger" by definition. Both the job-level config and the proposed API upperBoundCheckpointingInterval() mean the upper bound of the checkpointing interval. If there are different upper bounds proposed by different source operators and the job-level config, Flink will try to periodically trigger checkpoints at the interval corresponding to the minimum of all these proposed upper bounds.
> > >
> > > >> If one source is processing a backlog and the other is already processing real time data..
> > >
> > > Overall, I am not sure we always want to have a longer checkpointing interval. That really depends on the specific use-case and the job graph.
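[Editor's note: the minimum-of-upper-bounds rule Dong describes above can be illustrated with a small self-contained sketch. The class and method names here are invented for illustration and are not Flink APIs.]

```java
import java.time.Duration;
import java.util.List;

public class EffectiveIntervalSketch {
    // The job-level interval and every per-source proposal are all treated as
    // upper bounds; the effective checkpointing interval is their minimum.
    static Duration effectiveInterval(Duration jobLevel, List<Duration> upperBounds) {
        Duration result = jobLevel;
        for (Duration d : upperBounds) {
            if (d.compareTo(result) < 0) {
                result = d;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Job-level interval is 10 min; one source proposes a 30 s upper bound
        // (e.g. while reading fresh Kafka data), another proposes 5 min.
        Duration eff = effectiveInterval(
                Duration.ofMinutes(10),
                List.of(Duration.ofSeconds(30), Duration.ofMinutes(5)));
        System.out.println(eff); // prints PT30S
    }
}
```

With no per-source proposals the list is empty and the job-level interval applies unchanged, which matches the claim that sources not using the API see no behaviour change.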
> > > The proposed API change provides a mechanism for operators and users to specify different checkpoint intervals during different periods of the job. Users have the option to use the new API to get better performance in the use-case specified in the motivation section. I believe there can be use-cases where the proposed API is not useful, in which case users can choose not to use the API without incurring any performance regression.
> > >
> > > >> it might be a bit confusing and not user friendly to have multiple places that can override the checkpointing behaviour in a different way
> > >
> > > Admittedly, adding more APIs always incurs more complexity. But sometimes we have to incur this complexity to address new use-cases. Maybe we can see if there is a more user-friendly way to address this use-case.
> > > >> already implemented and is simple from the perspective of Flink
> > >
> > > Do you mean that the HybridSource operator should invoke the REST API to trigger checkpoints? The downside of this approach is that it makes it hard for developers of source operators (e.g. MySQL CDC, HybridSource) to address the target use-case. AFAIK, there is no existing case where we require operator developers to use the REST API to do their job.
> > >
> > > Can you help explain the benefit of using the REST API over using the proposed API?
> > >
> > > Note that this approach also seems to have the same downside mentioned above: "multiple places that can override the checkpointing behaviour". I am not sure there can be a solution that addresses the target use-case without having multiple places that can affect the checkpointing behavior.
> > > >> check if `pendingRecords` for some source has exceeded the configured threshold and based on that adjust the checkpointing interval accordingly
> > >
> > > I am not sure this approach can address the target use-case in a better way. In the target use-case, we would like the HybridSource to trigger checkpoints more frequently when it is reading the Kafka source (than when it is reading the HDFS source). We would need to set a flag for the checkpoint trigger to know which source the HybridSource is reading from. But IMO that approach is less intuitive and more complex than having the HybridSource invoke upperBoundCheckpointingInterval() directly once it is reading the Kafka source.
> > >
> > > Maybe I did not understand the alternative approach rightly. I am happy to discuss more on this topic. WDYT?
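[Editor's note: as a rough illustration of the behaviour Dong describes, a hybrid source's enumerator could tighten the checkpointing upper bound when it switches to its last (streaming) source. The `EnumeratorContext` interface below is a minimal stand-in for the `SplitEnumeratorContext#upperBoundCheckpointingInterval` method proposed in FLIP-309, not the real Flink interface, and the 30 s value is an arbitrary example.]

```java
import java.time.Duration;

public class HybridSwitchSketch {
    // Stand-in for the FLIP-309 proposal; in the real design this method
    // would live on Flink's SplitEnumeratorContext.
    interface EnumeratorContext {
        void upperBoundCheckpointingInterval(Duration upperBound);
    }

    // Illustrative hook: while draining the bounded (e.g. HDFS) phase the
    // enumerator proposes nothing, so the job-level interval applies; once it
    // switches to the unbounded (e.g. Kafka) phase it proposes a tighter
    // upper bound so checkpoints happen more frequently.
    static void onSwitchToLastSource(EnumeratorContext ctx) {
        ctx.upperBoundCheckpointingInterval(Duration.ofSeconds(30));
    }

    public static void main(String[] args) {
        onSwitchToLastSource(d -> System.out.println("upper bound set to " + d));
        // prints: upper bound set to PT30S
    }
}
```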
> > > Best,
> > > Dong
> > >
> > > On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for the proposal. However, are you sure that the OperatorCoordinator is the right place to put such logic? What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint trigger behaviour? If one source is processing a backlog and the other is already processing real time data, I would assume that in most use cases you would like to still have the longer checkpointing interval, not the shorter one. Also, apart from that, it might be a bit confusing and not user friendly to have multiple places that can override the checkpointing behaviour in a different way.
> > > > FYI, in the past we had some discussions about similar requests, and back then we chose to keep the system simpler and exposed a more generic REST API checkpoint triggering mechanism. I know that having to implement such logic outside of Flink and having to make REST calls to trigger checkpoints might not be ideal, but that's already implemented and is simple from the perspective of Flink.
> > > >
> > > > I don't know, maybe instead of adding this logic to operator coordinators, `CheckpointCoordinator` should have a pluggable `CheckpointTrigger` that the user could configure like a `MetricReporter`. The default one would just periodically trigger checkpoints.
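[Editor's note: Piotr's pluggable-trigger idea could look roughly like the sketch below. The `CheckpointTrigger` interface, the backlog-aware variant keyed off a `pendingRecords` metric, and the thresholds and intervals are all hypothetical illustrations, not existing Flink APIs.]

```java
import java.time.Duration;
import java.util.function.LongSupplier;

public class CheckpointTriggerSketch {
    // Hypothetical plugin interface, configured analogously to a
    // MetricReporter; the coordinator would consult it between checkpoints.
    interface CheckpointTrigger {
        Duration currentInterval();
    }

    // Default behaviour: a fixed periodic interval.
    static CheckpointTrigger periodic(Duration interval) {
        return () -> interval;
    }

    // Backlog-aware variant: widen the interval while the pendingRecords
    // metric exceeds a configured threshold, tighten it otherwise.
    static CheckpointTrigger backlogAware(LongSupplier pendingRecords,
                                          long threshold,
                                          Duration backlogInterval,
                                          Duration realtimeInterval) {
        return () -> pendingRecords.getAsLong() > threshold
                ? backlogInterval
                : realtimeInterval;
    }

    public static void main(String[] args) {
        // Source reports 1,000,000 pending records, well over the threshold,
        // so the wide "backlog" interval is in effect.
        CheckpointTrigger t = backlogAware(() -> 1_000_000L, 10_000L,
                Duration.ofMinutes(10), Duration.ofSeconds(30));
        System.out.println(t.currentInterval()); // prints PT10M
    }
}
```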
> > > > Maybe `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if `pendingRecords` for some source has exceeded the configured threshold, and based on that adjust the checkpointing interval accordingly? This would at least address some of my concerns.
> > > >
> > > > WDYT?
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> > > >
> > > > On Tue, May 9, 2023 at 19:11 Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Dong (cc'ed) and I are opening this thread to discuss our proposal to support dynamically triggering checkpoints from operators, which has been documented in FLIP-309 <
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.
> > > > >
> > > > > With the help of the ability proposed in this FLIP, users could improve the performance of their Flink jobs in cases like when the job needs to process both historical batch data and real-time streaming data, by adjusting the checkpoint triggering in different phases of a HybridSource or CDC source.
> > > > >
> > > > > This proposal would be a fundamental component in the effort to further unify Flink's batch and stream processing abilities. Please feel free to reply to this email thread and share with us your opinions.
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Dong and Yunfeng
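[Editor's note: the end-user-facing API discussed at the top of the thread, `HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource`, would be used roughly as below. The builder here is a minimal stand-in written for illustration; the real proposed builder belongs to Flink's HybridSource connector and has a different surface.]

```java
import java.time.Duration;

public class BuilderUsageSketch {
    // Minimal stand-in for the FLIP-309 HybridSourceBuilder proposal.
    static class HybridSourceBuilder {
        Duration lastSourceUpperBound;

        HybridSourceBuilder addSource(String source) {
            // The real builder takes Source instances; a String suffices here.
            return this;
        }

        HybridSourceBuilder upperBoundCheckpointingIntervalForLastSource(Duration d) {
            this.lastSourceUpperBound = d;
            return this;
        }
    }

    public static void main(String[] args) {
        // Read the historical (bounded) source first, then Kafka; checkpoint
        // at most every 30 s once the job reaches the last (streaming) source.
        HybridSourceBuilder builder = new HybridSourceBuilder()
                .addSource("hdfs-backfill")
                .addSource("kafka-realtime")
                .upperBoundCheckpointingIntervalForLastSource(Duration.ofSeconds(30));
        System.out.println(builder.lastSourceUpperBound); // prints PT30S
    }
}
```

This keeps the interval override scoped to users of HybridSource, which is the point Dong makes against a new top-level config.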