Hi Dong, Thanks for your quick reply. I think this truly solves our problem and will enable us to upgrade our existing jobs more seamlessly.
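For reference, based on your explanation below, my understanding is that our setup would reduce to something like the following two lines in flink-conf.yaml (config names as proposed in FLIP-309; the 30s value is only an illustration, not from the FLIP):

    execution.checkpointing.interval: 30s
    execution.checkpointing.interval-during-backlog: 0

i.e. checkpointing stays disabled while the sources are still processing the backlog, and the regular interval takes over once the job reaches the real-time phase.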
Best, Xiangyu On Thu, Jun 29, 2023 at 10:50 AM Dong Lin <lindon...@gmail.com> wrote: > Hi Feng, > > Thanks for the feedback. Yes, you can configure the > execution.checkpointing.interval-during-backlog to effectively disable > checkpoint during backlog. > > Prior to your comment, the FLIP allows users to do this by setting the > config value to something large (e.g. 365 days). After thinking about this > more, we think it is more usable to allow users to achieve this goal by > setting the config value to 0. This is consistent with the existing > behavior of execution.checkpointing.interval -- the checkpoint is disabled > if users set execution.checkpointing.interval to 0. > > We have updated the description of > execution.checkpointing.interval-during-backlog > to say the following: > ... it is not null, the value must either be 0, which means the checkpoint > is disabled during backlog, or be larger than or equal to > execution.checkpointing.interval. > > Does this address your need? > > Best, > Dong > > > > On Thu, Jun 29, 2023 at 9:23 AM feng xiangyu <xiangyu...@gmail.com> wrote: > > > Hi Dong and Yunfeng, > > > > Thanks for the proposal, your FLIP sounds very useful from my > perspective. > > In our business, when using hybrid source in production we also met > the > > problem described in your FLIP. > > In our solution, we tend to skip making any checkpoints before all batch > > tasks have finished and resume the periodic checkpoint only in the streaming > > phase. With this FLIP, we can solve our problem in a more generic way. > > > > However, I am wondering: if we still want to skip making any checkpoints > > during the historical phase, can we set the configuration > > "execution.checkpointing.interval-during-backlog" to "-1" to cover > this > > case? > > > > Best, > > Xiangyu > > > > On Wed, Jun 28, 2023 at 4:30 PM Hang Ruan <ruanhang1...@gmail.com> wrote: > > > > > Thanks for Dong and Yunfeng's work. > > > > > > The FLIP looks good to me. This new version is easier to understand. > > > > > > Best, > > > Hang > > > > > > On Tue, Jun 27, 2023 at 4:53 PM Dong Lin <lindon...@gmail.com> wrote: > > > > > > > Thanks Jark, Jingsong, and Zhu for the review! > > > > > > > > Thanks Zhu for the suggestion. I have updated the configuration name > as > > > > suggested. > > > > > > > > On Tue, Jun 27, 2023 at 4:45 PM Zhu Zhu <reed...@gmail.com> wrote: > > > > > > > > > Thanks Dong and Yunfeng for creating this FLIP and driving this > > > > discussion. > > > > > > > > > > The new design looks generally good to me. Increasing the > checkpoint > > > > > interval when the job is processing backlogs is easier for users to > > > > > understand and can help in more scenarios. > > > > > > > > > > I have one comment about the new configuration. > > > > > Naming the new configuration > > > > > "execution.checkpointing.interval-during-backlog" would be better > > > > > according to the Flink config naming convention. > > > > > This is also because nested config keys should be avoided. See > > > > > FLINK-29372 for more details. > > > > > > > > > > Thanks, > > > > > Zhu > > > > > > > > > > On Tue, Jun 27, 2023 at 3:45 PM Jingsong Li <jingsongl...@gmail.com> wrote: > > > > > > > > > > > > Looks good to me! > > > > > > > > > > > > Thanks Dong, Yunfeng and all for your discussion and design. > > > > > > > > > > > > Best, > > > > > > Jingsong > > > > > > > > > > > > On Tue, Jun 27, 2023 at 3:35 PM Jark Wu <imj...@gmail.com> > wrote: > > > > > > > > > > > > > > Thank you Dong for driving this FLIP. > > > > > > > > > > > > > > The new design looks good to me!
> > > > > > > > > > > > > > Best, > > > > > > > Jark > > > > > > > > > > > > > > > 2023年6月27日 14:38,Dong Lin <lindon...@gmail.com> 写道: > > > > > > > > > > > > > > > > Thank you Leonard for the review! > > > > > > > > > > > > > > > > Hi Piotr, do you have any comments on the latest proposal? > > > > > > > > > > > > > > > > I am wondering if it is OK to start the voting thread this > > week. > > > > > > > > > > > > > > > > On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu < > xbjt...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > >> Thanks Dong for driving this FLIP forward! > > > > > > > >> > > > > > > > >> Introducing `backlog status` concept for flink job makes > > sense > > > to > > > > > me as > > > > > > > >> following reasons: > > > > > > > >> > > > > > > > >> From concept/API design perspective, it’s more general and > > > natural > > > > > than > > > > > > > >> above proposals as it can be used in HybridSource for > bounded > > > > > records, CDC > > > > > > > >> Source for history snapshot and general sources like > > KafkaSource > > > > for > > > > > > > >> historical messages. > > > > > > > >> > > > > > > > >> From user cases/requirements, I’ve seen many users manually > to > > > set > > > > > larger > > > > > > > >> checkpoint interval during backfilling and then set a > shorter > > > > > checkpoint > > > > > > > >> interval for real-time processing in their production > > > environments > > > > > as a > > > > > > > >> flink application optimization. Now, the flink framework can > > > make > > > > > this > > > > > > > >> optimization no longer require the user to set the > checkpoint > > > > > interval and > > > > > > > >> restart the job multiple times. > > > > > > > >> > > > > > > > >> Following supporting using larger checkpoint for job under > > > backlog > > > > > status > > > > > > > >> in current FLIP, we can explore supporting larger > > > > > parallelism/memory/cpu > > > > > > > >> for job under backlog status in the future. > > > > > > > >> > > > > > > > >> In short, the updated FLIP looks good to me. > > > > > > > >> > > > > > > > >> > > > > > > > >> Best, > > > > > > > >> Leonard > > > > > > > >> > > > > > > > >> > > > > > > > >>> On Jun 22, 2023, at 12:07 PM, Dong Lin < > lindon...@gmail.com> > > > > > wrote: > > > > > > > >>> > > > > > > > >>> Hi Piotr, > > > > > > > >>> > > > > > > > >>> Thanks again for proposing the isProcessingBacklog concept. > > > > > > > >>> > > > > > > > >>> After discussing with Becket Qin and thinking about this > > more, > > > I > > > > > agree it > > > > > > > >>> is a better idea to add a top-level concept to all source > > > > > operators to > > > > > > > >>> address the target use-case. > > > > > > > >>> > > > > > > > >>> The main reason that changed my mind is that > > > isProcessingBacklog > > > > > can be > > > > > > > >>> described as an inherent/nature attribute of every source > > > > instance > > > > > and > > > > > > > >> its > > > > > > > >>> semantics does not need to depend on any specific > > checkpointing > > > > > policy. > > > > > > > >>> Also, we can hardcode the isProcessingBacklog behavior for > > the > > > > > sources we > > > > > > > >>> have considered so far (e.g. HybridSource and MySQL CDC > > source) > > > > > without > > > > > > > >>> asking users to explicitly configure the per-source > behavior, > > > > which > > > > > > > >> indeed > > > > > > > >>> provides better user experience. > > > > > > > >>> > > > > > > > >>> I have updated the FLIP based on the latest suggestions. 
> The > > > > > latest FLIP > > > > > > > >> no > > > > > > > >>> longer introduces per-source config that can be used by > > > > end-users. > > > > > While > > > > > > > >> I > > > > > > > >>> agree with you that CheckpointTrigger can be a useful > feature > > > to > > > > > address > > > > > > > >>> additional use-cases, I am not sure it is necessary for the > > > > > use-case > > > > > > > >>> targeted by FLIP-309. Maybe we can introduce > > CheckpointTrigger > > > > > separately > > > > > > > >>> in another FLIP? > > > > > > > >>> > > > > > > > >>> Can you help take another look at the updated FLIP? > > > > > > > >>> > > > > > > > >>> Best, > > > > > > > >>> Dong > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski < > > > > > pnowoj...@apache.org> > > > > > > > >>> wrote: > > > > > > > >>> > > > > > > > >>>> Hi Dong, > > > > > > > >>>> > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask has 1% > > chance > > > > of > > > > > being > > > > > > > >>>>> "backpressured" at a given time (due to random traffic > > > spikes). > > > > > Then at > > > > > > > >>>> any > > > > > > > >>>>> given time, the chance of the job > > > > > > > >>>>> being considered not-backpressured = (1-0.01)^1000. Since > > we > > > > > evaluate > > > > > > > >> the > > > > > > > >>>>> backpressure metric once a second, the estimated time for > > the > > > > job > > > > > > > >>>>> to be considered not-backpressured is roughly 1 / > > > > > ((1-0.01)^1000) = > > > > > > > >> 23163 > > > > > > > >>>>> sec = 6.4 hours. > > > > > > > >>>>> > > > > > > > >>>>> This means that the job will effectively always use the > > > longer > > > > > > > >>>>> checkpointing interval. It looks like a real concern, > > right? > > > > > > > >>>> > > > > > > > >>>> Sorry I don't understand where you are getting those > numbers > > > > from. > > > > > > > >>>> Instead of trying to find loophole after loophole, could > you > > > try > > > > > to > > > > > > > >> think > > > > > > > >>>> how a given loophole could be improved/solved? > > > > > > > >>>> > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the > APIs > > > due > > > > > to the > > > > > > > >>>>> following reasons. > > > > > > > >>>> > > > > > > > >>>> Please propose something. I don't think it's needed. > > > > > > > >>>> > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation > > section, > > > > > would the > > > > > > > >>>> APIs > > > > > > > >>>>> of this alternative approach be more or less usable? > > > > > > > >>>> > > > > > > > >>>> Everything that you originally wanted to achieve in > > FLIP-309, > > > > you > > > > > could > > > > > > > >> do > > > > > > > >>>> as well in my proposal. > > > > > > > >>>> Vide my many mentions of the "hacky solution". > > > > > > > >>>> > > > > > > > >>>>> - Can these APIs reliably address the extra use-case > (e.g. > > > > allow > > > > > > > >>>>> checkpointing interval to change dynamically even during > > the > > > > > unbounded > > > > > > > >>>>> phase) as it claims? > > > > > > > >>>> > > > > > > > >>>> I don't see why not. > > > > > > > >>>> > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently > > > proposed > > > > in > > > > > > > >>>> FLIP-309? 
> > > > > > > >>>> > > > > > > > >>>> Yes > > > > > > > >>>> > > > > > > > >>>>> For example, if the APIs of this alternative approach can > > be > > > > > decoupled > > > > > > > >>>> from > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it might be > > > > > reasonable to > > > > > > > >>>>> work on this extra use-case with a more > > advanced/complicated > > > > > design > > > > > > > >>>>> separately in a followup work. > > > > > > > >>>> > > > > > > > >>>> As I voiced my concerns previously, the current design of > > > > > FLIP-309 would > > > > > > > >>>> clog the public API and in the long run confuse the users. > > IMO > > > > > It's > > > > > > > >>>> addressing the > > > > > > > >>>> problem in the wrong place. > > > > > > > >>>> > > > > > > > >>>>> Hmm.. do you mean we can do the following: > > > > > > > >>>>> - Have all source operators emit a metric named > > > > > "processingBacklog". > > > > > > > >>>>> - Add a job-level config that specifies "the > checkpointing > > > > > interval to > > > > > > > >> be > > > > > > > >>>>> used when any source is processing backlog". > > > > > > > >>>>> - The JM collects the "processingBacklog" periodically > from > > > all > > > > > source > > > > > > > >>>>> operators and uses the newly added config value as > > > appropriate. > > > > > > > >>>> > > > > > > > >>>> Yes. > > > > > > > >>>> > > > > > > > >>>>> The challenge with this approach is that we need to > define > > > the > > > > > > > >> semantics > > > > > > > >>>> of > > > > > > > >>>>> this "processingBacklog" metric and have all source > > operators > > > > > > > >>>>> implement this metric. I am not sure we are able to do > this > > > yet > > > > > without > > > > > > > >>>>> having users explicitly provide this information on a > > > > per-source > > > > > basis. > > > > > > > >>>>> > > > > > > > >>>>> Suppose the job read from a bounded Kafka source, should > it > > > > emit > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use > > long > > > > > > > >>>> checkpointing > > > > > > > >>>>> interval even > > > > > > > >>>>> if the job is asked to process data starting from now to > > the > > > > > next 1 > > > > > > > >> hour. > > > > > > > >>>>> If no, then the job might use the short checkpointing > > > interval > > > > > > > >>>>> even if the job is asked to re-process data starting > from 7 > > > > days > > > > > ago. > > > > > > > >>>> > > > > > > > >>>> Yes. The same can be said of your proposal. Your proposal > > has > > > > the > > > > > very > > > > > > > >> same > > > > > > > >>>> issues > > > > > > > >>>> that every source would have to implement it differently, > > most > > > > > sources > > > > > > > >>>> would > > > > > > > >>>> have no idea how to properly calculate the new requested > > > > > checkpoint > > > > > > > >>>> interval, > > > > > > > >>>> for those that do know how to do that, user would have to > > > > > configure > > > > > > > >> every > > > > > > > >>>> source > > > > > > > >>>> individually and yet again we would end up with a system, > > that > > > > > works > > > > > > > >> only > > > > > > > >>>> partially in > > > > > > > >>>> some special use cases (HybridSource), that's confusing > the > > > > users > > > > > even > > > > > > > >>>> more. 
> > > > > > > >>>> > > > > > > > >>>> That's why I think the more generic solution, working > > > primarily > > > > > on the > > > > > > > >> same > > > > > > > >>>> metrics that are used by various auto scaling solutions > > (like > > > > > Flink K8s > > > > > > > >>>> operator's > > > > > > > >>>> autosaler) would be better. The hacky solution I proposed > > to: > > > > > > > >>>> 1. show you that the generic solution is simply a superset > > of > > > > your > > > > > > > >> proposal > > > > > > > >>>> 2. if you are adamant that busyness/backpressured/records > > > > > processing > > > > > > > >>>> rate/pending records > > > > > > > >>>> metrics wouldn't cover your use case sufficiently (imo > > they > > > > > can), > > > > > > > >> then > > > > > > > >>>> you can very easily > > > > > > > >>>> enhance this algorithm with using some hints from the > > > sources. > > > > > Like > > > > > > > >>>> "processingBacklog==true" > > > > > > > >>>> to short circuit the main algorithm, if > > `processingBacklog` > > > is > > > > > > > >>>> available. > > > > > > > >>>> > > > > > > > >>>> Best, > > > > > > > >>>> Piotrek > > > > > > > >>>> > > > > > > > >>>> > > > > > > > >>>> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> > > > > > napisał(a): > > > > > > > >>>> > > > > > > > >>>>> Hi again Piotr, > > > > > > > >>>>> > > > > > > > >>>>> Thank you for the reply. Please see my reply inline. > > > > > > > >>>>> > > > > > > > >>>>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski < > > > > > > > >>>> piotr.nowoj...@gmail.com> > > > > > > > >>>>> wrote: > > > > > > > >>>>> > > > > > > > >>>>>> Hi again Dong, > > > > > > > >>>>>> > > > > > > > >>>>>>> I understand that JM will get the backpressure-related > > > > metrics > > > > > every > > > > > > > >>>>> time > > > > > > > >>>>>>> the RestServerEndpoint receives the REST request to get > > > these > > > > > > > >>>> metrics. > > > > > > > >>>>>> But > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already always > > > > > receiving the > > > > > > > >>>>> REST > > > > > > > >>>>>>> metrics at regular interval (suppose there is no human > > > > manually > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it does, > what > > is > > > > the > > > > > > > >>>>> interval? > > > > > > > >>>>>> > > > > > > > >>>>>> Good catch, I've thought that metrics are pre-emptively > > sent > > > > to > > > > > JM > > > > > > > >>>> every > > > > > > > >>>>> 10 > > > > > > > >>>>>> seconds. > > > > > > > >>>>>> Indeed that's not the case at the moment, and that would > > > have > > > > > to be > > > > > > > >>>>>> improved. > > > > > > > >>>>>> > > > > > > > >>>>>>> I would be surprised if Flink is already paying this > much > > > > > overhead > > > > > > > >>>> just > > > > > > > >>>>>> for > > > > > > > >>>>>>> metrics monitoring. That is the main reason I still > doubt > > > it > > > > > is true. > > > > > > > >>>>> Can > > > > > > > >>>>>>> you show where this 100 ms is currently configured? > > > > > > > >>>>>>> > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra > > code > > > > to > > > > > invoke > > > > > > > >>>>> the > > > > > > > >>>>>>> REST API at 100 ms interval. 
Then that means we need to > > > > > considerably > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where the > > overhead > > > > > will > > > > > > > >>>>> increase > > > > > > > >>>>>>> as the number of TM/slots increase, which may pose risk > > to > > > > the > > > > > > > >>>>>> scalability > > > > > > > >>>>>>> of the proposed design. I am not sure we should do > this. > > > What > > > > > do you > > > > > > > >>>>>> think? > > > > > > > >>>>>> > > > > > > > >>>>>> Sorry. I didn't mean metric should be reported every > > 100ms. > > > I > > > > > meant > > > > > > > >>>> that > > > > > > > >>>>>> "backPressuredTimeMsPerSecond (metric) would report (a > > value > > > > of) > > > > > > > >>>>> 100ms/s." > > > > > > > >>>>>> once per metric interval (10s?). > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> Suppose there are 1000 subtask and each subtask has 1% > > chance > > > > of > > > > > being > > > > > > > >>>>> "backpressured" at a given time (due to random traffic > > > spikes). > > > > > Then at > > > > > > > >>>> any > > > > > > > >>>>> given time, the chance of the job > > > > > > > >>>>> being considered not-backpressured = (1-0.01)^1000. Since > > we > > > > > evaluate > > > > > > > >> the > > > > > > > >>>>> backpressure metric once a second, the estimated time for > > the > > > > job > > > > > > > >>>>> to be considered not-backpressured is roughly 1 / > > > > > ((1-0.01)^1000) = > > > > > > > >> 23163 > > > > > > > >>>>> sec = 6.4 hours. > > > > > > > >>>>> > > > > > > > >>>>> This means that the job will effectively always use the > > > longer > > > > > > > >>>>> checkpointing interval. It looks like a real concern, > > right? > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For > > > > > example, are > > > > > > > >>>> we > > > > > > > >>>>>>> going to give CheckpointTrigger a context that it can > use > > > to > > > > > fetch > > > > > > > >>>>>>> arbitrary metric values? This can help us understand > what > > > > > information > > > > > > > >>>>>> this > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make the > > > checkpoint > > > > > > > >>>> decision. > > > > > > > >>>>>> > > > > > > > >>>>>> I honestly don't think this is important at this stage > of > > > the > > > > > > > >>>> discussion. > > > > > > > >>>>>> It could have > > > > > > > >>>>>> whatever interface we would deem to be best. Required > > > things: > > > > > > > >>>>>> > > > > > > > >>>>>> - access to at least a subset of metrics that the given > > > > > > > >>>>> `CheckpointTrigger` > > > > > > > >>>>>> requests, > > > > > > > >>>>>> for example via some registration mechanism, so we don't > > > have > > > > to > > > > > > > >>>> fetch > > > > > > > >>>>>> all of the > > > > > > > >>>>>> metrics all the time from TMs. > > > > > > > >>>>>> - some way to influence `CheckpointCoordinator`. Either > > via > > > > > manually > > > > > > > >>>>>> triggering > > > > > > > >>>>>> checkpoints, and/or ability to change the checkpointing > > > > > interval. > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> Hmm... I honestly think it will be useful to know the > APIs > > > due > > > > > to the > > > > > > > >>>>> following reasons. 
> > > > > > > >>>>> > > > > > > > >>>>> We would need to know the concrete APIs to gauge the > > > following: > > > > > > > >>>>> - For the use-case mentioned in FLIP-309 motivation > > section, > > > > > would the > > > > > > > >>>> APIs > > > > > > > >>>>> of this alternative approach be more or less usable? > > > > > > > >>>>> - Can these APIs reliably address the extra use-case > (e.g. > > > > allow > > > > > > > >>>>> checkpointing interval to change dynamically even during > > the > > > > > unbounded > > > > > > > >>>>> phase) as it claims? > > > > > > > >>>>> - Can these APIs be decoupled from the APIs currently > > > proposed > > > > in > > > > > > > >>>> FLIP-309? > > > > > > > >>>>> > > > > > > > >>>>> For example, if the APIs of this alternative approach can > > be > > > > > decoupled > > > > > > > >>>> from > > > > > > > >>>>> the APIs currently proposed in FLIP-309, then it might be > > > > > reasonable to > > > > > > > >>>>> work on this extra use-case with a more > > advanced/complicated > > > > > design > > > > > > > >>>>> separately in a followup work. > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, > > is > > > it > > > > > going > > > > > > > >>>> to > > > > > > > >>>>>> run > > > > > > > >>>>>>> on the subtask of every source operator? Or is it going > > to > > > > run > > > > > on the > > > > > > > >>>>> JM? > > > > > > > >>>>>> > > > > > > > >>>>>> IMO on the JM. > > > > > > > >>>>>> > > > > > > > >>>>>>> - Are we going to provide a default implementation of > > this > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the > algorithm > > > > > described > > > > > > > >>>>> below, > > > > > > > >>>>>>> or do we expect each source operator developer to > > implement > > > > > their own > > > > > > > >>>>>>> CheckpointTrigger? > > > > > > > >>>>>> > > > > > > > >>>>>> As I mentioned before, I think we should provide at the > > very > > > > > least the > > > > > > > >>>>>> implementation > > > > > > > >>>>>> that replaces the current triggering mechanism > (statically > > > > > configured > > > > > > > >>>>>> checkpointing interval) > > > > > > > >>>>>> and it would be great to provide the backpressure > > monitoring > > > > > trigger > > > > > > > >> as > > > > > > > >>>>>> well. > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> I agree that if there is a good use-case that can be > > > addressed > > > > > by the > > > > > > > >>>>> proposed CheckpointTrigger, then it is reasonable > > > > > > > >>>>> to add CheckpointTrigger and replace the current > triggering > > > > > mechanism > > > > > > > >>>> with > > > > > > > >>>>> it. > > > > > > > >>>>> > > > > > > > >>>>> I also agree that we will likely find such a use-case. > For > > > > > example, > > > > > > > >>>> suppose > > > > > > > >>>>> the source records have event timestamps, then it is > likely > > > > > > > >>>>> that we can use the trigger to dynamically control the > > > > > checkpointing > > > > > > > >>>>> interval based on the difference between the watermark > and > > > > > current > > > > > > > >> system > > > > > > > >>>>> time. > > > > > > > >>>>> > > > > > > > >>>>> But I am not sure the addition of this CheckpointTrigger > > > should > > > > > be > > > > > > > >>>> coupled > > > > > > > >>>>> with FLIP-309. Whether or not it is coupled probably > > depends > > > on > > > > > the > > > > > > > >>>>> concrete API design around CheckpointTrigger. 
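Just to make this part of the discussion concrete, here is one possible shape such an interface could take. This is purely a sketch on my side based on the two requirements you listed (a metric registration mechanism, plus some way to influence the CheckpointCoordinator); all names are made up, and nothing like this exists in Flink today:

    // Hypothetical sketch only -- not an existing Flink interface.
    public interface CheckpointTrigger {

        /** Registration mechanism: declare the subset of metrics this trigger
         *  needs, so the JM does not fetch all metrics from all TMs. */
        void open(MetricRegistrar registrar);

        /** Invoked periodically on the JM with fresh values of the registered
         *  metrics; the trigger may adjust the interval or trigger a checkpoint. */
        void onMetricsUpdated(MetricSnapshot metrics, CheckpointControl control);

        interface MetricRegistrar {
            void subscribe(String metricName);
        }

        interface MetricSnapshot {
            double get(String metricName);
        }

        /** The two ways mentioned above to influence the CheckpointCoordinator. */
        interface CheckpointControl {
            void setCheckpointInterval(java.time.Duration interval);
            void triggerCheckpoint();
        }
    }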
> > > > > > > >>>>> > > > > > > > >>>>> If you would be adamant that the backpressure monitoring > > > > doesn't > > > > > cover > > > > > > > >>>> well > > > > > > > >>>>>> enough your use case, I would be ok to provide the hacky > > > > > version that > > > > > > > >> I > > > > > > > >>>>>> also mentioned > > > > > > > >>>>>> before: > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>> """ > > > > > > > >>>>>> Especially that if my proposed algorithm wouldn't work > > good > > > > > enough, > > > > > > > >>>> there > > > > > > > >>>>>> is > > > > > > > >>>>>> an obvious solution, that any source could add a metric, > > > like > > > > > let say > > > > > > > >>>>>> "processingBacklog: true/false", and the > > `CheckpointTrigger` > > > > > > > >>>>>> could use this as an override to always switch to the > > > > > > > >>>>>> "slowCheckpointInterval". I don't think we need it, but > > > that's > > > > > always > > > > > > > >>>> an > > > > > > > >>>>>> option > > > > > > > >>>>>> that would be basically equivalent to your original > > > proposal. > > > > > > > >>>>>> """ > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> Hmm.. do you mean we can do the following: > > > > > > > >>>>> - Have all source operators emit a metric named > > > > > "processingBacklog". > > > > > > > >>>>> - Add a job-level config that specifies "the > checkpointing > > > > > interval to > > > > > > > >> be > > > > > > > >>>>> used when any source is processing backlog". > > > > > > > >>>>> - The JM collects the "processingBacklog" periodically > from > > > all > > > > > source > > > > > > > >>>>> operators and uses the newly added config value as > > > appropriate. > > > > > > > >>>>> > > > > > > > >>>>> The challenge with this approach is that we need to > define > > > the > > > > > > > >> semantics > > > > > > > >>>> of > > > > > > > >>>>> this "processingBacklog" metric and have all source > > operators > > > > > > > >>>>> implement this metric. I am not sure we are able to do > this > > > yet > > > > > without > > > > > > > >>>>> having users explicitly provide this information on a > > > > per-source > > > > > basis. > > > > > > > >>>>> > > > > > > > >>>>> Suppose the job read from a bounded Kafka source, should > it > > > > emit > > > > > > > >>>>> "processingBacklog=true"? If yes, then the job might use > > long > > > > > > > >>>> checkpointing > > > > > > > >>>>> interval even > > > > > > > >>>>> if the job is asked to process data starting from now to > > the > > > > > next 1 > > > > > > > >> hour. > > > > > > > >>>>> If no, then the job might use the short checkpointing > > > interval > > > > > > > >>>>> even if the job is asked to re-process data starting > from 7 > > > > days > > > > > ago. > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>> > > > > > > > >>>>>>> - How can users specify the > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval? > > > > > > > >>>>>>> For example, will we provide APIs on the > > CheckpointTrigger > > > > that > > > > > > > >>>>> end-users > > > > > > > >>>>>>> can use to specify the checkpointing interval? What > would > > > > that > > > > > look > > > > > > > >>>>> like? 
> > > > > > > >>>>>> > > > > > > > >>>>>> Also as I mentioned before, just like metric reporters > are > > > > > configured: > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>> > > > > > > > >> > > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/ > > > > > > > >>>>>> Every CheckpointTrigger could have its own custom > > > > configuration. > > > > > > > >>>>>> > > > > > > > >>>>>>> Overall, my gut feel is that the alternative approach > > based > > > > on > > > > > > > >>>>>>> CheckpointTrigger is more complicated > > > > > > > >>>>>> > > > > > > > >>>>>> Yes, as usual, more generic things are more complicated, > > but > > > > > often > > > > > > > >> more > > > > > > > >>>>>> useful in the long run. > > > > > > > >>>>>> > > > > > > > >>>>>>> and harder to use. > > > > > > > >>>>>> > > > > > > > >>>>>> I don't agree. Why setting in config > > > > > > > >>>>>> > > > > > > > >>>>>> execution.checkpointing.trigger: > > > > > > > >>>> BackPressureMonitoringCheckpointTrigger > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>> > > > > > > > >> > > > > > > > > > > > > > > > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: > > > > > > > >>>>>> 1s > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>> > > > > > > > >> > > > > > > > > > > > > > > > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: > > > > > > > >>>>>> 30s > > > > > > > >>>>>> > > > > > > > >>>>>> that we could even provide a shortcut to the above > > construct > > > > > via: > > > > > > > >>>>>> > > > > > > > >>>>>> execution.checkpointing.fast-interval: 1s > > > > > > > >>>>>> execution.checkpointing.slow-interval: 30s > > > > > > > >>>>>> > > > > > > > >>>>>> is harder compared to setting two/three checkpoint > > > intervals, > > > > > one in > > > > > > > >>>> the > > > > > > > >>>>>> config/or via `env.enableCheckpointing(x)`, > > > > > > > >>>>>> secondly passing one/two (fast/slow) values on the > source > > > > > itself? > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> If we can address the use-case by providing just the two > > > > > job-level > > > > > > > >> config > > > > > > > >>>>> as described above, I agree it will indeed be simpler. > > > > > > > >>>>> > > > > > > > >>>>> I have tried to achieve this goal. But the caveat is that > > it > > > > > requires > > > > > > > >>>> much > > > > > > > >>>>> more work than described above in order to give the > configs > > > > > > > >> well-defined > > > > > > > >>>>> semantics. So I find it simpler to just use the approach > in > > > > > FLIP-309. > > > > > > > >>>>> > > > > > > > >>>>> Let me explain my concern below. It will be great if you > or > > > > > someone > > > > > > > >> else > > > > > > > >>>>> can help provide a solution. > > > > > > > >>>>> > > > > > > > >>>>> 1) We need to clearly document when the fast-interval and > > > > > slow-interval > > > > > > > >>>>> will be used so that users can derive the expected > behavior > > > of > > > > > the job > > > > > > > >>>> and > > > > > > > >>>>> be able to config these values. > > > > > > > >>>>> > > > > > > > >>>>> 2) The trigger of fast/slow interval depends on the > > behavior > > > of > > > > > the > > > > > > > >>>> source > > > > > > > >>>>> (e.g. MySQL CDC, HybridSource). However, no existing > > concepts > > > > of > > > > > source > > > > > > > >>>>> operator (e.g. 
boundedness) can describe the target > > behavior. > > > > For > > > > > > > >>>> example, > > > > > > > >>>>> MySQL CDC internally has two phases, namely snapshot > phase > > > and > > > > > binlog > > > > > > > >>>>> phase, which are not explicitly exposed to its users via > > > source > > > > > > > >> operator > > > > > > > >>>>> API. And we probably should not enumerate all internal > > phases > > > > of > > > > > all > > > > > > > >>>> source > > > > > > > >>>>> operators that are affected by fast/slow interval. > > > > > > > >>>>> > > > > > > > >>>>> 3) An alternative approach might be to define a new > concept > > > > (e.g. > > > > > > > >>>>> processingBacklog) that is applied to all source > operators. > > > > Then > > > > > the > > > > > > > >>>>> fast/slow interval's documentation can depend on this > > > concept. > > > > > That > > > > > > > >> means > > > > > > > >>>>> we have to add a top-level concept (similar to source > > > > > boundedness) and > > > > > > > >>>>> require all source operators to specify how they enforce > > this > > > > > concept > > > > > > > >>>> (e.g. > > > > > > > >>>>> FileSystemSource always emits processingBacklog=true). > And > > > > there > > > > > might > > > > > > > >> be > > > > > > > >>>>> cases where the source itself (e.g. a bounded Kafka > Source) > > > can > > > > > not > > > > > > > >>>>> automatically derive the value of this concept, in which > > case > > > > we > > > > > need > > > > > > > >> to > > > > > > > >>>>> provide option for users to explicitly specify the value > > for > > > > this > > > > > > > >> concept > > > > > > > >>>>> on a per-source basis. > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>>> And it probably also has the issues of "having two > places > > > to > > > > > > > >>>> configure > > > > > > > >>>>>> checkpointing > > > > > > > >>>>>>> interval" and "giving flexibility for every source to > > > > > implement a > > > > > > > >>>>>> different > > > > > > > >>>>>>> API" (as mentioned below). > > > > > > > >>>>>> > > > > > > > >>>>>> No, it doesn't. > > > > > > > >>>>>> > > > > > > > >>>>>>> IMO, it is a hard-requirement for the user-facing API > to > > be > > > > > > > >>>>>>> clearly defined and users should be able to use the API > > > > without > > > > > > > >>>> concern > > > > > > > >>>>>> of > > > > > > > >>>>>>> regression. And this requirement is more important than > > the > > > > > other > > > > > > > >>>> goals > > > > > > > >>>>>>> discussed above because it is related to the > > > > > stability/performance of > > > > > > > >>>>> the > > > > > > > >>>>>>> production job. What do you think? > > > > > > > >>>>>> > > > > > > > >>>>>> I don't agree with this. There are many things that work > > > > > something in > > > > > > > >>>>>> between perfectly and well enough > > > > > > > >>>>>> in some fraction of use cases (maybe in 99%, maybe 95% > or > > > > maybe > > > > > 60%), > > > > > > > >>>>> while > > > > > > > >>>>>> still being very useful. > > > > > > > >>>>>> Good examples are: selection of state backend, unaligned > > > > > checkpoints, > > > > > > > >>>>>> buffer debloating but frankly if I go > > > > > > > >>>>>> through list of currently available config options, > > > something > > > > > like > > > > > > > >> half > > > > > > > >>>>> of > > > > > > > >>>>>> them can cause regressions. 
Heck, > > > > > > > >>>>>> even Flink itself doesn't work perfectly in 100% of the > > use > > > > > cases, due > > > > > > > >>>>> to a > > > > > > > >>>>>> variety of design choices. Of > > > > > > > >>>>>> course, the more use cases are fine with said feature, > the > > > > > better, but > > > > > > > >>>> we > > > > > > > >>>>>> shouldn't fixate to perfectly cover > > > > > > > >>>>>> 100% of the cases, as that's impossible. > > > > > > > >>>>>> > > > > > > > >>>>>> In this particular case, if back pressure monitoring > > > trigger > > > > > can work > > > > > > > >>>>> well > > > > > > > >>>>>> enough in 95% of cases, I would > > > > > > > >>>>>> say that's already better than the originally proposed > > > > > alternative, > > > > > > > >>>> which > > > > > > > >>>>>> doesn't work at all if user has a large > > > > > > > >>>>>> backlog to reprocess from Kafka, including when using > > > > > HybridSource > > > > > > > >>>> AFTER > > > > > > > >>>>>> the switch to Kafka has > > > > > > > >>>>>> happened. For the remaining 5%, we should try to improve > > the > > > > > behaviour > > > > > > > >>>>> over > > > > > > > >>>>>> time, but ultimately, users can > > > > > > > >>>>>> decide to just run a fixed checkpoint interval (or at > > worst > > > > use > > > > > the > > > > > > > >>>> hacky > > > > > > > >>>>>> checkpoint trigger that I mentioned > > > > > > > >>>>>> before a couple of times). > > > > > > > >>>>>> > > > > > > > >>>>>> Also to be pedantic, if a user naively selects > > slow-interval > > > > in > > > > > your > > > > > > > >>>>>> proposal to 30 minutes, when that user's > > > > > > > >>>>>> job fails on average every 15-20minutes, his job can end > > up > > > in > > > > > a state > > > > > > > >>>>> that > > > > > > > >>>>>> it can not make any progress, > > > > > > > >>>>>> this arguably is quite serious regression. > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> I probably should not say it is "hard requirement". After > > all > > > > > there are > > > > > > > >>>>> pros/cons. We will need to consider implementation > > > complexity, > > > > > > > >> usability, > > > > > > > >>>>> extensibility etc. > > > > > > > >>>>> > > > > > > > >>>>> I just don't think we should take it for granted to > > introduce > > > > > > > >> regression > > > > > > > >>>>> for one use-case in order to support another use-case. If > > we > > > > can > > > > > not > > > > > > > >> find > > > > > > > >>>>> an algorithm/solution that addresses > > > > > > > >>>>> both use-case well, I hope we can be open to tackle them > > > > > separately so > > > > > > > >>>> that > > > > > > > >>>>> users can choose the option that best fits their needs. > > > > > > > >>>>> > > > > > > > >>>>> All things else being equal, I think it is preferred for > > > > > user-facing > > > > > > > >> API > > > > > > > >>>> to > > > > > > > >>>>> be clearly defined and let users should be able to use > the > > > API > > > > > without > > > > > > > >>>>> concern of regression. > > > > > > > >>>>> > > > > > > > >>>>> Maybe we can list pros/cons for the alternative > approaches > > we > > > > > have been > > > > > > > >>>>> discussing and see choose the best approach. And maybe we > > > will > > > > > end up > > > > > > > >>>>> finding that use-case > > > > > > > >>>>> which needs CheckpointTrigger can be tackled separately > > from > > > > the > > > > > > > >> use-case > > > > > > > >>>>> in FLIP-309. > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>>> I am not sure if there is a typo. 
Because if > > > > > > > >>>>> backPressuredTimeMsPerSecond > > > > > > > >>>>>> = > > > > > > > >>>>>>> 0, then maxRecordsConsumedWithoutBackpressure = > > > > > > > >>>> numRecordsInPerSecond / > > > > > > > >>>>>>> 1000 * metricsUpdateInterval according to the above > > > > algorithm. > > > > > > > >>>>>>> > > > > > > > >>>>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = > > > > > > > >>>>>> (numRecordsInPerSecond > > > > > > > >>>>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * > > > > > > > >>>> metricsUpdateInterval"? > > > > > > > >>>>>> > > > > > > > >>>>>> It looks like there is indeed some mistake in my > proposal > > > > > above. Yours > > > > > > > >>>>> look > > > > > > > >>>>>> more correct, it probably > > > > > > > >>>>>> still needs some safeguard/special handling if > > > > > > > >>>>>> `backPressuredTimeMsPerSecond > 950` > > > > > > > >>>>>> > > > > > > > >>>>>>> The only information it can access is the backlog. > > > > > > > >>>>>> > > > > > > > >>>>>> Again no. It can access whatever we want to provide to > it. > > > > > > > >>>>>> > > > > > > > >>>>>> Regarding the rest of your concerns. It's a matter of > > > tweaking > > > > > the > > > > > > > >>>>>> parameters and the algorithm itself, > > > > > > > >>>>>> and how much safety-net do we want to have. Ultimately, > > I'm > > > > > pretty > > > > > > > >> sure > > > > > > > >>>>>> that's a (for 95-99% of cases) > > > > > > > >>>>>> solvable problem. If not, there is always the hacky > > > solution, > > > > > that > > > > > > > >>>> could > > > > > > > >>>>> be > > > > > > > >>>>>> even integrated into this above > > > > > > > >>>>>> mentioned algorithm as a short circuit to always reach > > > > > > > >> `slow-interval`. > > > > > > > >>>>>> > > > > > > > >>>>>> Apart of that, you picked 3 minutes as the checkpointing > > > > > interval in > > > > > > > >>>> your > > > > > > > >>>>>> counter example. In most cases > > > > > > > >>>>>> any interval above 1 minute would inflict pretty > > negligible > > > > > overheads, > > > > > > > >>>> so > > > > > > > >>>>>> all in all, I would doubt there is > > > > > > > >>>>>> a significant benefit (in most cases) of increasing 3 > > minute > > > > > > > >> checkpoint > > > > > > > >>>>>> interval to anything more, let alone > > > > > > > >>>>>> 30 minutes. > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>>> I am not sure we should design the algorithm with the > > > > assumption > > > > > that > > > > > > > >> the > > > > > > > >>>>> short checkpointing interval will always be higher than 1 > > > > minute > > > > > etc. > > > > > > > >>>>> > > > > > > > >>>>> I agree the proposed algorithm can solve most cases where > > the > > > > > resource > > > > > > > >> is > > > > > > > >>>>> sufficient and there is always no backlog in source > > subtasks. > > > > On > > > > > the > > > > > > > >>>> other > > > > > > > >>>>> hand, what makes SRE > > > > > > > >>>>> life hard is probably the remaining 1-5% cases where the > > > > traffic > > > > > is > > > > > > > >> spiky > > > > > > > >>>>> and the cluster is reaching its capacity limit. The > ability > > > to > > > > > predict > > > > > > > >>>> and > > > > > > > >>>>> control Flink job's behavior (including checkpointing > > > interval) > > > > > can > > > > > > > >>>>> considerably reduce the burden of manging Flink jobs. 
> > > > > > > >>>>> > > > > > > > >>>>> Best, > > > > > > > >>>>> Dong > > > > > > > >>>>> > > > > > > > >>>>> > > > > > > > >>>>>> > > > > > > > >>>>>> Best, > > > > > > > >>>>>> Piotrek > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>>> > > > > > > > >>>>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> > > > > > napisał(a): > > > > > > > >>>>>> > > > > > > > >>>>>>> Hi Piotr, > > > > > > > >>>>>>> > > > > > > > >>>>>>> Thanks for the explanations. I have some followup > > questions > > > > > below. > > > > > > > >>>>>>> > > > > > > > >>>>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski < > > > > > pnowoj...@apache.org > > > > > > > >>>>> > > > > > > > >>>>>>> wrote: > > > > > > > >>>>>>> > > > > > > > >>>>>>>> Hi All, > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> Thanks for chipping in the discussion Ahmed! > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> Regarding using the REST API. Currently I'm leaning > > > towards > > > > > > > >>>>>> implementing > > > > > > > >>>>>>>> this feature inside the Flink itself, via some > pluggable > > > > > interface. > > > > > > > >>>>>>>> REST API solution would be tempting, but I guess not > > > > everyone > > > > > is > > > > > > > >>>>> using > > > > > > > >>>>>>>> Flink Kubernetes Operator. > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> @Dong > > > > > > > >>>>>>>> > > > > > > > >>>>>>>>> I am not sure metrics such as isBackPressured are > > already > > > > > sent to > > > > > > > >>>>> JM. > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> Fetching code path on the JM: > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>> > > > > > > > >> > > > > > > > > > > > > > > > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture > > > > > > > >>>>>>>> > > > > > > > >>>> > > > > > > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> Example code path accessing Task level metrics via JM > > > using > > > > > the > > > > > > > >>>>>>>> `MetricStore`: > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>> > > > > > > > >>>>> > > > > > > > >>>> > > > > > > > >> > > > > > > > > > > > > > > > org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler > > > > > > > >>>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> Thanks for the code reference. I checked the code that > > > > invoked > > > > > these > > > > > > > >>>>> two > > > > > > > >>>>>>> classes and found the following information: > > > > > > > >>>>>>> > > > > > > > >>>>>>> - AggregatingSubtasksMetricsHandler#getStoresis > currently > > > > > invoked > > > > > > > >>>> only > > > > > > > >>>>>>> when AggregatingJobsMetricsHandler is invoked. > > > > > > > >>>>>>> - AggregatingJobsMetricsHandler is only instantiated > and > > > > > returned by > > > > > > > >>>>>>> WebMonitorEndpoint#initializeHandlers > > > > > > > >>>>>>> - WebMonitorEndpoint#initializeHandlers is only used by > > > > > > > >>>>>> RestServerEndpoint. > > > > > > > >>>>>>> And RestServerEndpoint invokes these handlers in > response > > > to > > > > > external > > > > > > > >>>>>> REST > > > > > > > >>>>>>> request. 
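(For reference, if I read the code correctly, AggregatingSubtasksMetricsHandler is the handler behind e.g. GET /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?get=backPressuredTimeMsPerSecond, so these task-level metrics are aggregated on demand rather than pushed to the JM.)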
> > > > > > > >>>>>>> > > > > > > > >>>>>>> I understand that JM will get the backpressure-related > > > > metrics > > > > > every > > > > > > > >>>>> time > > > > > > > >>>>>>> the RestServerEndpoint receives the REST request to get > > > these > > > > > > > >>>> metrics. > > > > > > > >>>>>> But > > > > > > > >>>>>>> I am not sure if RestServerEndpoint is already always > > > > > receiving the > > > > > > > >>>>> REST > > > > > > > >>>>>>> metrics at regular interval (suppose there is no human > > > > manually > > > > > > > >>>>>>> opening/clicking the Flink Web UI). And if it does, > what > > is > > > > the > > > > > > > >>>>> interval? > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>>>> For example, let's say every source operator subtask > > > > reports > > > > > this > > > > > > > >>>>>>> metric > > > > > > > >>>>>>>> to > > > > > > > >>>>>>>>> JM once every 10 seconds. There are 100 source > > subtasks. > > > > And > > > > > each > > > > > > > >>>>>>> subtask > > > > > > > >>>>>>>>> is backpressured roughly 10% of the total time due to > > > > traffic > > > > > > > >>>>> spikes > > > > > > > >>>>>>> (and > > > > > > > >>>>>>>>> limited buffer). Then at any given time, there are 1 > - > > > > > 0.9^100 = > > > > > > > >>>>>>> 99.997% > > > > > > > >>>>>>>>> chance that there is at least one subtask that is > > > > > backpressured. > > > > > > > >>>>> Then > > > > > > > >>>>>>> we > > > > > > > >>>>>>>>> have to wait for at least 10 seconds to check again. > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> backPressuredTimeMsPerSecond and other related metrics > > > (like > > > > > > > >>>>>>>> busyTimeMsPerSecond) are not subject to that problem. > > > > > > > >>>>>>>> They are recalculated once every metric fetching > > interval, > > > > > and they > > > > > > > >>>>>>> report > > > > > > > >>>>>>>> accurately on average the given subtask spent > > > > > > > >>>>>> busy/idling/backpressured. > > > > > > > >>>>>>>> In your example, backPressuredTimeMsPerSecond would > > report > > > > > 100ms/s. > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> Suppose every subtask is already reporting > > > > > > > >>>> backPressuredTimeMsPerSecond > > > > > > > >>>>>> to > > > > > > > >>>>>>> JM once every 100 ms. If a job has 10 operators (that > are > > > not > > > > > > > >>>> chained) > > > > > > > >>>>>> and > > > > > > > >>>>>>> each operator has 100 subtasks, then JM would need to > > > handle > > > > > 10000 > > > > > > > >>>>>> requests > > > > > > > >>>>>>> per second to receive metrics from these 1000 subtasks. > > It > > > > > seems > > > > > > > >>>> like a > > > > > > > >>>>>>> non-trivial overhead for medium-to-large sized jobs and > > can > > > > > make JM > > > > > > > >>>> the > > > > > > > >>>>>>> performance bottleneck during job execution. > > > > > > > >>>>>>> > > > > > > > >>>>>>> I would be surprised if Flink is already paying this > much > > > > > overhead > > > > > > > >>>> just > > > > > > > >>>>>> for > > > > > > > >>>>>>> metrics monitoring. That is the main reason I still > doubt > > > it > > > > > is true. > > > > > > > >>>>> Can > > > > > > > >>>>>>> you show where this 100 ms is currently configured? > > > > > > > >>>>>>> > > > > > > > >>>>>>> Alternatively, maybe you mean that we should add extra > > code > > > > to > > > > > invoke > > > > > > > >>>>> the > > > > > > > >>>>>>> REST API at 100 ms interval. 
Then that means we need to > > > > > considerably > > > > > > > >>>>>>> increase the network/cpu overhead at JM, where the > > overhead > > > > > will > > > > > > > >>>>> increase > > > > > > > >>>>>>> as the number of TM/slots increase, which may pose risk > > to > > > > the > > > > > > > >>>>>> scalability > > > > > > > >>>>>>> of the proposed design. I am not sure we should do > this. > > > What > > > > > do you > > > > > > > >>>>>> think? > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>>> > > > > > > > >>>>>>>>> While it will be nice to support additional use-cases > > > > > > > >>>>>>>>> with one proposal, it is probably also reasonable to > > make > > > > > > > >>>>> incremental > > > > > > > >>>>>>>>> progress and support the low-hanging-fruit use-case > > > first. > > > > > The > > > > > > > >>>>> choice > > > > > > > >>>>>>>>> really depends on the complexity and the importance > of > > > > > supporting > > > > > > > >>>>> the > > > > > > > >>>>>>>> extra > > > > > > > >>>>>>>>> use-cases. > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> That would be true, if that was a private > implementation > > > > > detail or > > > > > > > >>>> if > > > > > > > >>>>>> the > > > > > > > >>>>>>>> low-hanging-fruit-solution would be on the direct path > > to > > > > the > > > > > final > > > > > > > >>>>>>>> solution. > > > > > > > >>>>>>>> That's unfortunately not the case here. This will add > > > public > > > > > facing > > > > > > > >>>>>> API, > > > > > > > >>>>>>>> that we will later need to maintain, no matter what > the > > > > final > > > > > > > >>>>> solution > > > > > > > >>>>>>> will > > > > > > > >>>>>>>> be, > > > > > > > >>>>>>>> and at the moment at least I don't see it being > related > > > to a > > > > > > > >>>>> "perfect" > > > > > > > >>>>>>>> solution. > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> Sure. Then let's decide the final solution first. > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>>>> I guess the point is that the suggested approach, > which > > > > > > > >>>> dynamically > > > > > > > >>>>>>>>> determines the checkpointing interval based on the > > > > > backpressure, > > > > > > > >>>>> may > > > > > > > >>>>>>>> cause > > > > > > > >>>>>>>>> regression when the checkpointing interval is > > relatively > > > > low. > > > > > > > >>>> This > > > > > > > >>>>>>> makes > > > > > > > >>>>>>>> it > > > > > > > >>>>>>>>> hard for users to enable this feature in production. > It > > > is > > > > > like > > > > > > > >>>> an > > > > > > > >>>>>>>>> auto-driving system that is not guaranteed to work > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> Yes, creating a more generic solution that would > require > > > > less > > > > > > > >>>>>>> configuration > > > > > > > >>>>>>>> is usually more difficult then static configurations. > > > > > > > >>>>>>>> It doesn't mean we shouldn't try to do that. > Especially > > > that > > > > > if my > > > > > > > >>>>>>> proposed > > > > > > > >>>>>>>> algorithm wouldn't work good enough, there is > > > > > > > >>>>>>>> an obvious solution, that any source could add a > metric, > > > > like > > > > > let > > > > > > > >>>> say > > > > > > > >>>>>>>> "processingBacklog: true/false", and the > > > `CheckpointTrigger` > > > > > > > >>>>>>>> could use this as an override to always switch to the > > > > > > > >>>>>>>> "slowCheckpointInterval". 
I don't think we need it, > but > > > > that's > > > > > > > >>>> always > > > > > > > >>>>>> an > > > > > > > >>>>>>>> option > > > > > > > >>>>>>>> that would be basically equivalent to your original > > > > proposal. > > > > > Or > > > > > > > >>>> even > > > > > > > >>>>>>>> source could add "suggestedCheckpointInterval : int", > > and > > > > > > > >>>>>>>> `CheckpointTrigger` could use that value if present > as a > > > > hint > > > > > in > > > > > > > >>>> one > > > > > > > >>>>>> way > > > > > > > >>>>>>> or > > > > > > > >>>>>>>> another. > > > > > > > >>>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> So far we have talked about the possibility of using > > > > > > > >>>> CheckpointTrigger > > > > > > > >>>>>> and > > > > > > > >>>>>>> mentioned the CheckpointTrigger > > > > > > > >>>>>>> and read metric values. > > > > > > > >>>>>>> > > > > > > > >>>>>>> Can you help answer the following questions so that I > can > > > > > understand > > > > > > > >>>>> the > > > > > > > >>>>>>> alternative solution more concretely: > > > > > > > >>>>>>> > > > > > > > >>>>>>> - What is the interface of this CheckpointTrigger? For > > > > > example, are > > > > > > > >>>> we > > > > > > > >>>>>>> going to give CheckpointTrigger a context that it can > use > > > to > > > > > fetch > > > > > > > >>>>>>> arbitrary metric values? This can help us understand > what > > > > > information > > > > > > > >>>>>> this > > > > > > > >>>>>>> user-defined CheckpointTrigger can use to make the > > > checkpoint > > > > > > > >>>> decision. > > > > > > > >>>>>>> - Where is this CheckpointTrigger running? For example, > > is > > > it > > > > > going > > > > > > > >>>> to > > > > > > > >>>>>> run > > > > > > > >>>>>>> on the subtask of every source operator? Or is it going > > to > > > > run > > > > > on the > > > > > > > >>>>> JM? > > > > > > > >>>>>>> - Are we going to provide a default implementation of > > this > > > > > > > >>>>>>> CheckpointTrigger in Flink that implements the > algorithm > > > > > described > > > > > > > >>>>> below, > > > > > > > >>>>>>> or do we expect each source operator developer to > > implement > > > > > their own > > > > > > > >>>>>>> CheckpointTrigger? > > > > > > > >>>>>>> - How can users specify the > > > > > > > >>>>>> fastCheckpointInterval/slowCheckpointInterval? > > > > > > > >>>>>>> For example, will we provide APIs on the > > CheckpointTrigger > > > > that > > > > > > > >>>>> end-users > > > > > > > >>>>>>> can use to specify the checkpointing interval? What > would > > > > that > > > > > look > > > > > > > >>>>> like? > > > > > > > >>>>>>> > > > > > > > >>>>>>> Overall, my gut feel is that the alternative approach > > based > > > > on > > > > > > > >>>>>>> CheckpointTrigger is more complicated and harder to > use. > > > And > > > > it > > > > > > > >>>>> probably > > > > > > > >>>>>>> also has the issues of "having two places to configure > > > > > checkpointing > > > > > > > >>>>>>> interval" and "giving flexibility for every source to > > > > > implement a > > > > > > > >>>>>> different > > > > > > > >>>>>>> API" (as mentioned below). > > > > > > > >>>>>>> > > > > > > > >>>>>>> Maybe we can evaluate it more after knowing the answers > > to > > > > the > > > > > above > > > > > > > >>>>>>> questions. 
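In the meantime, just to check that I understand the "processingBacklog" short-circuit you keep referring to, my reading is that it would amount to something like the following inside the trigger. Names are made up; this only restates your description in code:

    // Made-up names; sketch of the "processingBacklog" override described above.
    final class ShortCircuitSketch {
        static long selectIntervalMs(
                Boolean processingBacklog,  // null when a source exposes no such metric
                long calculatedIntervalMs,  // from the backpressure-based estimate
                long fastIntervalMs,
                long slowIntervalMs) {
            if (processingBacklog != null && processingBacklog) {
                // Override: some source explicitly reports backlog, so switch to
                // the slow interval regardless of the backpressure-based estimate.
                return slowIntervalMs;
            }
            return Math.min(Math.max(fastIntervalMs, calculatedIntervalMs), slowIntervalMs);
        }
    }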
> > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>>> > > > > > > > >>>>>>>>> On the other hand, the approach currently proposed in > > the > > > > > FLIP is > > > > > > > >>>>>> much > > > > > > > >>>>>>>>> simpler as it does not depend on backpressure. Users > > > > specify > > > > > the > > > > > > > >>>>>> extra > > > > > > > >>>>>>>>> interval requirement on the specific sources (e.g. > > > > > HybridSource, > > > > > > > >>>>>> MySQL > > > > > > > >>>>>>>> CDC > > > > > > > >>>>>>>>> Source) and can easily know the checkpointing > interval > > > will > > > > > be > > > > > > > >>>> used > > > > > > > >>>>>> on > > > > > > > >>>>>>>> the > > > > > > > >>>>>>>>> continuous phase of the corresponding source. This is > > > > pretty > > > > > much > > > > > > > >>>>>> same > > > > > > > >>>>>>> as > > > > > > > >>>>>>>>> how users use the existing > > > execution.checkpointing.interval > > > > > > > >>>> config. > > > > > > > >>>>>> So > > > > > > > >>>>>>>>> there is no extra concern of regression caused by > this > > > > > approach. > > > > > > > >>>>>>>> > > > > > > > >>>>>>>> To an extent, but as I have already previously > > mentioned I > > > > > really > > > > > > > >>>>>> really > > > > > > > >>>>>>> do > > > > > > > >>>>>>>> not like idea of: > > > > > > > >>>>>>>> - having two places to configure checkpointing > interval > > > > > (config > > > > > > > >>>>> file > > > > > > > >>>>>>> and > > > > > > > >>>>>>>> in the Source builders) > > > > > > > >>>>>>>> - giving flexibility for every source to implement a > > > > different > > > > > > > >>>> API > > > > > > > >>>>>> for > > > > > > > >>>>>>>> that purpose > > > > > > > >>>>>>>> - creating a solution that is not generic enough, so > > that > > > we > > > > > will > > > > > > > >>>>>> need > > > > > > > >>>>>>> a > > > > > > > >>>>>>>> completely different mechanism in the future anyway > > > > > > > >>>>>>>> > > > > > > > >>>>>>> > > > > > > > >>>>>>> Yeah, I understand different developers might have > > > different > > > > > > > >>>>>>> concerns/tastes for these APIs. Ultimately, there might > > not > > > > be > > > > > a > > > > > > > >>>>> perfect > > > > > > > >>>>>>> solution and we have to choose based on the pros/cons > of > > > > these > > > > > > > >>>>> solutions. > > > > > > > >>>>>>> > > > > > > > >>>>>>> I agree with you that, all things being equal, it is > > > > > preferable to 1) > > > > > > > >>>>>> have > > > > > > > >>>>>>> one place to configure checkpointing intervals, 2) have > > all > > > > > source > > > > > > > >>>>>>> operators use the same API, and 3) create a solution > that > > > is > > > > > generic > > > > > > > >>>>> and > > > > > > > >>>>>>> last lasting. Note that these three goals affects the > > > > > usability and > > > > > > > >>>>>>> extensibility of the API, but not necessarily the > > > > > > > >>>> stability/performance > > > > > > > >>>>>> of > > > > > > > >>>>>>> the production job. > > > > > > > >>>>>>> > > > > > > > >>>>>>> BTW, there are also other preferrable goals. For > example, > > > it > > > > > is very > > > > > > > >>>>>> useful > > > > > > > >>>>>>> for the job's behavior to be predictable and > > interpretable > > > so > > > > > that > > > > > > > >>>> SRE > > > > > > > >>>>>> can > > > > > > > >>>>>>> operator/debug the Flink in an easier way. We can list > > > these > > > > > > > >>>> pros/cons > > > > > > > >>>>>>> altogether later. 
> I am wondering if we can first agree on the priority of the goals we want
> to achieve. IMO, it is a hard requirement for the user-facing API to be
> clearly defined, and users should be able to use the API without concern
> of regression. And this requirement is more important than the other
> goals discussed above because it is related to the stability/performance
> of the production job. What do you think?
>
> > > Sounds good. Looking forward to learning more ideas.
> >
> > I have thought about this a bit more, and I think we don't need to
> > check for the backpressure status, or how overloaded all of the
> > operators are. We could just check three things for source operators:
> > 1. pendingRecords (backlog length)
> > 2. numRecordsInPerSecond
> > 3. backPressuredTimeMsPerSecond
> >
> > // int metricsUpdateInterval = 10s // obtained from config
> > // The next line calculates how many records we can consume from the
> > // backlog, assuming that magically the reason behind the backpressure
> > // vanishes. We will use this only as a safeguard against scenarios
> > // where, for example, the backpressure was caused by some intermittent
> > // failure/performance degradation.
> > maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1000
> > - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
>
> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond
> = 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
> 1000 * metricsUpdateInterval according to the above algorithm.
>
> Do you mean "maxRecordsConsumedWithoutBackpressure =
> (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000)) *
> metricsUpdateInterval"?
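> To make the corrected version concrete, here is a small sketch in plain
> Java (purely illustrative; the parameter names just mirror the metrics
> above):
>
> // Estimates how many records the source could consume during one
> // metrics update interval if the current backpressure disappeared.
> static double maxRecordsConsumedWithoutBackpressure(
>         double numRecordsInPerSecond,
>         double backPressuredTimeMsPerSecond, // in [0, 1000)
>         double metricsUpdateIntervalSeconds) {
>     // Fraction of each second the task is NOT back-pressured
>     // (assumes the task is not back-pressured 100% of the time).
>     double nonBackpressuredFraction =
>             1.0 - backPressuredTimeMsPerSecond / 1000.0;
>     // Projected rate with the backpressure removed; e.g. with
>     // backPressuredTimeMsPerSecond = 0 this is just
>     // numRecordsInPerSecond, as intended.
>     double unthrottledRate =
>             numRecordsInPerSecond / nonBackpressuredFraction;
>     return unthrottledRate * metricsUpdateIntervalSeconds;
> }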
> > // we are excluding maxRecordsConsumedWithoutBackpressure from the
> > // backlog as a safeguard against intermittent backpressure problems,
> > // so that we don't calculate the next checkpoint interval far, far in
> > // the future, while the backpressure goes away before we recalculate
> > // the metrics and the new checkpointing interval
> > timeToConsumeBacklog = (pendingRecords -
> > maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> >
> > Then we can use those numbers to calculate the desired checkpointing
> > interval, for example like this:
> >
> > long calculatedCheckpointInterval = timeToConsumeBacklog / 10; // this
> > may need some refining
> > long nextCheckpointInterval = min(max(fastCheckpointInterval,
> > calculatedCheckpointInterval), slowCheckpointInterval);
> > long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> >
> > WDYT?
>
> I think the idea of the above algorithm is to incline towards using the
> fastCheckpointInterval unless we are very sure the backlog will take a
> long time to process. This can alleviate the concern of regression during
> the continuous_unbounded phase since we are more likely to use the
> fastCheckpointInterval. However, it can cause regression during the
> bounded phase.
>
> I will use a concrete example to explain the risk of regression:
> - The user is using HybridSource to read from HDFS followed by Kafka.
> The data in HDFS is old and there is no need for data freshness for the
> data in HDFS.
> - The user configures the job as below:
>   - fastCheckpointInterval = 3 minutes
>   - slowCheckpointInterval = 30 minutes
>   - metricsUpdateInterval = 100 ms
>
> Using the above formula, we can see that once pendingRecords <=
> numRecordsInPerSecond * 30 minutes, then calculatedCheckpointInterval <=
> 3 minutes, meaning that we will use fastCheckpointInterval as the
> checkpointing interval.
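> Here is a quick worked check of the clamping with these numbers (plain
> Java, names mirroring the pseudocode above; purely illustrative):
>
> long fastCheckpointInterval = 3 * 60_000L;  // 3 minutes, in ms
> long slowCheckpointInterval = 30 * 60_000L; // 30 minutes, in ms
>
> // Suppose the remaining backlog is worth 30 minutes at the current rate:
> long timeToConsumeBacklog = 30 * 60_000L;
> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; // = 3 min
>
> // min(max(3 min, 3 min), 30 min) = 3 minutes, i.e. the fast interval:
> long nextCheckpointInterval =
>         Math.min(
>                 Math.max(fastCheckpointInterval,
>                          calculatedCheckpointInterval),
>                 slowCheckpointInterval);
> // So in the last ~30 minutes of the bounded phase, the job checkpoints
> // every 3 minutes, 10x more often than the 30-minute interval the user
> // wanted for the historical data.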
> Then in the last 30 minutes of the bounded phase, the checkpointing
> frequency will be 10X higher than what the user wants.
>
> Also note that the same issue would considerably limit the benefits of
> the algorithm. For example, during the continuous phase, the algorithm
> will only be better than the approach in FLIP-309 when there is at least
> 30 minutes' worth of backlog in the source.
>
> Sure, having a slower checkpointing interval in this extreme case (where
> there is a 30-minute backlog in the continuous_unbounded phase) is still
> useful when it happens. But since this is the uncommon case, and the
> right solution is probably to do capacity planning to avoid it from
> happening in the first place, I am not sure it is worth optimizing for
> this case at the cost of regression in the bounded phase and reduced
> operational predictability for users (e.g. what checkpointing interval
> should I expect at this stage of the job).
>
> I think the fundamental issue with this algorithm is that it is applied
> to both the bounded phase and the continuous_unbounded phase without
> knowing which phase the job is running in. The only information it can
> access is the backlog. But two sources with the same amount of backlog do
> not necessarily have the same data freshness requirement.
>
> In this particular example, users know that the data in HDFS is very old
> and there is no need for data freshness. Users can express such signals
> via the per-source API proposed in the FLIP. This is why the current
> approach in FLIP-309 can be better in this case.
>
> What do you think?
>
> Best,
> Dong
>
> > Best,
> > Piotrek