Hi Piotr,

Thank you for providing further suggestions to help improve the API. Please see my comments inline.
On Fri, Jun 30, 2023 at 10:35 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hey,
>
> Sorry for a late reply, I was OoO for a week. I have three things to point
> out.
>
> 1. ===============
>
> The updated proposal is indeed better, but to be honest I still don't like
> it, for mostly the same reasons that I have mentioned earlier:
> - only a partial solution, that doesn't address all use cases, so we would
> need to throw it away sooner or later
> - I don't see and it hasn't been discussed how to make it work out of the
> box for all sources
> - somehow complicating API for people implementing Sources
> - it should work out of the box for most of the sources, or at least to
> have that potential in the future

The points above seem somewhat abstract. I am hoping to understand more of the technical details behind these comments so that we can see how to address the concerns. For example, even if a FLIP does not address all use cases (which is arguably true for every FLIP), its solution does not necessarily need to be thrown away later as long as it is extensible. So we probably need to understand specifically why the proposed APIs would be thrown away. Similarly, we would need to understand whether there is a better design that makes the API simpler and works out of the box, etc., in order to decide how to address these comments.

> On top of that:
> - the FLIP I think is missing how to hook up SplitEnumeratorContext and
> CheckpointCoordinator to pass "isProcessingBacklog"

I think it can be passed via the following function chain:
- CheckpointCoordinator invokes OperatorCoordinatorCheckpointContext#isProcessingBacklog (via coordinatorsToCheckpoint) to get this information.
- OperatorCoordinatorHolder implements OperatorCoordinatorCheckpointContext#isProcessingBacklog and returns OperatorCoordinator#isProcessingBacklog (via coordinator).
- SourceCoordinator implements OperatorCoordinator#isProcessingBacklog and returns SourceCoordinatorContext#isProcessingBacklog.
- SourceCoordinatorContext will implement SplitEnumeratorContext#setIsProcessingBacklog and store the given information in a variable.

Note that this involves only internal APIs. We might be able to find a simpler solution with fewer functions on the path. As long as the above solution works without any performance or correctness issues, I think we should focus on the public API design and discuss the implementation in the PR review.

> - the FLIP suggests to use the long checkpointing interval as long as any
> subtask is processing the backlog. Are you sure that's the right call?
> What if other sources are producing fresh records, and those fresh records
> are reaching sinks? It could happen either with a disjoint JobGraph, an
> embarrassingly parallel JobGraph (no keyBy/unions/joins), or even with
> keyBy. Fresh records can slip using a not backpressured input channel
> through a generally backpressured keyBy exchange. How should we handle
> that? This problem I think will affect every solution, including my
> previously proposed generic one, but we should discuss how to handle that
> as well.

Good question. Here is my plan to improve the solution in a follow-up FLIP:
- Let every subtask of every source operator emit RecordAttributes(isBacklog=..).
- Let every subtask of every operator handle the RecordAttributes received from its inputs and emit RecordAttributes to downstream operators. The Flink runtime can derive this information for every one-input operator. For an operator with two inputs, if one input has isBacklog=true and the other has isBacklog=false, the operator should determine the isBacklog for its output records based on its semantics.
- If there exists a subtask of a two-phase commit operator with isBacklog=false, the operator should let the JM know, so that the JM will use the short checkpoint interval (for data freshness). Otherwise, the JM will use the long checkpoint interval.

The above solution guarantees that, if every two-input operator has explicitly specified its isBacklog based on the inputs' isBacklog, then the JM will use the short checkpoint interval if and only if it is useful for at least one subtask of one two-phase commit operator.

Note that even the above solution might not be perfect. Suppose one subtask of a two-phase commit operator has isBacklog=false, but every other subtask of this operator has isBacklog=true, due to load imbalance. In this case, it might be beneficial to use the long checkpoint interval to improve the average data freshness for this operator. However, as we get into more edge cases, the solution will become more complicated (e.g. providing more APIs for users to specify their intended strategy) and there will be less additional benefit (because these scenarios are less common).

Also, note that we can support the solution described above without throwing away any public API currently proposed in FLIP-309. More specifically, we still need execution.checkpointing.interval-during-backlog. Sources such as HybridSource and the MySQL CDC source can still use setIsProcessingBacklog() to specify the backlog status. We just need to update setIsProcessingBacklog() to emit RecordAttributes(isBacklog=..) upon invocation.

I hope the above solution is reasonable and can address most of your concerns. And I hope we can use FLIP-309 to solve a large chunk of the existing problems in the Flink 1.18 release and make further improvements in follow-up FLIPs. What do you think?

> 2. ===============
>
> Regarding the current proposal, there might be a way to make it actually
> somehow generic (but not pluggable). But it might require slightly
> different interfaces.
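Before getting to point 2: to make the RecordAttributes propagation rule I described above concrete, here is a rough sketch in Java. All names are illustrative assumptions for this discussion, not the actual Flink or FLIP-309 API; in particular, the AND rule for two-input operators is just one plausible default, since the plan above leaves the choice to each operator's semantics:

```java
// Hypothetical sketch of the backlog-status propagation described above.
// Class and method names are illustrative only, not the real Flink API.
public class BacklogPropagationSketch {

    /** A one-input operator can simply forward its input's backlog status. */
    static boolean deriveOneInput(boolean inputIsBacklog) {
        return inputIsBacklog;
    }

    /**
     * A two-input operator must decide based on its own semantics. One
     * plausible default (an assumption, not part of the FLIP) is to report
     * backlog only if both inputs are backlog, so that a single fresh input
     * keeps the job on the short checkpoint interval.
     */
    static boolean deriveTwoInput(boolean leftIsBacklog, boolean rightIsBacklog) {
        return leftIsBacklog && rightIsBacklog;
    }

    /**
     * The JM uses the short interval if and only if at least one subtask of
     * a two-phase commit operator reports isBacklog=false.
     */
    static boolean useShortInterval(boolean[] twoPhaseCommitSubtaskBacklogs) {
        for (boolean isBacklog : twoPhaseCommitSubtaskBacklogs) {
            if (!isBacklog) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(deriveTwoInput(true, false));                  // false
        System.out.println(useShortInterval(new boolean[] {true, false})); // true
    }
}
```

With this combination rule, a single fresh (isBacklog=false) input upstream of a two-phase commit operator keeps the job on the short checkpoint interval, which matches the guarantee described above.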
> We could keep the idea that SourceCoordinator/SplitEnumerator is
> responsible for switching between slow/fast processing modes. It could be
> implemented to achieve something like in the FLIP-309 proposal, but apart
> from that, the default behaviour would be a built-in mechanism working
> like this:
> 1. Every SourceReaderBase checks its metrics and its state, to decide if
> it considers itself as "processingBacklog" or "veryBackpressured". The
> base implementation could do it via a similar mechanism as I was proposing
> previously, via looking at the busy/backPressuredTimeMsPerSecond,
> pendingRecords and processing rate.
> 2. SourceReaderBase could send an event with the
> "processingBacklog"/"veryBackpressured" state.
> 3. SourceCoordinator would collect those events, and decide what it should
> do, whether it should switch the whole source to the
> "processingBacklog"/"veryBackpressured" state or not.
> That could eventually provide a generic solution that works for every
> source that reports the required metrics. Each source implementation could
> decide whether to use that default behaviour, or if maybe it's better to
> override the default, or combine the default with something custom (like
> HybridSource).
>
> And as a first step, we could implement that mechanism only on the
> SourceCoordinator side, without events, without the default generic
> solution, and use it in the HybridSource/MySQL CDC.
>
> This approach has some advantages compared to my previous proposal:
> + no need to tinker with metrics and pushing metrics from TMs to JM
> + somehow communicating this information via Events seems a bit cleaner
> to me and avoids problems with freshness of the metrics
> And some issues:
> - I don't know if it can be made pluggable in the future. Could a user
> implement a custom `CheckpointTrigger` that would automatically work with
> all/most of the pre-existing sources?
> - I don't know if it can be expanded if needed in the future, to make
> decisions based on operators in the middle of a jobgraph.

Thanks for the proposal. Overall, I agree it is valuable to be able to determine isProcessingBacklog based on the source reader metrics. I would suggest the following changes to your idea:
- Instead of letting the source reader send events to the source coordinator, the source reader can emit RecordAttributes(isBacklog=..) as described earlier. We will let two-phase commit operators decide whether they need the short checkpoint interval.
- We consider isProcessingBacklog=true when the watermarkLag is larger than a threshold.

This is a nice addition. But I think we still need extra information from users (e.g. thresholds for deciding whether the watermarkLag or backPressuredTimeMsPerSecond is too high), provided via extra public APIs, for this feature to work reliably. This is because there is no default algorithm that works in all cases without extra specification from users, due to the issues around the default algorithm we discussed previously.

Overall, I think the current proposal in FLIP-309 is a first step towards addressing these problems. The API for the source enumerator to explicitly set isProcessingBacklog based on its status is useful even if we can support metrics-based solutions. If that looks reasonable, can we agree to make incremental improvements and work on the metrics-based solution in a follow-up FLIP?

> 3. ===============
>
> Independent of that, during some brainstorming between me, Chesnay and
> Stefan Richter, an idea popped up that I think could be a counter
> proposal, as an intermediate solution that probably effectively works the
> same way as the current FLIP-309.
>
> Inside a HybridSource, from its SplitEnumerator#snapshotState method, can
> you not throw an exception like
> `new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)` or
> `new CheckpointException(TRIGGER_CHECKPOINT_FAILURE)`?
> Actually we could also introduce a dedicated `CheckpointFailureReason` for
> that purpose and handle it in some special way in some places (like maybe
> hide such rejected checkpoints from the REST API/WebUI). We could
> elaborate on this a bit more, but after brief thinking I could see it
> actually working well enough without any public facing changes. But I
> might be wrong here.
>
> If this feature actually gains traction, we could expand it to something
> more sophisticated available via a public API in the future.

In the target use-case, users still want to checkpoint (though at a larger interval) when there is backlog. And HybridSource needs to know the expected checkpoint interval during backlog in order to determine whether it should keep throwing CheckpointException. Thus, we still need to add execution.checkpointing.interval-during-backlog for users to specify this information. It seems that the only benefit of this approach is to avoid adding SplitEnumeratorContext#setIsProcessingBacklog.

The downside of this approach is that it is hard to enforce the semantics specified by execution.checkpointing.interval-during-backlog. For example, suppose execution.checkpointing.interval = 3 minutes and execution.checkpointing.interval-during-backlog = 7 minutes. During the backlog phase, the checkpoint coordinator will still trigger a checkpoint once every 3 minutes. HybridSource will need to reject 2 out of every 3 checkpoint invocations, and the effective checkpoint interval will be 9 minutes.

Overall, I think the solution is a bit hacky. It is preferable to throw an exception only when there is indeed an error. If we don't need to take a checkpoint, it is preferable not to trigger the checkpoint in the first place. And I think adding SplitEnumeratorContext#setIsProcessingBacklog is probably not that much of a big deal.

Thanks for all the comments. I am looking forward to your thoughts.
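P.S. The 3-minute/7-minute arithmetic above generalizes: if the coordinator keeps triggering at the base interval and the source rejects every trigger until the desired backlog interval has elapsed, the effective interval is the desired interval rounded up to a multiple of the base interval. A small sketch (a model of that behaviour, not Flink code):

```java
// Model (not Flink code) of enforcing a longer backlog interval by
// rejecting checkpoints from SplitEnumerator#snapshotState: the
// coordinator still triggers every baseMin minutes, and the source
// rejects each trigger until at least desiredMin minutes have passed
// since the last successful checkpoint, so the effective interval is
// desiredMin rounded up to a multiple of baseMin.
public class EffectiveIntervalSketch {

    static int effectiveIntervalMinutes(int baseMin, int desiredMin) {
        int triggersUntilAccepted = (desiredMin + baseMin - 1) / baseMin; // ceil(desired / base)
        return triggersUntilAccepted * baseMin;
    }

    public static void main(String[] args) {
        // interval = 3 min, interval-during-backlog = 7 min:
        // triggers fire at 3, 6, 9, ... so 2 of every 3 are rejected
        // and the effective interval is 9 minutes, not the configured 7.
        System.out.println(effectiveIntervalMinutes(3, 7)); // 9
    }
}
```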
Best, Dong > > =============== > > Sorry for disturbing this FLIP discussion and voting. > > Best, > Piotrek > > On Thu, 29 Jun 2023 at 05:08, feng xiangyu <xiangyu...@gmail.com> wrote: > > > Hi Dong, > > > > Thanks for your quick reply. I think this has truly solved our problem > and > > will enable us to upgrade our existing jobs more seamlessly. > > > > Best, > > Xiangyu > > > > On Thu, 29 Jun 2023 at 10:50, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hi Feng, > > > > > > Thanks for the feedback. Yes, you can configure the > > > execution.checkpointing.interval-during-backlog to effectively disable > > > checkpoint during backlog. > > > > > > Prior to your comment, the FLIP allowed users to do this by setting the > > > config value to something large (e.g. 365 days). After thinking about > this > > > more, we think it is more usable to allow users to achieve this goal by > > > setting the config value to 0. This is consistent with the existing > > > behavior of execution.checkpointing.interval -- the checkpoint is > > disabled > > > if the user sets execution.checkpointing.interval to 0. > > > > > > We have updated the description of > > > execution.checkpointing.interval-during-backlog > > > to say the following: > > > ... it is not null, the value must either be 0, which means the > > checkpoint > > > is disabled during backlog, or be larger than or equal to > > > execution.checkpointing.interval. > > > > > > Does this address your need? > > > > > > Best, > > > Dong > > > > > > > > > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu <xiangyu...@gmail.com> > > wrote: > > > > > > > Hi Dong and Yunfeng, > > > > > > > > Thanks for the proposal, your FLIP sounds very useful from my > > > perspective. > > > > In our business, when using hybrid source in production we also > met > > > the > > > > problem described in your FLIP. 
> > > > In our solution, we tend to skip making any checkpoints before all > > batch > > > > tasks have finished and resume the periodic checkpoint only in > > the streaming > > > > phase. Within this FLIP, we can solve our problem in a more generic > > way. > > > > > > > > However, I am wondering, if we still want to skip making any > checkpoints > > > > during the historical phase, can we set this configuration > > > > "execution.checkpointing.interval-during-backlog" equal to "-1" to > cover > > > this > > > > case? > > > > > > > > Best, > > > > Xiangyu > > > > > > > > On Wed, 28 Jun 2023 at 16:30, Hang Ruan <ruanhang1...@gmail.com> wrote: > > > > > > > > > Thanks for Dong and Yunfeng's work. > > > > > > > > > > The FLIP looks good to me. This new version is clearer to > understand. > > > > > > > > > > Best, > > > > > Hang > > > > > > > > > > On Tue, 27 Jun 2023 at 16:53, Dong Lin <lindon...@gmail.com> wrote: > > > > > > > > > > > Thanks Jark, Jingsong, and Zhu for the review! > > > > > > > > > > > > Thanks Zhu for the suggestion. I have updated the configuration > > name > > > as > > > > > > suggested. > > > > > > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com> > wrote: > > > > > > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving this > > > > > > discussion. > > > > > > > > > > > > > > The new design looks generally good to me. Increasing the > > > checkpoint > > > > > > > interval when the job is processing backlogs is easier for > users > > to > > > > > > > understand and can help in more scenarios. > > > > > > > > > > > > > > I have one comment about the new configuration. > > > > > > > Naming the new configuration > > > > > > > "execution.checkpointing.interval-during-backlog" would be > better > > > > > > > according to the Flink config naming convention. > > > > > > > It is also because nested config keys should be avoided. See > > > > > > > FLINK-29372 for more details. 
> > > > > > > > > > > > > > > Thanks, > > > > > > > Zhu > > > > > > > > > > > > > > On Tue, 27 Jun 2023 at 15:45, Jingsong Li <jingsongl...@gmail.com> wrote: > > > > > > > > > > > > > > > > Looks good to me! > > > > > > > > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and design. > > > > > > > > > > > > > > > > Best, > > > > > > > > Jingsong > > > > > > > > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> > > > wrote: > > > > > > > > > > > > > > > > > > Thank you Dong for driving this FLIP. > > > > > > > > > > > > > > > > > > The new design looks good to me! > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Jark > > > > > > > > > > > > > > > > > > > On Jun 27, 2023, at 14:38, Dong Lin <lindon...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > Thank you Leonard for the review! > > > > > > > > > > > > > > > > > > > > Hi Piotr, do you have any comments on the latest > proposal? > > > > > > > > > > > > > > > > > > > > I am wondering if it is OK to start the voting thread > this > > > > week. > > > > > > > > > > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu < > > > xbjt...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > >> Thanks Dong for driving this FLIP forward! > > > > > > > > > >> > > > > > > > > > >> Introducing the `backlog status` concept for Flink jobs > makes > > > > sense > > > > > to > > > > > > > me for the > > > > > > > > > >> following reasons: > > > > > > > > > >> > > > > > > > > > >> From a concept/API design perspective, it’s more general > and > > > > > natural > > > > > > > than the > > > > > > > > > >> above proposals as it can be used in HybridSource for > > > bounded > > > > > > > records, CDC > > > > > > > > > >> Source for history snapshots and general sources like > > > > KafkaSource > > > > > > for > > > > > > > > > >> historical messages. 
> > > > > > > > > >> > > > > > > > > > >> From user cases/requirements, I’ve seen many users > > manually > > > to > > > > > set > > > > > > > larger > > > > > > > > > >> checkpoint interval during backfilling and then set a > > > shorter > > > > > > > checkpoint > > > > > > > > > >> interval for real-time processing in their production > > > > > environments > > > > > > > as a > > > > > > > > > >> flink application optimization. Now, the flink framework > > can > > > > > make > > > > > > > this > > > > > > > > > >> optimization no longer require the user to set the > > > checkpoint > > > > > > > interval and > > > > > > > > > >> restart the job multiple times. > > > > > > > > > >> > > > > > > > > > >> Following supporting using larger checkpoint for job > under > > > > > backlog > > > > > > > status > > > > > > > > > >> in current FLIP, we can explore supporting larger > > > > > > > parallelism/memory/cpu > > > > > > > > > >> for job under backlog status in the future. > > > > > > > > > >> > > > > > > > > > >> In short, the updated FLIP looks good to me. > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> Best, > > > > > > > > > >> Leonard > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin < > > > lindon...@gmail.com> > > > > > > > wrote: > > > > > > > > > >>> > > > > > > > > > >>> Hi Piotr, > > > > > > > > > >>> > > > > > > > > > >>> Thanks again for proposing the isProcessingBacklog > > concept. > > > > > > > > > >>> > > > > > > > > > >>> After discussing with Becket Qin and thinking about > this > > > > more, > > > > > I > > > > > > > agree it > > > > > > > > > >>> is a better idea to add a top-level concept to all > source > > > > > > > operators to > > > > > > > > > >>> address the target use-case. 
> > > > > > > > > >>> > > > > > > > > > >>> The main reason that changed my mind is that > > > > > isProcessingBacklog > > > > > > > can be > > > > > > > > > >>> described as an inherent/nature attribute of every > source > > > > > > instance > > > > > > > and > > > > > > > > > >> its > > > > > > > > > >>> semantics does not need to depend on any specific > > > > checkpointing > > > > > > > policy. > > > > > > > > > >>> Also, we can hardcode the isProcessingBacklog behavior > > for > > > > the > > > > > > > sources we > > > > > > > > > >>> have considered so far (e.g. HybridSource and MySQL CDC > > > > source) > > > > > > > without > > > > > > > > > >>> asking users to explicitly configure the per-source > > > behavior, > > > > > > which > > > > > > > > > >> indeed > > > > > > > > > >>> provides better user experience. > > > > > > > > > >>> > > > > > > > > > >>> I have updated the FLIP based on the latest > suggestions. > > > The > > > > > > > latest FLIP > > > > > > > > > >> no > > > > > > > > > >>> longer introduces per-source config that can be used by > > > > > > end-users. > > > > > > > While > > > > > > > > > >> I > > > > > > > > > >>> agree with you that CheckpointTrigger can be a useful > > > feature > > > > > to > > > > > > > address > > > > > > > > > >>> additional use-cases, I am not sure it is necessary for > > the > > > > > > > use-case > > > > > > > > > >>> targeted by FLIP-309. Maybe we can introduce > > > > CheckpointTrigger > > > > > > > separately > > > > > > > > > >>> in another FLIP? > > > > > > > > > >>> > > > > > > > > > >>> Can you help take another look at the updated FLIP? 
> > > > > > > > > >>> > > > > > > > > > >>> Best, > > > > > > > > > >>> Dong > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski < > > > > > > > pnowoj...@apache.org> > > > > > > > > > >>> wrote: > > > > > > > > > >>> > > > > > > > > > >>>> Hi Dong, > > > > > > > > > >>>> > > > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask has > 1% > > > > chance > > > > > > of > > > > > > > being > > > > > > > > > >>>>> "backpressured" at a given time (due to random > traffic > > > > > spikes). > > > > > > > Then at > > > > > > > > > >>>> any > > > > > > > > > >>>>> given time, the chance of the job > > > > > > > > > >>>>> being considered not-backpressured = (1-0.01)^1000. > > Since > > > > we > > > > > > > evaluate > > > > > > > > > >> the > > > > > > > > > >>>>> backpressure metric once a second, the estimated time > > for > > > > the > > > > > > job > > > > > > > > > >>>>> to be considered not-backpressured is roughly 1 / > > > > > > > ((1-0.01)^1000) = > > > > > > > > > >> 23163 > > > > > > > > > >>>>> sec = 6.4 hours. > > > > > > > > > >>>>> > > > > > > > > > >>>>> This means that the job will effectively always use > the > > > > > longer > > > > > > > > > >>>>> checkpointing interval. It looks like a real concern, > > > > right? > > > > > > > > > >>>> > > > > > > > > > >>>> Sorry I don't understand where you are getting those > > > numbers > > > > > > from. > > > > > > > > > >>>> Instead of trying to find loophole after loophole, > could > > > you > > > > > try > > > > > > > to > > > > > > > > > >> think > > > > > > > > > >>>> how a given loophole could be improved/solved? > > > > > > > > > >>>> > > > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the > > > APIs > > > > > due > > > > > > > to the > > > > > > > > > >>>>> following reasons. > > > > > > > > > >>>> > > > > > > > > > >>>> Please propose something. I don't think it's needed. 
> > > > > > > > > >>>> > > > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation > > > > section, > > > > > > > would the > > > > > > > > > >>>> APIs > > > > > > > > > >>>>> of this alternative approach be more or less usable? > > > > > > > > > >>>> > > > > > > > > > >>>> Everything that you originally wanted to achieve in > > > > FLIP-309, > > > > > > you > > > > > > > could > > > > > > > > > >> do > > > > > > > > > >>>> as well in my proposal. > > > > > > > > > >>>> Vide my many mentions of the "hacky solution". > > > > > > > > > >>>> > > > > > > > > > >>>>> - Can these APIs reliably address the extra use-case > > > (e.g. > > > > > > allow > > > > > > > > > >>>>> checkpointing interval to change dynamically even > > during > > > > the > > > > > > > unbounded > > > > > > > > > >>>>> phase) as it claims? > > > > > > > > > >>>> > > > > > > > > > >>>> I don't see why not. > > > > > > > > > >>>> > > > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently > > > > > proposed > > > > > > in > > > > > > > > > >>>> FLIP-309? > > > > > > > > > >>>> > > > > > > > > > >>>> Yes > > > > > > > > > >>>> > > > > > > > > > >>>>> For example, if the APIs of this alternative approach > > can > > > > be > > > > > > > decoupled > > > > > > > > > >>>> from > > > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it > might > > be > > > > > > > reasonable to > > > > > > > > > >>>>> work on this extra use-case with a more > > > > advanced/complicated > > > > > > > design > > > > > > > > > >>>>> separately in a followup work. > > > > > > > > > >>>> > > > > > > > > > >>>> As I voiced my concerns previously, the current design > > of > > > > > > > FLIP-309 would > > > > > > > > > >>>> clog the public API and in the long run confuse the > > users. > > > > IMO > > > > > > > It's > > > > > > > > > >>>> addressing the > > > > > > > > > >>>> problem in the wrong place. > > > > > > > > > >>>> > > > > > > > > > >>>>> Hmm.. 
do you mean we can do the following: > > > > > > > > > >>>>> - Have all source operators emit a metric named > > > > > > > "processingBacklog". > > > > > > > > > >>>>> - Add a job-level config that specifies "the > > > checkpointing > > > > > > > interval to > > > > > > > > > >> be > > > > > > > > > >>>>> used when any source is processing backlog". > > > > > > > > > >>>>> - The JM collects the "processingBacklog" > periodically > > > from > > > > > all > > > > > > > source > > > > > > > > > >>>>> operators and uses the newly added config value as > > > > > appropriate. > > > > > > > > > >>>> > > > > > > > > > >>>> Yes. > > > > > > > > > >>>> > > > > > > > > > >>>>> The challenge with this approach is that we need to > > > define > > > > > the > > > > > > > > > >> semantics > > > > > > > > > >>>> of > > > > > > > > > >>>>> this "processingBacklog" metric and have all source > > > > operators > > > > > > > > > >>>>> implement this metric. I am not sure we are able to > do > > > this > > > > > yet > > > > > > > without > > > > > > > > > >>>>> having users explicitly provide this information on a > > > > > > per-source > > > > > > > basis. > > > > > > > > > >>>>> > > > > > > > > > >>>>> Suppose the job read from a bounded Kafka source, > > should > > > it > > > > > > emit > > > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might > > use > > > > long > > > > > > > > > >>>> checkpointing > > > > > > > > > >>>>> interval even > > > > > > > > > >>>>> if the job is asked to process data starting from now > > to > > > > the > > > > > > > next 1 > > > > > > > > > >> hour. > > > > > > > > > >>>>> If no, then the job might use the short checkpointing > > > > > interval > > > > > > > > > >>>>> even if the job is asked to re-process data starting > > > from 7 > > > > > > days > > > > > > > ago. > > > > > > > > > >>>> > > > > > > > > > >>>> Yes. The same can be said of your proposal. 
Your > > proposal > > > > has > > > > > > the > > > > > > > very > > > > > > > > > >> same > > > > > > > > > >>>> issues > > > > > > > > > >>>> that every source would have to implement it > > differently, > > > > most > > > > > > > sources > > > > > > > > > >>>> would > > > > > > > > > >>>> have no idea how to properly calculate the new > requested > > > > > > > checkpoint > > > > > > > > > >>>> interval, > > > > > > > > > >>>> for those that do know how to do that, user would have > > to > > > > > > > configure > > > > > > > > > >> every > > > > > > > > > >>>> source > > > > > > > > > >>>> individually and yet again we would end up with a > > system, > > > > that > > > > > > > works > > > > > > > > > >> only > > > > > > > > > >>>> partially in > > > > > > > > > >>>> some special use cases (HybridSource), that's > confusing > > > the > > > > > > users > > > > > > > even > > > > > > > > > >>>> more. > > > > > > > > > >>>> > > > > > > > > > >>>> That's why I think the more generic solution, working > > > > > primarily > > > > > > > on the > > > > > > > > > >> same > > > > > > > > > >>>> metrics that are used by various auto scaling > solutions > > > > (like > > > > > > > Flink K8s > > > > > > > > > >>>> operator's > > > > > > > > > >>>> autosaler) would be better. The hacky solution I > > proposed > > > > to: > > > > > > > > > >>>> 1. show you that the generic solution is simply a > > superset > > > > of > > > > > > your > > > > > > > > > >> proposal > > > > > > > > > >>>> 2. if you are adamant that > > busyness/backpressured/records > > > > > > > processing > > > > > > > > > >>>> rate/pending records > > > > > > > > > >>>> metrics wouldn't cover your use case sufficiently > (imo > > > > they > > > > > > > can), > > > > > > > > > >> then > > > > > > > > > >>>> you can very easily > > > > > > > > > >>>> enhance this algorithm with using some hints from > the > > > > > sources. 
Like "processingBacklog==true" to short circuit the main algorithm, if
`processingBacklog` is available.

Best,
Piotrek

pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a):

> Hi again Piotr,
>
> Thank you for the reply. Please see my reply inline.
>
> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski
> <piotr.nowoj...@gmail.com> wrote:
>
>> Hi again Dong,
>>
>>> I understand that JM will get the backpressure-related metrics every
>>> time the RestServerEndpoint receives the REST request to get these
>>> metrics. But I am not sure if RestServerEndpoint is already always
>>> receiving the REST metrics at a regular interval (suppose there is no
>>> human manually opening/clicking the Flink Web UI). And if it does,
>>> what is the interval?
>>
>> Good catch, I had thought that metrics are pre-emptively sent to the JM
>> every 10 seconds. Indeed that's not the case at the moment, and that
>> would have to be improved.
>>
>>> I would be surprised if Flink is already paying this much overhead
>>> just for metrics monitoring. That is the main reason I still doubt it
>>> is true. Can you show where this 100 ms is currently configured?
>>>
>>> Alternatively, maybe you mean that we should add extra code to invoke
>>> the REST API at a 100 ms interval. Then that means we need to
>>> considerably increase the network/CPU overhead at the JM, where the
>>> overhead will increase as the number of TMs/slots increases, which may
>>> pose a risk to the scalability of the proposed design. I am not sure
>>> we should do this. What do you think?
>>
>> Sorry, I didn't mean the metric should be reported every 100 ms. I
>> meant that "backPressuredTimeMsPerSecond (metric) would report (a value
>> of) 100ms/s" once per metric interval (10s?).
>
> Suppose there are 1000 subtasks and each subtask has a 1% chance of
> being backpressured at a given time (due to random traffic spikes). Then
> at any given time, the chance of the job being considered
> not-backpressured is (1-0.01)^1000. Since we evaluate the backpressure
> metric once a second, the estimated time for the job to be considered
> not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163 sec = 6.4
> hours.
>
> This means that the job will effectively always use the longer
> checkpointing interval. It looks like a real concern, right?
>
>>> - What is the interface of this CheckpointTrigger? For example, are we
>>> going to give CheckpointTrigger a context that it can use to fetch
>>> arbitrary metric values? This can help us understand what information
>>> this user-defined CheckpointTrigger can use to make the checkpoint
>>> decision.
>>
>> I honestly don't think this is important at this stage of the
>> discussion. It could have whatever interface we would deem to be best.
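The 1000-subtask arithmetic a few paragraphs up checks out; a quick sketch of
the calculation (the 1% per-subtask backpressure probability and the
once-per-second evaluation are the example's assumptions):

```python
num_subtasks = 1000
p_backpressured = 0.01  # per-subtask chance at any instant (example assumption)

# Probability that NO subtask is backpressured at a given 1-second evaluation.
p_all_clear = (1 - p_backpressured) ** num_subtasks

# Evaluating once per second, the wait until a fully clear evaluation is
# geometrically distributed, with mean 1 / p_all_clear seconds.
expected_wait_sec = 1 / p_all_clear

print(int(expected_wait_sec))              # 23163 seconds
print(round(expected_wait_sec / 3600, 1))  # 6.4 hours
```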
>> Required things:
>>
>> - access to at least a subset of metrics that the given
>>   `CheckpointTrigger` requests, for example via some registration
>>   mechanism, so we don't have to fetch all of the metrics all the time
>>   from TMs.
>> - some way to influence `CheckpointCoordinator`, either via manually
>>   triggering checkpoints and/or the ability to change the checkpointing
>>   interval.
>
> Hmm... I honestly think it will be useful to know the APIs, for the
> following reasons.
>
> We would need to know the concrete APIs to gauge the following:
> - For the use-case mentioned in the FLIP-309 motivation section, would
>   the APIs of this alternative approach be more or less usable?
> - Can these APIs reliably address the extra use-case (e.g. allow the
>   checkpointing interval to change dynamically even during the unbounded
>   phase) as it claims?
> - Can these APIs be decoupled from the APIs currently proposed in
>   FLIP-309?
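For concreteness, the two "required things" listed above could take roughly
the following shape. This is only an illustrative sketch: none of these names
exist in Flink, and the real interface would presumably be a Java one.

```python
from abc import ABC, abstractmethod


class CheckpointTriggerContext(ABC):
    """Hypothetical context the JM could hand to a trigger (not a Flink API)."""

    @abstractmethod
    def register_metric(self, metric_name: str) -> None:
        """Subscribe to one metric, so the JM fetches from TMs only what
        registered triggers actually need."""

    @abstractmethod
    def set_checkpoint_interval(self, interval_ms: int) -> None:
        """Ask the CheckpointCoordinator to use a new interval."""


class CheckpointTrigger(ABC):
    @abstractmethod
    def open(self, context: CheckpointTriggerContext) -> None:
        """Called once on the JM; the trigger registers the metrics it needs."""

    @abstractmethod
    def on_metrics_updated(self, metrics: dict) -> None:
        """Called every metric-fetching interval with the subscribed values."""


class StaticIntervalTrigger(CheckpointTrigger):
    """Trivial trigger reproducing today's statically configured interval."""

    def __init__(self, interval_ms: int) -> None:
        self.interval_ms = interval_ms

    def open(self, context: CheckpointTriggerContext) -> None:
        context.set_checkpoint_interval(self.interval_ms)

    def on_metrics_updated(self, metrics: dict) -> None:
        pass  # a static trigger never reacts to metrics
```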
> For example, if the APIs of this alternative approach can be decoupled
> from the APIs currently proposed in FLIP-309, then it might be reasonable
> to work on this extra use-case, with a more advanced/complicated design,
> separately in a followup work.
>
>>> - Where is this CheckpointTrigger running? For example, is it going to
>>> run on the subtask of every source operator? Or is it going to run on
>>> the JM?
>>
>> IMO on the JM.
>>
>>> - Are we going to provide a default implementation of this
>>> CheckpointTrigger in Flink that implements the algorithm described
>>> below, or do we expect each source operator developer to implement
>>> their own CheckpointTrigger?
>>
>> As I mentioned before, I think we should provide at the very least the
>> implementation that replaces the current triggering mechanism
>> (statically configured checkpointing interval), and it would be great
>> to provide the backpressure monitoring trigger as well.
>
> I agree that if there is a good use-case that can be addressed by the
> proposed CheckpointTrigger, then it is reasonable to add
> CheckpointTrigger and replace the current triggering mechanism with it.
>
> I also agree that we will likely find such a use-case. For example,
> suppose the source records have event timestamps; then it is likely that
> we can use the trigger to dynamically control the checkpointing interval
> based on the difference between the watermark and the current system
> time.
>
> But I am not sure the addition of this CheckpointTrigger should be
> coupled with FLIP-309. Whether or not it is coupled probably depends on
> the concrete API design around CheckpointTrigger.
>> If you would be adamant that the backpressure monitoring doesn't cover
>> your use case well enough, I would be ok to provide the hacky version
>> that I also mentioned before:
>>
>> """
>> Especially that if my proposed algorithm wouldn't work well enough,
>> there is an obvious solution: any source could add a metric, like let's
>> say "processingBacklog: true/false", and the `CheckpointTrigger` could
>> use this as an override to always switch to the
>> "slowCheckpointInterval". I don't think we need it, but that's always
>> an option that would be basically equivalent to your original proposal.
>> """
>
> Hmm.. do you mean we can do the following:
> - Have all source operators emit a metric named "processingBacklog".
> - Add a job-level config that specifies "the checkpointing interval to
>   be used when any source is processing backlog".
> - The JM collects the "processingBacklog" metric periodically from all
>   source operators and uses the newly added config value as appropriate.
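The three steps above boil down to a single override rule on the JM; a
minimal sketch (the metric and config names are hypothetical, following the
thread, not actual Flink options):

```python
def choose_checkpoint_interval(
    backlog_by_source: dict,        # e.g. {"cdc-src": True, "kafka-src": False}
    regular_interval_ms: int,       # the normal checkpointing interval
    backlog_interval_ms: int,       # the new job-level "backlog" interval
) -> int:
    """Use the long interval while ANY source reports processingBacklog=true."""
    if any(backlog_by_source.values()):
        return backlog_interval_ms
    return regular_interval_ms


# During e.g. a CDC snapshot phase the long interval wins...
print(choose_checkpoint_interval({"cdc": True, "kafka": False}, 30_000, 600_000))
# ...and once every source has caught up, the short one is restored.
print(choose_checkpoint_interval({"cdc": False, "kafka": False}, 30_000, 600_000))
```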
> The challenge with this approach is that we need to define the semantics
> of this "processingBacklog" metric and have all source operators
> implement it. I am not sure we are able to do this yet without having
> users explicitly provide this information on a per-source basis.
>
> Suppose the job reads from a bounded Kafka source; should it emit
> "processingBacklog=true"? If yes, then the job might use the long
> checkpointing interval even if the job is asked to process data starting
> from now to the next 1 hour. If no, then the job might use the short
> checkpointing interval even if the job is asked to re-process data
> starting from 7 days ago.
>
>>> - How can users specify the
>>> fastCheckpointInterval/slowCheckpointInterval? For example, will we
>>> provide APIs on the CheckpointTrigger that end-users can use to
>>> specify the checkpointing interval? What would that look like?
>>
>> Also as I mentioned before, just like metric reporters are configured:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
>>
>> Every CheckpointTrigger could have its own custom configuration.
>>
>>> Overall, my gut feel is that the alternative approach based on
>>> CheckpointTrigger is more complicated
>>
>> Yes, as usual, more generic things are more complicated, but often more
>> useful in the long run.
>>
>>> and harder to use.
>>
>> I don't agree. Why is setting, in the config,
>>
>> execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
>>
>> (we could even provide a shortcut to the above construct via:
>>
>> execution.checkpointing.fast-interval: 1s
>> execution.checkpointing.slow-interval: 30s
>>
>> ) harder compared to setting two/three checkpoint intervals, one in the
>> config or via `env.enableCheckpointing(x)`, and secondly passing
>> one/two (fast/slow) values on the source itself?
>
> If we can address the use-case by providing just the two job-level
> configs as described above, I agree it will indeed be simpler.
>
> I have tried to achieve this goal. But the caveat is that it requires
> much more work than described above in order to give the configs
> well-defined semantics.
> So I find it simpler to just use the approach in FLIP-309.
>
> Let me explain my concern below. It will be great if you or someone else
> can help provide a solution.
>
> 1) We need to clearly document when the fast-interval and slow-interval
> will be used, so that users can derive the expected behavior of the job
> and be able to configure these values.
>
> 2) The trigger of the fast/slow interval depends on the behavior of the
> source (e.g. MySQL CDC, HybridSource). However, no existing concept of
> the source operator (e.g. boundedness) can describe the target behavior.
> For example, MySQL CDC internally has two phases, namely the snapshot
> phase and the binlog phase, which are not explicitly exposed to its
> users via the source operator API. And we probably should not enumerate
> all internal phases of all source operators that are affected by the
> fast/slow interval.
>
> 3) An alternative approach might be to define a new concept (e.g.
> processingBacklog) that is applied to all source operators. Then the
> fast/slow interval's documentation can depend on this concept. That
> means we have to add a top-level concept (similar to source boundedness)
> and require all source operators to specify how they enforce this
> concept (e.g. FileSystemSource always emits processingBacklog=true). And
> there might be cases where the source itself (e.g. a bounded Kafka
> source) can not automatically derive the value of this concept, in which
> case we need to provide an option for users to explicitly specify the
> value for this concept on a per-source basis.
>
>>> And it probably also has the issues of "having two places to configure
>>> the checkpointing interval" and "giving flexibility for every source
>>> to implement a different API" (as mentioned below).
>>
>> No, it doesn't.
>>
>>> IMO, it is a hard requirement for the user-facing API to be clearly
>>> defined, and users should be able to use the API without concern of
>>> regression.
>>> And this requirement is more important than the other goals discussed
>>> above because it is related to the stability/performance of the
>>> production job. What do you think?
>>
>> I don't agree with this. There are many things that work somewhere in
>> between perfectly and well enough in some fraction of use cases (maybe
>> in 99%, maybe 95% or maybe 60%), while still being very useful. Good
>> examples are: selection of the state backend, unaligned checkpoints,
>> buffer debloating. Frankly, if I go through the list of currently
>> available config options, something like half of them can cause
>> regressions. Heck, even Flink itself doesn't work perfectly in 100% of
>> the use cases, due to a variety of design choices. Of course, the more
>> use cases are fine with said feature, the better, but we shouldn't
>> fixate on perfectly covering 100% of the cases, as that's impossible.
>>
>> In this particular case, if the back pressure monitoring trigger can
>> work well enough in 95% of cases, I would say that's already better
>> than the originally proposed alternative, which doesn't work at all if
>> the user has a large backlog to reprocess from Kafka, including when
>> using HybridSource AFTER the switch to Kafka has happened. For the
>> remaining 5%, we should try to improve the behaviour over time, but
>> ultimately, users can decide to just run a fixed checkpoint interval
>> (or at worst use the hacky checkpoint trigger that I mentioned before a
>> couple of times).
>>
>> Also, to be pedantic: if a user naively sets the slow-interval in your
>> proposal to 30 minutes, when that user's job fails on average every
>> 15-20 minutes, the job can end up in a state where it can not make any
>> progress. This arguably is quite a serious regression.
>
> I probably should not say it is a "hard requirement". After all there
> are pros/cons. We will need to consider implementation complexity,
> usability, extensibility etc.
> I just don't think we should take it for granted to introduce a
> regression for one use-case in order to support another use-case. If we
> can not find an algorithm/solution that addresses both use-cases well, I
> hope we can be open to tackling them separately, so that users can
> choose the option that best fits their needs.
>
> All things else being equal, I think it is preferable for a user-facing
> API to be clearly defined and for users to be able to use the API
> without concern of regression.
>
> Maybe we can list the pros/cons of the alternative approaches we have
> been discussing and choose the best approach. And maybe we will end up
> finding that the use-case which needs CheckpointTrigger can be tackled
> separately from the use-case in FLIP-309.
>
>>> I am not sure if there is a typo. Because if
>>> backPressuredTimeMsPerSecond = 0, then
>>> maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond / 1000 *
>>> metricsUpdateInterval according to the above algorithm.
>>>
>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
>>> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) *
>>> metricsUpdateInterval"?
>>
>> It looks like there is indeed some mistake in my proposal above. Yours
>> looks more correct; it probably still needs some safeguard/special
>> handling if `backPressuredTimeMsPerSecond > 950`.
>>
>>> The only information it can access is the backlog.
>>
>> Again, no. It can access whatever we want to provide to it.
>>
>> Regarding the rest of your concerns: it's a matter of tweaking the
>> parameters and the algorithm itself, and how much of a safety net we
>> want to have. Ultimately, I'm pretty sure that's a (for 95-99% of
>> cases) solvable problem. If not, there is always the hacky solution,
>> which could even be integrated into the above-mentioned algorithm as a
>> short circuit to always reach the `slow-interval`.
>>
>> Apart from that, you picked 3 minutes as the checkpointing interval in
>> your counter-example.
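On the formula discussed above: the corrected version plus the suggested
safeguard could be combined as follows. This is a sketch only; the names
mirror the metric names in the thread and the 950 ms/s cap follows the remark
above, none of it is actual Flink code.

```python
def max_records_consumed_without_backpressure(
    num_records_in_per_second: float,
    back_pressured_time_ms_per_second: float,
    metrics_update_interval_sec: float,
) -> float:
    """Estimate how many records could be consumed per metrics-update
    interval if backpressure vanished, by scaling the observed input rate
    up by the fraction of time the subtask was NOT backpressured."""
    # Safeguard: near-total backpressure (> 950 ms/s) would make the
    # estimate blow up (or divide by zero at 1000 ms/s), so clamp it.
    clamped = min(back_pressured_time_ms_per_second, 950.0)
    non_backpressured_fraction = 1.0 - clamped / 1000.0
    return (num_records_in_per_second / non_backpressured_fraction) \
        * metrics_update_interval_sec


# With no backpressure, the estimate is simply rate * interval.
print(max_records_consumed_without_backpressure(1000, 0, 10))    # 10000.0
# At 500 ms/s backpressure, the subtask could consume roughly 2x more.
print(max_records_consumed_without_backpressure(1000, 500, 10))  # 20000.0
```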
>> In most cases any interval above 1 minute would inflict pretty
>> negligible overheads, so all in all, I would doubt there is a
>> significant benefit (in most cases) of increasing a 3 minute checkpoint
>> interval to anything more, let alone 30 minutes.
>
> I am not sure we should design the algorithm with the assumption that
> the short checkpointing interval will always be higher than 1 minute
> etc.
>
> I agree the proposed algorithm can solve most cases where the resources
> are sufficient and there is no backlog in the source subtasks. On the
> other hand, what makes SRE life hard is probably the remaining 1-5% of
> cases where the traffic is spiky and the cluster is reaching its
> capacity limit. The ability to predict and control a Flink job's
> behavior (including the checkpointing interval) can considerably reduce
> the burden of managing Flink jobs.
>
> Best,
> Dong
>
>> Best,
>> Piotrek
>>
>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a):
>>
>>> Hi Piotr,
>>>
>>> Thanks for the explanations. I have some followup questions below.
>>>
>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski
>>> <pnowoj...@apache.org> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Thanks for chipping in the discussion Ahmed!
>>>>
>>>> Regarding using the REST API: currently I'm leaning towards
>>>> implementing this feature inside Flink itself, via some pluggable
>>>> interface. The REST API solution would be tempting, but I guess not
>>>> everyone is using the Flink Kubernetes Operator.
>>>>
>>>> @Dong
>>>>
>>>>> I am not sure metrics such as isBackPressured are already sent to
>>>>> the JM.
>>>> Fetching code path on the JM:
>>>>
>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
>>>> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>>>>
>>>> Example code path accessing Task level metrics via the JM using the
>>>> `MetricStore`:
>>>>
>>>> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>>>
>>> Thanks for the code reference. I checked the code that invokes these
>>> two classes and found the following:
>>>
>>> - AggregatingSubtasksMetricsHandler#getStores is currently invoked
>>>   only when AggregatingJobsMetricsHandler is invoked.
>>> - AggregatingJobsMetricsHandler is only instantiated and returned by
>>>   WebMonitorEndpoint#initializeHandlers.
>>> - WebMonitorEndpoint#initializeHandlers is only used by
>>>   RestServerEndpoint. And RestServerEndpoint invokes these handlers in
>>>   response to external REST requests.
>>>
>>> I understand that the JM will get the backpressure-related metrics
>>> every time the RestServerEndpoint receives the REST request to get
>>> these metrics. But I am not sure if RestServerEndpoint is already
>>> always receiving the REST metrics at a regular interval (suppose there
>>> is no human manually opening/clicking the Flink Web UI). And if it
>>> does, what is the interval?
>>>
>>>>> For example, let's say every source operator subtask reports this
>>>>> metric to the JM once every 10 seconds. There are 100 source
>>>>> subtasks. And each subtask is backpressured roughly 10% of the total
>>>>> time due to traffic spikes (and limited buffers). Then at any given
>>>>> time, there is a 1 - 0.9^100 = 99.997% chance that at least one
>>>>> subtask is backpressured. Then we have to wait for at least 10
>>>>> seconds to check again.
>>>> backPressuredTimeMsPerSecond and other related metrics (like
>>>> busyTimeMsPerSecond) are not subject to that problem. They are
>>>> recalculated once every metric fetching interval, and they accurately
>>>> report, on average, how long the given subtask spent
>>>> busy/idling/backpressured. In your example,
>>>> backPressuredTimeMsPerSecond would report 100ms/s.
>>>
>>> Suppose every subtask is already reporting backPressuredTimeMsPerSecond
>>> to the JM once every 100 ms. If a job has 10 operators (that are not
>>> chained) and each operator has 100 subtasks, then the JM would need to
>>> handle 10000 requests per second to receive metrics from these 1000
>>> subtasks. It seems like a non-trivial overhead for medium-to-large
>>> sized jobs and can make the JM the performance bottleneck during job
>>> execution.
>>>
>>> I would be surprised if Flink is already paying this much overhead
>>> just for metrics monitoring. That is the main reason I still doubt it
>>> is true. Can you show where this 100 ms is currently configured?
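Both numbers in the quoted example above can be verified directly (100
subtasks, each backpressured 10% of the time, as the example assumes):

```python
num_subtasks = 100
p_backpressured = 0.10  # each subtask backpressured 10% of the time

# With a boolean isBackPressured-style metric, the chance that at least one
# subtask looks backpressured at a given instant is almost 1.
p_any = 1 - (1 - p_backpressured) ** num_subtasks
print(f"{p_any:.3%}")  # 99.997%

# A time-based metric avoids this: averaged over time, a subtask that is
# backpressured 10% of the time reports 0.10 * 1000 = 100 ms/s.
back_pressured_ms_per_sec = p_backpressured * 1000
print(back_pressured_ms_per_sec)  # 100.0
```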
> Alternatively, maybe you mean that we should add extra code to invoke the
> REST API at a 100 ms interval. That would mean considerably increasing
> the network/CPU overhead at the JM, where the overhead grows as the
> number of TMs/slots increases, which may pose a risk to the scalability
> of the proposed design. I am not sure we should do this. What do you
> think?
>
>>> While it would be nice to support additional use-cases with one
>>> proposal, it is probably also reasonable to make incremental progress
>>> and support the low-hanging-fruit use-case first. The choice really
>>> depends on the complexity and the importance of supporting the extra
>>> use-cases.
>>
>> That would be true if that was a private implementation detail, or if
>> the low-hanging-fruit solution were on the direct path to the final
>> solution. That's unfortunately not the case here. This will add
>> public-facing API that we will later need to maintain, no matter what
>> the final solution will be, and at the moment at least I don't see it
>> being related to a "perfect" solution.
>
> Sure. Then let's decide the final solution first.
>
>>> I guess the point is that the suggested approach, which dynamically
>>> determines the checkpointing interval based on the backpressure, may
>>> cause regression when the checkpointing interval is relatively low.
>>> This makes it hard for users to enable this feature in production. It
>>> is like an auto-driving system that is not guaranteed to work.
>>
>> Yes, creating a more generic solution that requires less configuration
>> is usually more difficult than static configuration. It doesn't mean we
>> shouldn't try to do that.
>> Especially since, if my proposed algorithm doesn't work well enough,
>> there is an obvious fallback: any source could add a metric, say
>> "processingBacklog: true/false", and the `CheckpointTrigger` could use
>> it as an override to always switch to the "slowCheckpointInterval". I
>> don't think we need it, but that's always an option that would be
>> basically equivalent to your original proposal. A source could even add
>> "suggestedCheckpointInterval: int", and the `CheckpointTrigger` could
>> use that value, if present, as a hint in one way or another.
>
> So far we have talked about the possibility of using a CheckpointTrigger
> that can read metric values.
>
> Can you help answer the following questions so that I can understand the
> alternative solution more concretely:
>
> - What is the interface of this CheckpointTrigger? For example, are we
> going to give CheckpointTrigger a context that it can use to fetch
> arbitrary metric values? This can help us understand what information
> this user-defined CheckpointTrigger can use to make the checkpoint
> decision.
> - Where is this CheckpointTrigger running? For example, is it going to
> run on the subtask of every source operator? Or is it going to run on
> the JM?
> - Are we going to provide a default implementation of this
> CheckpointTrigger in Flink that implements the algorithm described below,
> or do we expect each source operator developer to implement their own
> CheckpointTrigger?
> - How can users specify the
> fastCheckpointInterval/slowCheckpointInterval? For example, will we
> provide APIs on the CheckpointTrigger that end-users can use to specify
> the checkpointing interval? What would that look like?
>
> Overall, my gut feeling is that the alternative approach based on
> CheckpointTrigger is more complicated and harder to use. It probably
> also has the issues of "having two places to configure the checkpointing
> interval" and "giving flexibility for every source to implement a
> different API" (as mentioned below).
>
> Maybe we can evaluate it more after knowing the answers to the above
> questions.
>
>>> On the other hand, the approach currently proposed in the FLIP is much
>>> simpler as it does not depend on backpressure. Users specify the extra
>>> interval requirement on the specific sources (e.g. HybridSource, MySQL
>>> CDC Source) and can easily know the checkpointing interval that will
>>> be used during the continuous phase of the corresponding source. This
>>> is pretty much the same as how users use the existing
>>> execution.checkpointing.interval config. So there is no extra concern
>>> of regression caused by this approach.
>> To an extent, but as I have already previously mentioned, I really
>> really do not like the idea of:
>> - having two places to configure the checkpointing interval (the config
>> file and the Source builders)
>> - giving flexibility for every source to implement a different API for
>> that purpose
>> - creating a solution that is not generic enough, so that we will need
>> a completely different mechanism in the future anyway
>
> Yeah, I understand different developers might have different
> concerns/tastes for these APIs. Ultimately, there might not be a perfect
> solution and we have to choose based on the pros/cons of these solutions.
>
> I agree with you that, all things being equal, it is preferable to 1)
> have one place to configure checkpointing intervals, 2) have all source
> operators use the same API, and 3) create a solution that is generic and
> long lasting. Note that these three goals affect the usability and
> extensibility of the API, but not necessarily the stability/performance
> of the production job.
>
> BTW, there are also other preferable goals. For example, it is very
> useful for the job's behavior to be predictable and interpretable so
> that SREs can operate/debug Flink more easily. We can list these
> pros/cons altogether later.
>
> I am wondering if we can first agree on the priority of the goals we
> want to achieve. IMO, it is a hard requirement for the user-facing API
> to be clearly defined, and users should be able to use the API without
> concern of regression. This requirement is more important than the other
> goals discussed above because it relates to the stability/performance of
> the production job. What do you think?
>
>>> Sounds good. Looking forward to learning more ideas.
>> I have thought about this a bit more, and I think we don't need to
>> check the backpressure status, or how overloaded all of the operators
>> are. We could just check three things for source operators:
>> 1. pendingRecords (backlog length)
>> 2. numRecordsInPerSecond
>> 3. backPressuredTimeMsPerSecond
>>
>> // int metricsUpdateInterval = 10s // obtained from config
>> // The next line calculates how many records we can consume from the
>> // backlog, assuming that magically the reason behind the backpressure
>> // vanishes. We will use this only as a safeguard against scenarios
>> // where, for example, the backpressure was caused by some intermittent
>> // failure/performance degradation.
>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
>>     (1000 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>
> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond
> = 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond
> / 1000 * metricsUpdateInterval according to the above algorithm.
>
> Do you mean "maxRecordsConsumedWithoutBackpressure =
> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) *
> metricsUpdateInterval"?
>
>> // We are excluding maxRecordsConsumedWithoutBackpressure from the
>> // backlog as a safeguard against intermittent backpressure problems,
>> // so that we don't calculate the next checkpoint interval far, far in
>> // the future while the backpressure goes away before we recalculate
>> // the metrics and the new checkpointing interval.
>> timeToConsumeBacklog = (pendingRecords -
>>     maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
>>
>> Then we can use those numbers to calculate the desired checkpointing
>> interval, for example like this:
>>
>> long calculatedCheckpointInterval =
>>     timeToConsumeBacklog / 10; // this may need some refining
>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
>>     calculatedCheckpointInterval), slowCheckpointInterval);
>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
>>
>> WDYT?
>
> I think the idea of the above algorithm is to lean towards using the
> fastCheckpointInterval unless we are very sure the backlog will take a
> long time to process. This can alleviate the concern of regression
> during the continuous_unbounded phase, since we are more likely to use
> the fastCheckpointInterval. However, it can cause regression during the
> bounded phase.
>
> I will use a concrete example to explain the risk of regression:
> - The user is using HybridSource to read from HDFS followed by Kafka.
> The data in HDFS is old and there is no need for data freshness for the
> data in HDFS.
> - The user configures the job as below:
>   - fastCheckpointInterval = 3 minutes
>   - slowCheckpointInterval = 30 minutes
>   - metricsUpdateInterval = 100 ms
>
> Using the above formula, we know that once pendingRecords <=
> numRecordsInPerSecond * 30 minutes, then calculatedCheckpointInterval <=
> 3 minutes, meaning that we will use the fastCheckpointInterval as the
> checkpointing interval. Then in the last 30 minutes of the bounded
> phase, the checkpointing frequency will be 10X higher than what the user
> wants.
>
> Also note that the same issue would considerably limit the benefits of
> the algorithm. For example, during the continuous phase, the algorithm
> will only be better than the approach in FLIP-309 when there is at least
> 30 minutes' worth of backlog in the source.
>
> Sure, having a slower checkpointing interval in this extreme case (where
> there is a 30-minute backlog in the continuous_unbounded phase) is still
> useful when it happens. But since this is the uncommon case, and the
> right solution is probably to do capacity planning to avoid this from
> happening in the first place, I am not sure it is worth optimizing for
> this case at the cost of regression in the bounded phase and the reduced
> operational predictability for users (e.g. what checkpointing interval
> should I expect at this stage of the job).
>
> I think the fundamental issue with this algorithm is that it is applied
> to both the bounded phases and the continuous_unbounded phases without
> knowing which phase the job is running in. The only information it can
> access is the backlog. But two sources with the same amount of backlog
> do not necessarily have the same data freshness requirement.
>
> In this particular example, users know that the data in HDFS is very old
> and there is no need for data freshness. Users can express such signals
> via the per-source API proposed in the FLIP. This is why the current
> approach in FLIP-309 can be better in this case.
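To make the discussion above concrete, here is a small sketch of the proposed heuristic, using the corrected `1 - backPressuredTimeMsPerSecond / 1000` term from the typo discussion; the names come from the thread, while the traffic rate is made up for illustration:

```python
# Sketch of the discussed checkpoint-interval heuristic. The /10
# refinement factor and the min/max clamping follow the pseudocode in
# the thread; the corrected backpressure term is applied.

def next_checkpoint_interval(pending_records, num_records_in_per_second,
                             back_pressured_time_ms_per_second,
                             metrics_update_interval_s,
                             fast_interval_s, slow_interval_s):
    # Records we could drain in one metrics interval if the backpressure
    # magically vanished right now (safeguard against intermittent
    # backpressure inflating the estimate).
    max_consumed_without_backpressure = (
        num_records_in_per_second
        / (1 - back_pressured_time_ms_per_second / 1000)
    ) * metrics_update_interval_s
    time_to_consume_backlog = max(
        0.0,
        (pending_records - max_consumed_without_backpressure)
        / num_records_in_per_second,
    )
    calculated = time_to_consume_backlog / 10
    # Clamp between the fast and slow intervals.
    return min(max(fast_interval_s, calculated), slow_interval_s)

# The HybridSource example: fast = 3 min, slow = 30 min. With exactly
# 30 minutes' worth of backlog and no backpressure, the heuristic
# already falls back to the fast (3-minute) interval.
fast, slow = 3 * 60, 30 * 60   # seconds
rate = 10_000                  # records/s, hypothetical
interval = next_checkpoint_interval(
    pending_records=rate * 30 * 60,  # 30 minutes of backlog
    num_records_in_per_second=rate,
    back_pressured_time_ms_per_second=0,
    metrics_update_interval_s=10,
    fast_interval_s=fast, slow_interval_s=slow,
)
# interval == 180, i.e. the fastCheckpointInterval
```

This illustrates the regression risk described above: even a 30-minute backlog in the bounded phase is checkpointed at the fast interval, 10X more frequently than the user's intent for that phase.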
> What do you think?
>
> Best,
> Dong
>
>> Best,
>> Piotrek