Hi Piotr and everyone,

I have documented the vision, together with a summary of the existing work, in this doc. Please feel free to review/comment/edit it. Looking forward to working with you on this line of work.
https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing

Best,
Dong

On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

> Hi All,
>
> Dong and I chatted offline about the above-mentioned issues (thanks for that offline chat, I think it helped both of us a lot). The summary is below.
>
> > Previously, I thought you meant to add generic logic in SourceReaderBase to read existing metrics (e.g. backpressure) and emit the IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).
> > - When this API is invoked, it internally sends an internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog status from all its subtasks. And for now, we will hardcode the logic such that if any source reader says it is under backlog, then execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance with the same number of public APIs for the target use-case. And I suppose in the future we might be able to re-use this API for the source reader to set its backlog status based on its backpressure metrics, which could be an extra advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
>
> Yes, I think that's a viable approach. I would be perfectly fine with not introducing `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog)` and sending the `SourceReaderBacklogEvent` from SourceReader to JM in this FLIP. It could be implemented once we decide to add some more generic ways of detecting backlog/backpressure on the SourceReader level.
>
> I think we could also just keep the current proposal of adding `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the sources that can set it on the `SplitEnumerator` level. Later we could merge this with other mechanisms of detecting "isProcessingBacklog", like ones based on watermark lag, backpressure, etc., via some component running on the JM.
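>
> Regardless of whether the flag ends up being set by the `SplitEnumerator` or reported by the `SourceReader`s, the JM-side aggregation could look roughly like the following minimal sketch (class and method names here are purely illustrative, not a proposed API):
>
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
>
> // Illustrative sketch only: remember the latest flag per subtask and use the long
> // checkpoint interval only while at least one subtask reports a backlog.
> class BacklogStatusAggregator {
>     private final Duration regularInterval;
>     private final Duration intervalDuringBacklog;
>     private final Map<Integer, Boolean> backlogBySubtask = new HashMap<>();
>
>     BacklogStatusAggregator(Duration regularInterval, Duration intervalDuringBacklog) {
>         this.regularInterval = regularInterval;
>         this.intervalDuringBacklog = intervalDuringBacklog;
>     }
>
>     // Called when a backlog signal arrives from subtask `subtaskId`
>     // (e.g. a SourceReaderBacklogEvent, or the enumerator-level flag).
>     void onBacklogStatus(int subtaskId, boolean isProcessingBacklog) {
>         backlogBySubtask.put(subtaskId, isProcessingBacklog);
>     }
>
>     // "If any source reader says it is under backlog" -> use the long interval.
>     Duration effectiveCheckpointInterval() {
>         return backlogBySubtask.containsValue(true) ? intervalDuringBacklog : regularInterval;
>     }
> }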
> At the same time I'm fine with having the "isProcessingBacklog" concept to switch the runtime back and forth between high and low latency modes instead of "backpressure". In FLIP-325 I have asked:
>
> > I think there is one thing that hasn't been discussed either here or in FLIP-309. Given that we have three dimensions:
> > - e2e latency/checkpointing interval
> > - enabling some kind of batching/buffering on the operator level
> > - how much resources we want to allocate to the job
> >
> > How do we want Flink to adjust itself between those three? For example:
> > a) Should we assume that a given job has a fixed amount of assigned resources and make it paramount that Flink doesn't exceed those available resources? So in case of backpressure, we should extend checkpointing intervals and emit records less frequently and in batches.
> > b) Or should we assume that the amount of resources is flexible (up to a point?), and the desired e2e latency is the paramount aspect? So in case of backpressure, we should still adhere to the configured e2e latency, and wait for the user or autoscaler to scale up the job?
> >
> > In case of a), I think the concept of "isProcessingBacklog" is not needed; we could steer the behaviour only using the backpressure information.
> >
> > On the other hand, in case of b), "isProcessingBacklog" information might be helpful, to let Flink know that we can safely decrease the e2e latency/checkpoint interval even if there is no backpressure, to use fewer resources (and let the autoscaler scale down the job).
> >
> > Do we want to have both, or only one of those? Do a) and b) complement one another? If the job is backpressured, we should follow a) and expose to autoscaler/users the information "Hey! I'm barely keeping up! I need more resources!". While, when there is no backpressure and latency doesn't matter (isProcessingBacklog=true), we can limit the resource usage.
>
> After thinking this over:
> - the case where we don't have "isProcessingBacklog" information, but the source operator is backpressured, must be intermittent. Either the backpressure will go away, or shortly we should reach the "isProcessingBacklog" state anyway
> - and even if we implement some backpressure detecting algorithm to switch the runtime into the "high latency mode", we can always report that as "isProcessingBacklog" anyway, as the runtime should react the same way in both cases (the backpressure and "isProcessingBacklog" states).
>
> ===============
>
> With a common understanding of the final solution that we want to have in the future, I'm pretty much fine with the current FLIP-309 proposal, with a couple of remarks:
> 1. Could you include in FLIP-309 the long-term solution as we have discussed?
>    a) It would be nice to have some diagram showing how the "isProcessingBacklog" information would be travelling, how it would be aggregated, and what will be done with that information (from SourceReader/SplitEnumerator to some "component" aggregating it, and then ... ?)
> 2. For me "processing backlog" doesn't necessarily equate to "backpressure" (HybridSource can be both NOT backpressured and processing backlog at the same time). If you think the same way, can you include that definition of "processing backlog" in the FLIP, including its relation to the backpressure state? If not, we need to align on that definition first :)
>
> Also, I'm missing a big-picture description that would show what you are trying to achieve and what the overarching vision is behind all of the current and future FLIPs that you are planning in this area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?). Or was it described somewhere and I've missed it?
>
> Best,
> Piotrek
>
> On Thu, Jul 6, 2023 at 06:25 Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > I am sorry if you feel unhappy or upset with us for not following/fixing your proposal. It is not my intention to give you this feeling. After all, we are all trying to make Flink better, to support more use-cases with the most maintainable code. I hope you understand that, just like you, I have also been doing my best to think through various design options and taking the time to evaluate the pros/cons. Eventually, we probably still need to reach consensus by clearly listing and comparing the objective pros/cons of different proposals and identifying the best choice.
> >
> > Regarding your concern (or frustration) that we are always finding issues in your proposal, I would say it is normal (and probably necessary) for developers to find pros/cons in each other's solutions, so that we can eventually pick the right one. I will appreciate anyone who can correctly pinpoint a concrete issue in my proposal so that I can improve it or choose an alternative solution.
> >
> > Regarding your concern that we are not spending enough effort to find solutions and that the problem in your solution can be solved in a minute, I would like to say that is not true. For each of your previous proposals, I typically spent 1+ hours thinking through your proposal to understand whether it works and why it does not work, and another 1+ hour to write down the details and explain why it does not work. And I have had a variety of offline discussions with my colleagues discussing various proposals (including yours), 6+ hours in total. Maybe I am not capable enough to fix those issues in one minute or so. If you think your proposal can be easily fixed in one minute or so, I would really appreciate it if you could think through your proposal and fix it in the first place :)
> >
> > For your information, I have had several long discussions with my colleagues at Alibaba and also with Becket on this FLIP. We have seriously considered your proposals and discussed in detail what the pros/cons are and whether we can improve these solutions. The initial version of this FLIP (which allows the source operator to specify checkpoint intervals) did not get enough support due to concerns about it not being generic (i.e. users need to specify checkpoint intervals on a per-source basis). It was only after I updated the FLIP to use the job-level execution.checkpointing.interval-during-backlog that they agreed to give +1 to the FLIP. What I want to tell you is that your suggestions have been taken seriously, and the quality of the FLIP has been taken seriously by all those who have voted. As a result of taking your suggestion seriously and trying to find improvements, we updated the FLIP to use isProcessingBacklog.
> >
> > I am wondering, do you think it would be useful to discuss face-to-face via a video conference call? It is not just between you and me. We can invite the developers who are interested to join and help with the discussion. That might improve communication efficiency and help us understand each other better :)
> >
> > I am writing this long email to hopefully get your understanding. I care much more about the quality of the eventual solution than about who proposed the solution. Please bear with me and see my comments inline, with an explanation of the pros/cons of these proposals.
> >
> > On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> >
> > > Hi Guys,
> > >
> > > I would like to ask you again to spend a bit more effort on trying to find solutions, not just pointing out problems. For 1.5 months the discussion hasn't exactly been going in circles, but rather: I'm suggesting a solution, you are trying to undermine it with some arguments, I'm coming back with a fix, often an extremely easy one, only for you to try to find yet another "issue".
> > > It doesn't bode well if you are finding a "problem" that can be solved with a minute or so of thinking, or that has even already been solved.
> > >
> > > I have provided you so far with at least three distinct solutions that could address your exact target use-case. Two of them [1][2] are generic enough to probably be good enough for the foreseeable future; one is intermediate and not generic [3], but wouldn't require @Public API changes or some custom hidden interfaces.
> > >
> > > All in all:
> > > - [1] with added metric hints like "isProcessingBacklog" solves your target use case pretty well. The downside is having to improve how the JM is collecting/aggregating metrics.
> >
> > Here is my analysis of this proposal compared to the current approach in FLIP-309.
> >
> > pros:
> > - No need to add the public API SplitEnumeratorContext#setIsProcessingBacklog.
> > cons:
> > - Need to add a public API that subclasses of SourceReader can use to specify their IsProcessingBacklog metric value.
> > - The Source Coordinator needs to periodically pull the isProcessingBacklog metrics from all TMs throughout the job execution.
> >
> > Here is why I think the cons outweigh the pros:
> > 1) The JM needs to collect/aggregate metrics with extra runtime overhead, which is not necessary for the target use-case with the push-based approach in FLIP-309.
> > 2) For the target use-case, it is simpler and more intuitive for source operators (e.g. HybridSource, MySQL CDC source) to be able to set their isProcessingBacklog status in the SplitEnumerator. This is because the switch between bounded/unbounded stages happens in their SplitEnumerator.
> >
> > > - [2] is basically an equivalent of [1], replacing metrics with events. It also is a superset of your proposal.
> >
> > Previously, I thought you meant to add generic logic in SourceReaderBase to read existing metrics (e.g. backpressure) and emit the IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog).
> > - When this API is invoked, it internally sends an internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog status from all its subtasks. And for now, we will hardcode the logic such that if any source reader says it is under backlog, then execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance with the same number of public APIs for the target use-case. And I suppose in the future we might be able to re-use this API for the source reader to set its backlog status based on its backpressure metrics, which could be an extra advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
> >
> > > - [3] yes, it's hacky, but it's a solution that could be thrown away once we implement [1] or [2]. The only real theoretical downside is that it cannot control the long checkpoint interval exactly (the short checkpoint interval has to be a divisor of the long checkpoint interval), but I simply cannot imagine a practical use where that would be a blocker for a user. Please..., someone wanting to set the short checkpoint interval to 3 minutes and the long one to 7 minutes, and that someone cannot accept the long interval being 9 minutes? And that's even ignoring the fact that if someone has an issue with the 3-minute checkpoint interval, I can hardly think that merely doubling the interval to 7 minutes would significantly solve any problem for that user.
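> > >
> > > Just to spell out the rounding that the divisor constraint implies, a minimal illustration (this snippet is only meant to show the arithmetic, it is not part of the actual proposal):
> > >
> > > import java.time.Duration;
> > >
> > > // Only illustrates the arithmetic: if the short interval must divide the long one,
> > > // the configured long interval is effectively rounded up to the next multiple.
> > > class LongIntervalRounding {
> > >     static Duration effectiveLongInterval(Duration shortInterval, Duration configuredLong) {
> > >         long shortMs = shortInterval.toMillis();
> > >         long multiples = (configuredLong.toMillis() + shortMs - 1) / shortMs; // ceiling division
> > >         return Duration.ofMillis(multiples * shortMs);
> > >     }
> > >
> > >     public static void main(String[] args) {
> > >         // short = 3 min, configured long = 7 min  ->  effective long = 9 min (prints PT9M)
> > >         System.out.println(effectiveLongInterval(Duration.ofMinutes(3), Duration.ofMinutes(7)));
> > >     }
> > > }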
> >
> > Yes, this is a fabricated example that shows execution.checkpointing.interval-during-backlog might not be accurately enforced with this option. I think you are probably right that it might not matter that much. I just think we should try our best to keep the semantics of Flink's public APIs (including configuration) clear, simple, and enforceable. If we can make the user-facing configuration enforceable at the cost of an extra developer-facing API (i.e. setProcessingBacklog(...)), I would prefer to do this.
> >
> > It seems that we both agree that option [2] is better than [3]. I will skip further comments on this option and we can probably focus on option [2] :)
> >
> > > Dong, a long time ago you wrote:
> > >
> > > > Sure. Then let's decide the final solution first.
> > >
> > > Have you thought about that? Maybe I'm wrong, but I don't remember you describing in any of your proposals how they could be extended in the future, to cover more generic cases. Regardless of whether you either don't believe in the generic solution or struggle to
> >
> > Yes, I have thought about the plan to extend the current FLIP to support the metrics-based (e.g. backpressure) solution you described earlier. Actually, I mentioned multiple times in earlier emails that your suggestion of using metrics is valuable and that I will do this in a follow-up FLIP.
> >
> > Here are my comments from the previous emails:
> > - See "I will add follow-up FLIPs to make use of the event-time metrics and backpressure metrics" from Jul 3, 2023, 6:39 PM
> > - See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
> > - See "we will create a followup FLIP (probably in FLIP-328)" from Jun 29, 2023, 11:01 AM
> >
> > Frankly speaking, I think the idea around using the backpressure metrics still needs a bit more thinking before we can propose a FLIP. But I am pretty sure we can make use of the watermark/event-time to determine the backlog status.
> >
> > > grasp it, if you can come back with something that can be easily extended in the future, up to a point where one could implement something similar to the backpressure detecting algorithm that I mentioned many times before, I would be happy to discuss and support it.
> >
> > Here is my idea of extending the source reader to support event-time-based backlog detecting algorithms:
> >
> > - Add a job-level config such as watermark-lag-threshold-for-backlog. If any source reader determines that the event timestamp is available and (system time - watermark) exceeds this threshold, then the source reader considers its isProcessingBacklog=true.
> > - The source reader can send an event to the source coordinator. Note that this might be doable in the SourceReaderBase without adding any public API which the concrete SourceReader subclass needs to explicitly invoke.
> > - And in the future, if FLIP-325 is accepted, instead of sending the event to the SourceCoordinator and letting the SourceCoordinator inform the checkpoint coordinator, the source reader might just emit the information as part of the RecordAttributes and let the two-phase commit sink inform the checkpoint coordinator.
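> >
> > A rough sketch of what that reader-side check could look like (the config name above is just an example, and this is not an agreed API, only an illustration of the idea):
> >
> > import java.time.Duration;
> >
> > // Rough sketch only: decide the reader's backlog status from the watermark lag.
> > class WatermarkLagBacklogDetector {
> >     private final Duration threshold; // e.g. the hypothetical watermark-lag-threshold-for-backlog
> >
> >     WatermarkLagBacklogDetector(Duration threshold) {
> >         this.threshold = threshold;
> >     }
> >
> >     // True if (system time - watermark) exceeds the configured threshold.
> >     boolean isProcessingBacklog(long currentWatermarkMs, long systemTimeMs) {
> >         return systemTimeMs - currentWatermarkMs > threshold.toMillis();
> >     }
> > }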
> >
> > Note that this is a sketch of the idea and it might need further improvement. I just hope you understand that we have thought about this idea and have done quite a lot of thinking about these design options. If it is OK with you, I hope we can make incremental progress and discuss the metrics-based solution separately in a follow-up FLIP.
> >
> > Last but not least, thanks for taking so much time to leave comments and help us improve the FLIP. Please kindly bear with us in this discussion. I am looking forward to collaborating with you to find the best design for the target use-cases.
> >
> > Best,
> > Dong
> >
> > > Hang, about your points 1. and 2., do you think those problems are insurmountable and blockers for that counter-proposal?
> > >
> > > > 1. It is hard to find the error checkpoint.
> > >
> > > No, it's not; please take a look at what exactly I proposed and maybe at the code.
> > >
> > > > 2. (...) The failed checkpoint may make them think the job is unhealthy.
> > >
> > > Please read again what I wrote in [3]. I mention there a solution for this exact "problem".
> > >
> > > About the necessity of the config value, I'm still not convinced it's needed from the start, but yes, we can add some config option if you think otherwise. This option, if named properly, could be re-used in the future for different solutions, so that's fine by me.
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1] Introduced in my very first e-mail from 23 May 2023, 16:26, and refined later with point "2." in my e-mail from 16 June 2023, 17:58
> > > [2] Section "2. ===============" in my e-mail from 30 June 2023, 16:34
> > > [3] Section "3. ===============" in my e-mail from 30 June 2023, 16:34
> > >
> > > All times in CEST.