Hi Piotr,

I am sorry if you feel unhappy or upset with us for not following/fixing
your proposal. It is not my intention to give you this feeling. After all,
we are all trying to make Flink better, to support more use-cases with the
most maintainable code. I hope you understand that just like you, I have
also been doing my best to think through various design options and taking
time to evaluate the pros/cons. Eventually, we probably still need to reach
consensus by clearly listing and comparing the objective pros/cons of
different proposals and identifying the best choice.

Regarding your concern (or frustration) that we are always finding issues
in your proposal, I would say it is normal (and probably necessary) for
developers to find pros/cons in each other's solutions, so that we can
eventually pick the right one. I will appreciate anyone who can correctly
pinpoint the concrete issue in my proposal so that I can improve it or
choose an alternative solution.

Regarding your concern that we are not spending enough effort to find
solutions and that the problem in your solution can be solved in a minute,
I would like to say that is not true. For each of your previous proposals,
I typically spent 1+ hours thinking through your proposal to understand
whether it works and why it does not work, and another 1+ hour to write
down the details and explain why it does not work. And I have had a variety
of offline discussions with my colleagues discussing various proposals
(including yours) with 6+ hours in total. Maybe I am not capable enough to
fix those issues in a minute or so. If you think your proposal can be
easily fixed in a minute or so, I would really appreciate it if you could
think through your proposal and fix it in the first place :)

For your information, I have had several long discussions with my
colleagues at Alibaba and also Becket on this FLIP. We have seriously
considered your proposals and discussed in detail what the pros/cons are
and whether we can improve these solutions. The initial version of this
FLIP (which allowed the source operator to specify checkpoint intervals)
did not get enough support due to concerns about not being generic (i.e.
users need to specify checkpoint intervals on a per-source basis). It was
only after I updated the FLIP to use the job-level
execution.checkpointing.interval-during-backlog that they agreed to give +1
to the FLIP. What I want to tell you is that your suggestions have been
taken seriously, and the quality of the FLIP has been taken seriously
by all those who have voted. As a result of taking your suggestion
seriously and trying to find improvements, we updated the FLIP to use
isProcessingBacklog.

I am wondering, do you think it will be useful to discuss face-to-face via
video conference call? It is not just between you and me. We can invite the
developers who are interested to join and help with the discussion. That
might improve communication efficiency and help us understand each other
better :)

I am writing this long email to hopefully get your understanding. I care
much more about the quality of the eventual solution rather than who
proposed the solution. Please bear with me and see my comments inline, with
an explanation of the pros/cons of these proposals.


On Wed, Jul 5, 2023 at 11:06 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Guys,
>
> I would like to ask you again, to spend a bit more effort on trying to find
> solutions, not just pointing out problems. For 1.5 months,
> the discussion doesn't go in circle, but I'm suggesting a solution, you are
> trying to undermine it with some arguments, I'm coming
> back with a fix, often an extremely easy one, only for you to try to find
> yet another "issue". It doesn't bode well, if you are finding
> a "problem" that can be solved with a minute or so of thinking or even has
> already been solved.
>
> I have provided you so far with at least three distinct solutions that
> could address your exact target use-case. Two [1][2] generic
> enough to be probably good enough for the foreseeable future, one
> intermediate and not generic [3] but which wouldn't
> require @Public API changes or some custom hidden interfaces.


> All in all:
> - [1] with added metric hints like "isProcessingBacklog" solves your target
> use case pretty well. Downside is having to improve
>   how JM is collecting/aggregating metrics
>

Here is my analysis of this proposal compared to the current approach in
FLIP-309.

pros:
- No need to add the public API
SplitEnumeratorContext#setIsProcessingBacklog.
cons:
- Need to add a public API that subclasses of SourceReader can use to
specify their isProcessingBacklog metric value.
- Source Coordinator needs to periodically pull the isProcessingBacklog
metrics from all TMs throughout the job execution.

Here is why I think the cons outweigh the pros:
1) JM needs to collect/aggregate metrics with extra runtime overhead, which
is not necessary for the target use-case with the push-based approach in
FLIP-309.
2) For the target use-case, it is simpler and more intuitive for source
operators (e.g. HybridSource, MySQL CDC source) to be able to set their
isProcessingBacklog status in the SplitEnumerator. This is because the
switch between bounded/unbounded stages happens in their SplitEnumerator.



> - [2] is basically an equivalent of [1], replacing metrics with events. It
> also is a superset of your proposal
>

Previously, I thought you meant to add generic logic in SourceReaderBase
to read existing metrics (e.g. backpressure) and emit the
IsProcessingBacklogEvent to SourceCoordinator. I am sorry if I have
misunderstood your suggestions.

After double-checking your previous suggestion, I am wondering if you are
OK with the following approach:

- Add a job-level config execution.checkpointing.interval-during-backlog
- Add an API SourceReaderContext#setProcessingBacklog(boolean
isProcessingBacklog).
- When this API is invoked, it internally sends an
internal SourceReaderBacklogEvent to SourceCoordinator.
- SourceCoordinator should keep track of the latest isProcessingBacklog
status from all its subtasks. And for now, we will hardcode the logic such
that if any source reader says it is under backlog, then
execution.checkpointing.interval-during-backlog is used.
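
A minimal sketch of the coordinator-side bookkeeping described in the list
above, assuming nothing about the real Flink classes: BacklogStatusTracker,
onBacklogEvent and currentCheckpointInterval are hypothetical names used for
illustration only, and the interval values are made up. It only shows the
hardcoded "any reader under backlog" rule.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the SourceCoordinator bookkeeping; not Flink code.
final class BacklogStatusTracker {
    // Latest isProcessingBacklog status reported by each source subtask.
    private final Map<Integer, Boolean> backlogBySubtask = new HashMap<>();
    private final Duration regularInterval;       // execution.checkpointing.interval
    private final Duration intervalDuringBacklog; // ...interval-during-backlog

    BacklogStatusTracker(Duration regularInterval, Duration intervalDuringBacklog) {
        this.regularInterval = regularInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    // Called when a backlog event from a subtask arrives (assumed event shape).
    void onBacklogEvent(int subtaskId, boolean isProcessingBacklog) {
        backlogBySubtask.put(subtaskId, isProcessingBacklog);
    }

    // Hardcoded rule: if any reader reports backlog, the job is under backlog.
    boolean isAnyReaderUnderBacklog() {
        return backlogBySubtask.containsValue(Boolean.TRUE);
    }

    // Interval that would be handed to the checkpoint coordinator.
    Duration currentCheckpointInterval() {
        return isAnyReaderUnderBacklog() ? intervalDuringBacklog : regularInterval;
    }
}
```

With two subtasks, the backlog interval stays in effect until both readers
have reported isProcessingBacklog=false.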

This approach looks good to me as it can achieve the same performance with
the same number of public APIs for the target use-case. And I suppose in
the future we might be able to re-use this API for the source reader to set
its backlog status based on its backpressure metrics, which could be an
extra advantage over the current approach.

Do you think we can agree to adopt the approach described above?


> - [3] yes, it's hacky, but it's a solution that could be thrown away once
> we implement [1] or [2] . The only real theoretical
>   downside is that it cannot control the long checkpoint exactly (short
> checkpoint interval has to be a divisor of the long checkpoint
>   interval, but I simply can not imagine a practical use where that would
> be a blocker for a user. Please..., someone wanting to set
>   short checkpoint interval to 3min and long to 7 minutes, and that someone
> can not accept the long interval to be 9 minutes?
>   And that's even ignoring the fact that if someone has an issue with the 3
> minutes checkpoint interval, I can hardly think that merely
>   doubling the interval to 7 minutes would significantly solve any problem
> for that user.
>

Yes, this is a fabricated example that shows
execution.checkpointing.interval-during-backlog might not be accurately
enforced with this option. I think you are probably right that it might not
matter that much. I just think we should try our best to make Flink public
API's semantics (including configuration) clear, simple, and enforceable.
If we can make the user-facing configuration enforceable at the cost of an
extra developer-facing API (i.e. setProcessingBacklog(...)), I would prefer
to do this.
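
To make the 3-minute / 7-minute example concrete, here is a small worked
sketch. It assumes that under option [3] the effective long interval is the
requested value rounded up to the next multiple of the short interval, which
matches the 9 minutes in the quoted example; the class and method names are
hypothetical.

```java
// Hypothetical helper illustrating the divisor constraint; not Flink code.
final class EffectiveLongInterval {
    // Rounds the requested long interval up to a multiple of the short one.
    static long effectiveLongIntervalMinutes(long shortMinutes, long requestedLongMinutes) {
        if (shortMinutes <= 0 || requestedLongMinutes <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        // Ceiling division: number of short intervals needed to cover the request.
        long multiples = (requestedLongMinutes + shortMinutes - 1) / shortMinutes;
        return multiples * shortMinutes;
    }
}
```

For short=3 and requested long=7 this returns 9, so the configured 7 minutes
would not be enforced exactly; when the requested value is already a
multiple (e.g. short=5, long=10), it is returned unchanged.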

It seems that we both agree that option [2] is better than [3]. I will skip
the further comments for this option and we can probably focus on
option [2] :)


> Dong a long time ago you wrote:
> > Sure. Then let's decide the final solution first.
>
> Have you thought about that? Maybe I'm wrong but I don't remember you
> describing in any of your proposals how they could be
> extended in the future, to cover more generic cases. Regardless if you
> either don't believe in the generic solution or struggle to
>

Yes, I have thought about the plan to extend the current FLIP to support
the metrics-based (e.g. backpressure) solution you described earlier.
Actually, I mentioned multiple times in earlier emails that your suggestion
of using metrics is valuable and I will do this in a follow-up FLIP.

Here are my comments from the previous emails:
- See "I will add follow-up FLIPs to make use of the event-time metrics and
backpressure metrics" from Jul 3, 2023, 6:39 PM
- See "I agree it is valuable" from Jul 1, 2023, 11:00 PM
- See "we will create a followup FLIP (probably in FLIP-328)" from Jun 29,
2023, 11:01 AM

Frankly speaking, I think the idea around using the backpressure metrics
still needs a bit more thinking before we can propose a FLIP. But I am
pretty sure we can make use of the watermark/event-time to determine the
backlog status.

> grasp it, if you can come back with something that can be easily extended
> in the future, up to a point where one could implement
> something similar to this backpressure detecting algorithm that I mentioned
> many times before, I would be happy to discuss and
> support it.
>

Here is my idea of extending the source reader to support event-time-based
backlog detecting algorithms:

- Add a job-level config such as watermark-lag-threshold-for-backlog. If
any source reader determines that the event-timestamp is available and the
system-time - watermark exceeds this threshold, then the source reader
considers its isProcessingBacklog=true.
- The source reader can send an event to the source coordinator. Note that
this might be doable in the SourceReaderBase without adding any public API
which the concrete SourceReader subclass needs to explicitly invoke.
- And in the future, if FLIP-325 is accepted, instead of sending the
event to SourceCoordinator and letting SourceCoordinator inform the
checkpoint coordinator, the source reader might just emit the information
as part of the RecordAttributes and let the two-phase commit sink inform
the checkpoint coordinator.
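
A rough sketch of the watermark-lag check from the first bullet above. The
class name, the NO_WATERMARK sentinel and the method signature are
assumptions made for illustration; the config name
watermark-lag-threshold-for-backlog is only the placeholder suggested in
this email, not an existing Flink option.

```java
import java.time.Duration;

// Hypothetical reader-side check; not part of SourceReaderBase.
final class WatermarkLagBacklogDetector {
    // Assumed sentinel meaning "no event-time watermark is available yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long thresholdMillis;

    WatermarkLagBacklogDetector(Duration watermarkLagThreshold) {
        this.thresholdMillis = watermarkLagThreshold.toMillis();
    }

    // True when (system time - watermark) exceeds the configured threshold.
    boolean isProcessingBacklog(long systemTimeMillis, long watermarkMillis) {
        if (watermarkMillis == NO_WATERMARK) {
            return false; // event time unavailable: do not claim backlog
        }
        return systemTimeMillis - watermarkMillis > thresholdMillis;
    }
}
```

A reader would then report a change of this value to the coordinator, for
example through the event mechanism discussed earlier in this email.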

Note that this is a sketch of the idea and it might need further
improvement. I just hope you understand that we have thought about this
idea and have done quite a lot of thinking on these design options. If it
is OK with you, I hope we can make incremental progress and discuss the
metrics-based solution separately in a follow-up FLIP.

Last but not least, thanks for taking so much time to leave comments and
help us improve the FLIP. Please kindly bear with us in this discussion. I
am looking forward to collaborating with you to find the best design for
the target use-cases.

Best,
Dong


> Hang, about your points 1. and 2., do you think those problems are
> insurmountable and blockers for that counter proposal?
>
> > 1. It is hard to find the error checkpoint.
>
> No it's not, please take a look at what I exactly proposed and maybe at the
> code.
>
> > 2. (...) The failed checkpoint may make them think the job is unhealthy.
>
> Please read again what I wrote in [3]. I'm mentioning there a solution for
> this exact "problem".
>
> About the necessity of the config value, I'm still not convinced that's
> needed from the start, but yes we can add some config option
> if you think otherwise. This option, if named properly, could be re-used in
> the future for different solutions, so that's fine by me.
>
> Best,
> Piotrek
>
> [1] Introduced in my very first e-mail from 23 maj 2023, 16:26, and refined
> later with point "2." in my e-mail from 16 June 2023, 17:58
> [2] Section "2. ===============" in my e-mail from 30 June 2023, 16:34
> [3] Section "3. ===============" in my e-mail from 30 June 2023, 16:34
>
> All times in CEST.
>
>
