Hi Piotr and everyone,

I have documented the vision, with a summary of the existing work, in the
doc linked below. Please feel free to review/comment/edit it. Looking forward
to working together with you on this line of work.

https://docs.google.com/document/d/1CgxXvPdAbv60R9yrrQAwaRgK3aMAgAL7RPPr799tOsQ/edit?usp=sharing

Best,
Dong

On Tue, Jul 11, 2023 at 1:07 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi All,
>
> Dong and I chatted offline about the above-mentioned issues (thanks for
> that offline chat,
> I think it helped both of us a lot). The summary is below.
>
> > Previously, I thought you meant to add a generic logic in
> SourceReaderBase
> > to read existing metrics (e.g. backpressure) and emit the
> > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are
> > OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > isProcessingBacklog).
> > - When this API is invoked, it internally sends an
> > internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > status from all its subtasks. And for now, we will hardcode the logic
> such
> > that if any source reader says it is under backlog, then
> > execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance
> with
> > the same number of public APIs for the target use-case. And I suppose in
> > the future we might be able to re-use this API for source reader to set
> its
> > backlog status based on its backpressure metrics, which could be an extra
> > advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
>
> Yes, I think that's a viable approach. I would be perfectly fine with not
> introducing
> `SourceReaderContext#setProcessingBacklog(boolean isProcessingBacklog)`
> and not sending the `SourceReaderBacklogEvent` from SourceReader to JM
> in this FLIP. It could be implemented once we decide to add some more
> generic
> ways of detecting backlog/backpressure on the SourceReader level.
>
> I think we could also just keep the current proposal of adding
> `SplitEnumeratorContext#setIsProcessingBacklog`, and use it in the sources
> that
> can set it on the `SplitEnumerator` level. Later we could merge this with
> other
> mechanisms of detecting "isProcessingBacklog", e.g. based on watermark lag,
> backpressure, etc., via some component running on the JM.
>
> At the same time I'm fine with having the "isProcessingBacklog" concept to
> switch
> runtime back and forth between high and low latency modes instead of
> "backpressure". In FLIP-325 I have asked:
>
> > I think there is one thing that has been discussed neither here nor in
> FLIP-309. Given that we have
> > three dimensions:
> > - e2e latency/checkpointing interval
> > - enabling some kind of batching/buffering on the operator level
> > - how much resources we want to allocate to the job
> >
> > How do we want Flink to adjust itself between those three? For example:
> > a) Should we assume that a given job has a fixed amount of assigned
> resources and make it paramount that
> >   Flink doesn't exceed those available resources? So in case of
> backpressure, we
> >   should extend checkpointing intervals, emit records less frequently and
> in batches.
> > b) Or should we assume that the amount of resources is flexible (up to a
> point?), and the desired e2e latency
> >   is the paramount aspect? So in case of backpressure, we should still
> adhere to the configured e2e latency,
> >   and wait for the user or autoscaler to scale up the job?
> >
> > In case of a), I think the concept of "isProcessingBacklog" is not
> needed, we could steer the behaviour only
> > using the backpressure information.
> >
> > On the other hand, in case of b), "isProcessingBacklog" information might
> be helpful, to let Flink know that
> > we can safely decrease the e2e latency/checkpoint interval even if there
> is no backpressure, to use fewer
> > resources (and let the autoscaler scale down the job).
> >
> > Do we want to have both, or only one of those? Do a) and b) complement
> one another? If the job is backpressured,
> > we should follow a) and expose to autoscaler/users information "Hey! I'm
> barely keeping up! I need more resources!".
> > While, when there is no backpressure and latency doesn't matter
> (isProcessingBacklog=true), we can limit the resource
> > usage
>
> After thinking this over:
> - the case that we don't have "isProcessingBacklog" information, but the
> source operator is
>   back pressured, must be intermittent. Either back pressure will go away,
> or shortly we should
>   reach the "isProcessingBacklog" state anyway
> - and even if we implement some back pressure detecting algorithm to switch
> the runtime into the
>   "high latency mode", we can always report that as "isProcessingBacklog"
> anyway, as runtime should
>    react the same way in both cases (backpressure and "isProcessingBacklog"
> states).
>
> ===============
>
> With a common understanding of the final solution that we want to have in
> the future, I'm pretty much fine with the current
> FLIP-309 proposal, with a couple of remarks:
> 1. Could you include in FLIP-309 the long-term solution as we have
> discussed?
>         a) Would be nice to have some diagram showing how the
> "isProcessingBacklog" information would be travelling,
>              being aggregated and what will be done with that information.
> (from SourceReader/SplitEnumerator to some
>             "component" aggregating it, and then ... ?)
> 2. For me "processing backlog" doesn't necessarily equate to "backpressure"
> (HybridSource can be
>     both NOT backpressured and processing backlog at the same time). If you
> think the same way, can you include that
>     definition of "processing backlog" in the FLIP including its relation
> to the backpressure state? If not, we need to align
>     on that definition first :)
>
> Also I'm missing a big picture description that would show what you are
> trying to achieve and what's the overarching vision
> behind all of the current and future FLIPs that you are planning in this
> area (FLIP-309, FLIP-325, FLIP-327, FLIP-331, ...?).
> Or was it described somewhere and I've missed it?
>
> Best,
> Piotrek
>
>
>
> On Thu, Jul 6, 2023 at 06:25 Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > I am sorry if you feel unhappy or upset with us for not following/fixing
> > your proposal. It is not my intention to give you this feeling. After
> all,
> > we are all trying to make Flink better, to support more use-cases with the
> > most maintainable code. I hope you understand that just like you, I have
> > also been doing my best to think through various design options and
> taking
> > time to evaluate the pros/cons. Eventually, we probably still need to
> reach
> > consensus by clearly listing and comparing the objective pros/cons of
> > different proposals and identifying the best choice.
> >
> > Regarding your concern (or frustration) that we are always finding issues
> > in your proposal, I would say it is normal (and probably necessary) for
> > developers to find pros/cons in each other's solutions, so that we can
> > eventually pick the right one. I will appreciate anyone who can correctly
> > pinpoint the concrete issue in my proposal so that I can improve it or
> > choose an alternative solution.
> >
> > Regarding your concern that we are not spending enough effort to find
> > solutions and that the problem in your solution can be solved in a
> minute,
> > I would like to say that is not true. For each of your previous
> proposals,
> > I typically spent 1+ hours thinking through your proposal to understand
> > whether it works and why it does not work, and another 1+ hour to write
> > down the details and explain why it does not work. And I have had a
> variety
> > of offline discussions with my colleagues discussing various proposals
> > (including yours) with 6+ hours in total. Maybe I am not capable enough
> to
> > fix those issues in one minute or so. If you think your proposal can
> be
> > easily fixed in one minute or so, I would really appreciate it if you can
> > think through your proposal and fix it in the first place :)
> >
> > For your information, I have had several long discussions with my
> > colleagues at Alibaba and also Becket on this FLIP. We have seriously
> > considered your proposals and discussed in detail what the pros/cons are
> > and whether we can improve these solutions. The initial version of this
> > FLIP (which allowed the source operator to specify checkpoint intervals)
> > did not get enough support due to concerns of not being generic (i.e.
> > users need to specify checkpoint intervals on a per-source basis). It was
> > only after I updated the FLIP to use the job-level
> > execution.checkpointing.interval-during-backlog that they agreed to give
> +1
> > to the FLIP. What I want to tell you is that your suggestions have been
> > taken seriously, and the quality of the FLIP has been taken seriously
> > by all those who have voted. As a result of taking your suggestion
> > seriously and trying to find improvements, we updated the FLIP to use
> > isProcessingBacklog.
> >
> > I am wondering, do you think it will be useful to discuss face-to-face
> via
> > video conference call? It is not just between you and me. We can invite
> the
> > developers who are interested to join and help with the discussion. That
> > might improve communication efficiency and help us understand each other
> > better :)
> >
> > I am writing this long email to hopefully get your understanding. I care
> > much more about the quality of the eventual solution rather than who
> > proposed the solution. Please bear with me and see my comments inline,
> with
> > an explanation of the pros/cons of these proposals.
> >
> >
> > On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > wrote:
> >
> > > Hi Guys,
> > >
> > > I would like to ask you again, to spend a bit more effort on trying to
> > find
> > > solutions, not just pointing out problems. For 1.5 months,
> > > the discussion doesn't go in circles, but I'm suggesting a solution, you
> > are
> > > trying to undermine it with some arguments, I'm coming
> > > back with a fix, often an extremely easy one, only for you to try to
> find
> > > yet another "issue". It doesn't bode well, if you are finding
> > > a "problem" that can be solved with a minute or so of thinking or even
> > has
> > > already been solved.
> > >
> > > I have provided you so far with at least three distinct solutions that
> > > could address your exact target use-case. Two [1][2] generic
> > > enough to be probably good enough for the foreseeable future, one
> > > intermediate and not generic [3] but which wouldn't
> > > require @Public API changes or some custom hidden interfaces.
> >
> >
> > > All in all:
> > > - [1] with added metric hints like "isProcessingBacklog" solves your
> > target
> > > use case pretty well. Downside is having to improve
> > >   how JM is collecting/aggregating metrics
> > >
> >
> > Here is my analysis of this proposal compared to the current approach in
> > the FLIP-309.
> >
> > pros:
> > - No need to add the public API
> > SplitEnumeratorContext#setIsProcessingBacklog.
> > cons:
> > - Need to add a public API that subclasses of SourceReader can use to
> > specify their isProcessingBacklog metric value.
> > - Source Coordinator needs to periodically pull the isProcessingBacklog
> > metrics from all TMs throughout the job execution.
> >
> > Here is why I think the cons outweigh the pros:
> > 1) JM needs to collect/aggregate metrics with extra runtime overhead,
> which
> > is not necessary for the target use-case with the push-based approach in
> > FLIP-309.
> > 2) For the target use-case, it is simpler and more intuitive for source
> > operators (e.g. HybridSource, MySQL CDC source) to be able to set their
> > isProcessingBacklog status in the SplitEnumerator. This is because the
> > switch between bounded/unbounded stages happens in their SplitEnumerator.
> >
> >
> >
> > > - [2] is basically an equivalent of [1], replacing metrics with events.
> > It
> > > also is a superset of your proposal
> > >
> >
> > Previously, I thought you meant to add a generic logic in
> SourceReaderBase
> > to read existing metrics (e.g. backpressure) and emit the
> > IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
> > misunderstood your suggestions.
> >
> > After double-checking your previous suggestion, I am wondering if you are
> > OK with the following approach:
> >
> > - Add a job-level config execution.checkpointing.interval-during-backlog
> > - Add an API SourceReaderContext#setProcessingBacklog(boolean
> > isProcessingBacklog).
> > - When this API is invoked, it internally sends an
> > internal SourceReaderBacklogEvent to SourceCoordinator.
> > - SourceCoordinator should keep track of the latest isProcessingBacklog
> > status from all its subtasks. And for now, we will hardcode the logic
> such
> > that if any source reader says it is under backlog, then
> > execution.checkpointing.interval-during-backlog is used.
> >
> > This approach looks good to me as it can achieve the same performance
> with
> > the same number of public APIs for the target use-case. And I suppose in
> > the future we might be able to re-use this API for source reader to set
> its
> > backlog status based on its backpressure metrics, which could be an extra
> > advantage over the current approach.
> >
> > Do you think we can agree to adopt the approach described above?
> >
> >
> > > - [3] yes, it's hacky, but it's a solution that could be thrown away once
> > > we implement [1] or [2] . The only real theoretical
> > >   downside is that it cannot control the long checkpoint exactly (short
> > > checkpoint interval has to be a divisor of the long checkpoint
> > >   interval), but I simply can not imagine a practical use where that
> would
> > > be a blocker for a user. Please..., someone wanting to set
> > >   short checkpoint interval to 3min and long to 7 minutes, and that
> > someone
> > > can not accept the long interval to be 9 minutes?
> > >   And that's even ignoring the fact that if someone has an issue with
> > the 3
> > > minutes checkpoint interval, I can hardly think that merely
> > >   doubling the interval to 7 minutes would significantly solve any
> > problem
> > > for that user.
> > >
> >
> > Yes, this is a fabricated example that shows
> > execution.checkpointing.interval-during-backlog might not be accurately
> > enforced with this option. I think you are probably right that it might
> not
> > matter that much. I just think we should try our best to make Flink
> public
> > API's semantics (including configuration) clear, simple, and enforceable.
> > If we can make the user-facing configuration enforceable at the cost of
> an
> > extra developer facing API (i.e. setProcessingBacklog(...)), I would
> prefer
> > to do this.
> >
> > It seems that we both agree that option [2] is better than [3]. I will
> skip
> > the further comments for this option and we can probably focus on
> > option [2] :)
> >
> >
> > > Dong a long time ago you wrote:
> > > > Sure. Then let's decide the final solution first.
> > >
> > > Have you thought about that? Maybe I'm wrong but I don't remember you
> > > describing in any of your proposals how they could be
> > > extended in the future, to cover more generic cases. Regardless if you
> > > either don't believe in the generic solution or struggle to
> > >
> >
> > Yes, I have thought about the plan to extend the current FLIP to support
> > metrics (e.g. backpressure) based solution you described earlier.
> Actually,
> > I mentioned multiple times in the earlier email that your suggestion of
> > using metrics is valuable and I will do this in a follow-up FLIP.
> >
> > Here are my comments from the previous email:
> > - See "I will add follow-up FLIPs to make use of the event-time metrics
> and
> > backpressure metrics" from Jul 3, 2023, 6:39 PM
> > - See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
> > - See "we will create a followup FLIP (probably in FLIP-328)" from Jun
> 29,
> > 2023, 11:01 AM
> >
> > Frankly speaking, I think the idea around using the backpressure metrics
> > still needs a bit more thinking before we can propose a FLIP. But I am
> > pretty sure we can make use of the watermark/event-time to determine the
> > backlog status.
> >
> > > grasp it, if you can come back with something that can be easily extended
> > > in the future, up to a point where one could implement
> > > something similar to this backpressure detecting algorithm that I
> > mentioned
> > > many times before, I would be happy to discuss and
> > > support it.
> > >
> >
> > Here is my idea of extending the source reader to support
> event-time-based
> > backlog detecting algorithms:
> >
> > - Add a job-level config such as watermark-lag-threshold-for-backlog. If
> > any source reader determines that the event-timestamp is available and
> the
> > system-time - watermark exceeds this threshold, then the source reader
> > considers its isProcessingBacklog=true.
> > - The source reader can send an event to the source coordinator. Note
> that
> > this might be doable in the SourceReaderBase without adding any public
> API
> > which the concrete SourceReader subclass needs to explicitly invoke.
> > - And in the future if FLIP-325 is accepted, instead of sending the
> > event to SourceCoordinator and letting SourceCoordinator inform the
> checkpoint
> > coordinator, the source reader might just emit the information as part of
> > the RecordAttributes and let the two-phase commit sink inform the
> > checkpoint coordinator.
> >
> > Note that this is a sketch of the idea and it might need further
> > improvement. I just hope you understand that we have thought about this
> > idea and did quite a lot of thinking for these design options. If it is
> OK
> > with you, I hope we can make incremental progress and discuss the
> > metrics-based solution separately in a follow-up FLIP.
> >
> > Last but not least, thanks for taking so much time to leave comments and
> > help us improve the FLIP. Please kindly bear with us in this discussion.
> I
> > am looking forward to collaborating with you to find the best design for
> > the target use-cases.
> >
> > Best,
> > Dong
> >
> >
> > > Hang, about your points 1. and 2., do you think those problems are
> > > insurmountable and blockers for that counter proposal?
> > >
> > > > 1. It is hard to find the error checkpoint.
> > >
> > > No it's not, please take a look at what I exactly proposed and maybe at
> > the
> > > code.
> > >
> > > > 2. (...) The failed checkpoint may make them think the job is
> > unhealthy.
> > >
> > > Please read again what I wrote in [3]. I'm mentioning there a solution
> > for
> > > this exact "problem".
> > >
> > > About the necessity of the config value, I'm still not convinced that's
> > > needed from the start, but yes we can add some config option
> > > if you think otherwise. This option, if named properly, could be
> re-used
> > in
> > > the future for different solutions, so that's fine by me.
> > >
> > > Best,
> > > Piotrek
> > >
> > > [1] Introduced in my very first e-mail from 23 May 2023, 16:26, and
> > refined
> > > later with point "2." in my e-mail from 16 June 2023, 17:58
> > > [2] Section "2. ===============" in my e-mail from 30 June 2023, 16:34
> > > [3] Section "3. ===============" in my e-mail from 30 June 2023, 16:34
> > >
> > > All times in CEST.
> > >
> > >
> >
>
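
For readers skimming the thread: the SourceCoordinator logic quoted above
(keep the latest isProcessingBacklog status per subtask, and use
execution.checkpointing.interval-during-backlog if any source reader reports
backlog) might be sketched roughly as follows. This is only an illustration
under assumptions, not Flink code: BacklogStatusTracker and its methods are
hypothetical names, and the regular interval is assumed to come from
execution.checkpointing.interval.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not a Flink class): remembers the latest
 * isProcessingBacklog flag reported by each source subtask and selects
 * the checkpoint interval accordingly.
 */
final class BacklogStatusTracker {
    // Assumed to be read from execution.checkpointing.interval.
    private final Duration regularInterval;
    // Assumed to be read from execution.checkpointing.interval-during-backlog.
    private final Duration intervalDuringBacklog;
    // Latest status per subtask; subtasks that never reported are absent.
    private final Map<Integer, Boolean> latestStatusBySubtask = new HashMap<>();

    BacklogStatusTracker(Duration regularInterval, Duration intervalDuringBacklog) {
        this.regularInterval = regularInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    /** Would be called when a backlog event from a subtask arrives. */
    void onBacklogEvent(int subtaskId, boolean isProcessingBacklog) {
        latestStatusBySubtask.put(subtaskId, isProcessingBacklog);
    }

    /** Hardcoded rule from the thread: any reader under backlog counts. */
    boolean isAnyReaderProcessingBacklog() {
        return latestStatusBySubtask.containsValue(Boolean.TRUE);
    }

    Duration currentCheckpointInterval() {
        return isAnyReaderProcessingBacklog() ? intervalDuringBacklog : regularInterval;
    }
}
```

With a 30 second regular interval and a 10 minute backlog interval, the
tracker returns 10 minutes as long as at least one subtask's latest report is
true, and falls back to 30 seconds once every subtask has reported false.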

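The event-time-based idea quoted above (a reader considers itself to be
processing backlog when event timestamps are available and system-time minus
watermark exceeds watermark-lag-threshold-for-backlog) could look roughly like
the sketch below. The class and method names are hypothetical and chosen only
for illustration; how a real source reader would obtain the watermark and the
configured threshold is not specified in the thread and is left out here.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch (not a Flink class): decides isProcessingBacklog
 * from the lag between system time and the current watermark.
 */
final class WatermarkLagBacklogDetector {
    // Assumed to be read from a config such as watermark-lag-threshold-for-backlog.
    private final long lagThresholdMillis;

    WatermarkLagBacklogDetector(long lagThresholdMillis) {
        if (lagThresholdMillis < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.lagThresholdMillis = lagThresholdMillis;
    }

    /**
     * @param systemTimeMillis current wall-clock time in milliseconds
     * @param watermarkMillis  current watermark, empty if event time is unavailable
     * @return true only if a watermark exists and its lag exceeds the threshold
     */
    boolean isProcessingBacklog(long systemTimeMillis, OptionalLong watermarkMillis) {
        if (!watermarkMillis.isPresent()) {
            return false; // no event-time information, so no backlog decision
        }
        return systemTimeMillis - watermarkMillis.getAsLong() > lagThresholdMillis;
    }
}
```

Returning false when no watermark is available is a design choice of this
sketch; it keeps the detector from switching the job into the high-latency
mode for sources that do not provide event time.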