Hi Arvid,

Thank you for your feedback. It allows me to address the key points you
raised more clearly.

1. On Coarse-Grained Limiting and the Scope of this FLIP
Regarding coarse-grained limiting at the operator level: a similar,
split-level form of limiting is already possible via the
RateLimitedSourceReader wrapper introduced in FLIP-238. The core goal of
this proposal is to solve a problem that remains unaddressed: precise,
record-level rate limiting, which is crucial for sources like Kafka that
emit batches of variable size.
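
For reference, below is a minimal sketch of how a connector can already
opt in to that FLIP-238 style wrapping today. It is written from memory:
the package and class names are the FLIP-238 rate-limit utilities as I
recall them, and the small generic helper class is purely illustrative,
not code from any existing connector.

    import org.apache.flink.api.connector.source.SourceReader;
    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
    import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

    // Illustrative helper, not connector code: wraps an existing reader with
    // the FLIP-238 RateLimitedSourceReader.
    public final class RateLimitedReaderSketch<T, SplitT extends SourceSplit> {

        // Roughly 1000 acquire() calls per second, split across the subtasks.
        private final RateLimiterStrategy strategy = RateLimiterStrategy.perSecond(1000);

        public SourceReader<T, SplitT> wrap(
                SourceReader<T, SplitT> delegate, SourceReaderContext context) {
            // As far as I recall, the wrapper gates each poll as a whole and
            // does not know how many records that poll will emit -- which is
            // exactly the record-level gap this FLIP targets.
            return new RateLimitedSourceReader<>(
                    delegate, strategy.createRateLimiter(context.currentParallelism()));
        }
    }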

2. On Mixin vs. a Unified Base-Class Implementation
I did not opt for a Mixin interface because my primary objective is to
deliver a truly out-of-the-box, universal capability. If we only provide
a Mixin and leave the implementation details to each connector, we cannot
guarantee uniform and reliable behavior for users who configure the
standard rate-limit options. Placing the core logic in SourceReaderBase
is the only way to ensure the universality and consistency of this
capability, which is the very intent of this proposal.
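
To make the trade-off concrete, here is a toy model of the small but
easy-to-get-wrong piece of logic that every connector would otherwise
have to re-implement and keep consistent. The class and method names
below are mine for illustration only, not the FLIP's API; only the
behavior (count after emission, acquire, pause non-blockingly) follows
the proposal.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;

    // Toy model of the record-level check the FLIP centralizes in SourceReaderBase.
    final class PostEmissionThrottle {

        /** Stand-in for the proposed RateLimiter#acquire(int requestSize). */
        interface RecordLimiter {
            CompletionStage<Void> acquire(int requestSize);
        }

        private final RecordLimiter limiter;
        private CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);

        PostEmissionThrottle(RecordLimiter limiter) {
            this.limiter = limiter;
        }

        /** True if the reader may emit now; false means "yield and retry later". */
        boolean mayEmit() {
            return pending.isDone();
        }

        /** Called after recordEmitter.emitRecord(...) with the exact count emitted. */
        void onEmitted(int recordCount) {
            if (recordCount > 0) {
                // Post-facto accounting: charge exactly what was emitted, even if
                // it overshoots the available credit; the returned stage completes
                // once the "time debt" has been paid back.
                pending = limiter.acquire(recordCount).toCompletableFuture();
            }
        }
    }

Centralizing this in SourceReaderBase means every connector gets the same
semantics for free instead of each one re-deriving them.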

3. On API Compatibility
I understand and share your high regard for API stability. However,
after further investigation, there seems to be no good way to introduce
this feature while maintaining full backward compatibility, short of
using reflection. Requiring connectors to adapt therefore appears to be
a necessary trade-off for adding this foundational capability to the
framework.

4. On notifyCheckpointAborted
This callback was added to the proposal based on earlier community
discussions (CC'ing Shengkai), with the goal of handling state cleanup on
checkpoint failure. However, after further consideration of Flink's
failover mechanism, I agree with you that this interface is indeed
unnecessary.

We don't actually depend on this callback for deadlock prevention, as
Flink's core failover mechanism already handles this scenario perfectly.
When a job rolls back, the task rebuild process destroys the old
RateLimiter instance and creates a new one. Any transient state that
might cause a blockage (such as a pending future) is discarded along with
the old instance, and the new one starts with a clean, initial quota
restored from the last successful snapshot.
Therefore, the potential for deadlock is fundamentally resolved by Flink's
core fault tolerance.
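
To illustrate why nothing problematic can survive the rebuild, here is a
deliberately simplified, per-checkpoint limiter in the spirit of
FLIP-238's GatedRateLimiter (a toy sketch, not the actual class). Its
only "blocking" state is an instance field, so once the failed task's
instance is discarded, the replacement starts again from the constructor
with a full quota:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;

    // Toy per-checkpoint limiter used only to illustrate the failover argument.
    final class QuotaPerCheckpointLimiter {

        private final int quotaPerCheckpoint;
        private int remaining;
        private CompletableFuture<Void> gate; // pending future while quota is exhausted

        QuotaPerCheckpointLimiter(int quotaPerCheckpoint) {
            this.quotaPerCheckpoint = quotaPerCheckpoint;
            this.remaining = quotaPerCheckpoint; // a fresh instance always has a full quota
        }

        CompletionStage<Void> acquire(int requestSize) {
            remaining -= requestSize;
            if (remaining > 0) {
                return CompletableFuture.completedFuture(null);
            }
            gate = new CompletableFuture<>(); // caller pauses until the next checkpoint
            return gate;
        }

        void notifyCheckpointComplete(long checkpointId) {
            remaining = quotaPerCheckpoint;
            if (gate != null) {
                gate.complete(null);
                gate = null;
            }
        }

        // Intentionally no notifyCheckpointAborted: on failover the task is
        // rebuilt, this object is dropped together with its pending 'gate', and
        // the new instance starts from the constructor with a full quota again.
    }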

Thank you again for your valuable input and the in-depth discussion.

Best,
Zexian

On Tue, Aug 5, 2025 at 5:49 PM Arvid Heise <ahe...@confluent.io.invalid> wrote:

> Hi Zexian,
>
> the general idea and approach LGTM. A couple of questions:
> * If we don't want to provide RateLimiter on operator level (seems to be
> out of scope for this FLIP), can we still make the RateLimiter a property
> of the Source similar to Watermark strategy and pass it through the
> implementation? I'm guessing that we will then only have very coarse-grain
> control because it's basically implemented on SourceOperator and hence can
> limit batches only. The upside is that we don't need to touch each and
> every connector and get something "for free" that probably works good
> enough for many cases.
> * We could additionally introduce some kind of mixin to the SourceReader
> interface that would be able to push down the RateLimiter into the
> connector. That would enable fine-grain control as you proposed.
> * Note that each and every change to the public APIs will break connectors
> in a way (your migration section is a bit slim on that part). In this case,
> if you add a new overload to the ctor of SourceReaderBase, implementing
> connectors will not run on Flink 2.1- anymore. Unfortunately, mixins have
> the same issue. I have yet to find a good solution to support features
> optionally, such that the source runs fine on older versions (without rate
> limiting) and on newer versions. I guess the only way would be some
> reflection magic and I'd rather have less of that in Flink than more.
> * I'm not sold on the idea that we need to pass notifyCheckpointAborted to
> the RateLimiter for the sources. Can you expand on why this is needed?
> Checkpoint barrier is injected differently into the sources than other
> operators. notifyCheckpointAborted is also not guaranteed to be called, so
> if there is a risk of deadlock, we should find other options.
>
> Best,
>
> Arvid
>
> On Tue, Aug 5, 2025 at 8:48 AM Jiangang Liu <liujiangangp...@gmail.com>
> wrote:
>
> > Thanks for your explanation.
> > 1. It makes sense to me to solve the imbalance in other issues.
> > 2. In fact, we support to adjust the source's qps dynamically in our
> inner
> > flink. We can support this feature later.
> >
> > Best,
> > Jiangang Liu
> >
> > On Tue, Aug 5, 2025 at 2:35 PM Zexian WU <wzx3122351...@gmail.com> wrote:
> >
> > > Hi Jiangang,
> > >
> > > Thank you very much for your valuable feedback and insightful
> questions.
> > > The two scenarios you've distilled from real-world production
> experience
> > > are extremely helpful for improving this feature.
> > >
> > > 1. Regarding the load imbalance issue between source tasks:
> > >
> > > Your point about load imbalance between source tasks potentially
> leading
> > to
> > > a suboptimal rate-limiting effect is absolutely correct. Ideally, a
> > > mechanism that can dynamically allocate the rate based on the number of
> > > splits assigned to each subtask would certainly be a superior solution.
> > >
> > > In fact, in the current design, we've introduced the notifyAddingSplit
> > > interface, which provides the possibility for some level of local
> dynamic
> > > adjustment. However, to implement a globally unified, on-demand dynamic
> > > allocation, it would indeed require introducing a new communication
> > channel
> > > between the SourceReader and the existing Coordinator, which would add
> > > implementation complexity and potential performance overhead.
> > >
> > > Furthermore, a more direct optimization is to have the Enumerator in
> the
> > > Coordinator distribute splits as evenly as possible. I've noticed that
> > the
> > > community is already working on this with the ongoing FLIP-537:
> > Enumerator
> > > with Global Split Assignment Distribution for Balanced Split
> assignment.
> > > Improving the split distribution logic seems like a simpler and more
> > direct
> > > solution to the root problem than introducing a complex coordination
> > > mechanism just for rate limiting. Therefore, at this stage, we have
> opted
> > > for a simpler, self-contained approach as a robust first step.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> > >
> > > 2. Regarding the generic, operator-level configuration:
> > >
> > > Regarding the generic configuration method you mentioned—using a
> > > ConfigOption<Map> to configure QPS for any operator—that is indeed a
> very
> > > flexible and powerful design.
> > >
> > > I'd like to confirm with you: is the main purpose of your proposed
> > solution
> > > also to enable dynamic rate limiting more conveniently (for example,
> > > adjusting the rate at runtime via external signals)? This FLIP
> currently
> > > focuses primarily on the source side, as it addresses the most
> immediate
> > > pain point. Your idea is very insightful, and if our goals are aligned,
> > it
> > > serves as an excellent reference for the future evolution of this
> > feature.
> > >
> > > Thanks again for your insightful contributions; they are crucial for
> > > refining and evolving this feature.
> > >
> > > Best,
> > > Zexian
> > >
> > > > On Mon, Aug 4, 2025 at 11:13 PM Jiangang Liu <liujiangangp...@gmail.com> wrote:
> > >
> > > > Hello, Zexian. This is a great work for job's stability when failover
> > or
> > > > catching the lag. I just have some questions:
> > > >
> > > >    1. For some source tasks, their consuming splits may be more than
> > > >    others. The simple calculation QPS based on parallelism may cause
> > > >    imbalance. Can we support a calculation based on the split number
> > as a
> > > >    common way?
> > > >    2. In our company, we support to config each operator's qps by
> > > >    ConfigOption<Map>. This is convenient if we can get the operatorId
> > > > easily
> > > >    before submitting. But in open source, it may be hard to do so. I
> > > wonder
> > > >    whether there exist similar way so that we do not need to
> implement
> > > each
> > > >    connector.
> > > >
> > > > Best,
> > > > Jiangang Liu
> > > >
> > > > > On Mon, Aug 4, 2025 at 2:35 PM Zexian WU <wzx3122351...@gmail.com> wrote:
> > > >
> > > > > Hi Shengkai,
> > > > >
> > > > > Thank you for your thoughtful feedback and excellent questions.
> These
> > > are
> > > > > all great points. Here are my thoughts:
> > > > >
> > > > > Regarding the requestSize parameter and its behavior: requestSize
> is
> > > not
> > > > a
> > > > > request for permission but a post-facto count of records that have
> > > > already
> > > > > been emitted. If this count (e.g., 3) exceeds the available rate
> > credit
> > > > > (e.g., for 2 records), RateLimiter#acquire will return a
> > > CompletionStage
> > > > > that completes in the future. This creates a non-blocking pause in
> > the
> > > > > SourceReader, allowing it to pay back the "time debt" and ensuring
> > the
> > > > > average rate is maintained.
> > > > >
> > > > > Regarding notifyCheckpointAborted: You are absolutely right; this
> is
> > a
> > > > > critical point for preventing deadlocks. The specific plan is to
> add
> > a
> > > > > notifyCheckpointAborted method to the RateLimiter interface. Then,
> > > within
> > > > > the SourceReader implementation, I will handle the checkpoint abort
> > > > > callback and invoke this new method to reset the rate limiter's
> > > internal
> > > > > state.
> > > > >
> > > > > Regarding configuration and unsupported connectors: You've made an
> > > > > excellent point about this being a connector-specific feature.
> > > > >
> > > > > I agree that scan.rate-limit.record-per-second is a great name,
> and I
> > > > will
> > > > > propose it as a best practice. I plan to use it in our initial
> > > reference
> > > > > implementation, likely for the Kafka connector, and it can serve
> as a
> > > > > reference for other connectors in the future.
> > > > > You are correct that the decision to support this feature and how
> to
> > > name
> > > > > the option lies with each connector. The framework only provides
> the
> > > > > underlying mechanism (the RateLimiter integration). The mechanism
> for
> > > > > providing clear error feedback works perfectly with this model: a
> > > > connector
> > > > > that implements rate limiting will declare the option in its
> > > > > DynamicTableFactory. Connectors that don't support it will not
> > declare
> > > > the
> > > > > option. Consequently, Flink’s built-in validation will
> automatically
> > > > throw
> > > > > a ValidationException for an unknown option, providing the desired
> > user
> > > > > feedback.
> > > > >
> > > > > Best,
> > > > > Zexian
> > > > >
> > > > > > On Mon, Jul 28, 2025 at 2:57 PM Shengkai Fang <fskm...@gmail.com> wrote:
> > > > >
> > > > > > Hi, Zexian.
> > > > > >
> > > > > > Thanks for your FLIP. I think rate limiting is a very important
> feature
> > > for
> > > > > our
> > > > > > users. But I have some questions about the FLIP:
> > > > > >
> > > > > > 1. How do I determine the input parameter `requestSize` for
> > > > > > `RateLimiter#acquire`? If the rate limiter indicates there are 2
> > > > > remaining
> > > > > > requests that can be emitted, but the `requestSize` is 3, what is
> > the
> > > > > > behavior here?
> > > > > > 2. CheckpointListener also has a method named
> > > notifyCheckpointAborted,
> > > > I
> > > > > > think RateLimiter also needs this. If the checkpoint aborts,
> please
> > > > clear
> > > > > > the status of the rate limiter.
> > > > > > 3. I think `scan.rate-limit.record-per-second` is better than
> > > > > > `scan.rate.limit.record-per-second`. It seems only FLIP-27 source
> > > > > supports
> > > > > > rate-limit, it's better we can throw an exception to notify users
> > if
> > > > the
> > > > > > source doesn't support this feature.
> > > > > >
> > > > > > Best,
> > > > > > Shengkai
> > > > > >
> > > > > > > On Mon, Jul 28, 2025 at 11:52 AM Zexian WU <wzx3122351...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Leonard,
> > > > > > >
> > > > > > > Thanks a lot for your support and positive feedback! I'm glad
> to
> > > hear
> > > > > you
> > > > > > > think the design meets the needs of most scenarios.
> > > > > > >
> > > > > > > Indeed, rate limiting is a fundamental and important feature,
> and
> > > I'm
> > > > > > > excited to help fill this gap.
> > > > > > >
> > > > > > > I'm also looking forward to more ideas and potential
> improvements
> > > > from
> > > > > > > other community members.
> > > > > > >
> > > > > > > Thanks again!
> > > > > > >
> > > > > > > Best,
> > > > > > > Zexian
> > > > > > >
> > > > > > >
> > > > > > > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > > > > > > Thanks Zexian for driving this work.
> > > > > > > >
> > > > > > > > Rate limiting is a common requirement, TBH, we should have
> > > > supported
> > > > > it
> > > > > > > in earlier stage, and the proposed design integrating it into
> the
> > > > > source
> > > > > > > operator lifecycle, it is already able to meet the vast
> majority
> > of
> > > > > > > scenarios, looks good from my side.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Leonard
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > > I would like to start a discussion on a new Flink
> Improvement
> > > > > > Proposal
> > > > > > > (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > > > > > > > The full proposal can be found on the wiki:
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > > > > > > > Motivation
> > > > > > > > >
> > > > > > > > > In many production environments, Flink sources read from
> > shared
> > > > > > > external systems (like Kafka, Pulsar, or databases) where
> > resources
> > > > are
> > > > > > > limited. An uncontrolled data ingestion rate can lead to
> critical
> > > > > issues:
> > > > > > > > > Resource Contention: A high-throughput Flink job can
> > saturate a
> > > > > > shared
> > > > > > > Kafka cluster's bandwidth, starving other essential services.
> > > > > > > > > System Instability: Aggressive polling from a database
> (e.g.,
> > > > > MySQL)
> > > > > > > can overwhelm its IOPS, degrading performance for transactional
> > > > > queries.
> > > > > > > > > This proposal aims to provide a built-in, precise, and
> > > > easy-to-use
> > > > > > > rate-limiting mechanism for Flink Sources to ensure system
> > > stability
> > > > > and
> > > > > > > fair resource sharing.
> > > > > > > > > Proposed Solution
> > > > > > > > >
> > > > > > > > > The core idea is to integrate a flexible, record-based
> > > > > rate-limiting
> > > > > > > mechanism directly into the SourceReaderBase, making it
> available
> > > to
> > > > > all
> > > > > > > connectors built on the new source interface.
> > > > > > > > > Key Changes:
> > > > > > > > > Seamless Integration via SourceReaderBase:
> > > > > > > > > New constructors accepting a RateLimiterStrategy will be
> > added
> > > > > > > directly to SourceReaderBase. This allows any connector built
> on
> > > the
> > > > > > modern
> > > > > > > source interface (like Kafka or Pulsar) to enable rate limiting
> > > with
> > > > > > > minimal code changes.
> > > > > > > > > Accurate, Post-Emission Throttling:
> > > > > > > > > To support sources with unpredictable batch sizes (e.g.,
> > > Kafka),
> > > > > rate
> > > > > > > limiting is applied after records are emitted. The reader
> counts
> > > the
> > > > > > > records after each recordEmitter.emitRecord method call, and
> > > > > > > only then consults the RateLimiter. This ensures
> > > > > throttling
> > > > > > is
> > > > > > > based on the precise number of records processed.
> > > > > > > > > Fully Non-Blocking Operation:
> > > > > > > > > The entire mechanism is asynchronous and non-blocking. If a
> > > rate
> > > > > > limit
> > > > > > > is hit, the reader pauses by returning
> > InputStatus.MORE_AVAILABLE.
> > > > This
> > > > > > > yields control to the Flink task's event loop without blocking
> > the
> > > > > > > processing thread, ensuring that critical operations like
> > > > checkpointing
> > > > > > are
> > > > > > > never delayed.
> > > > > > > > > Unified Configuration via SQL/Table API:
> > > > > > > > > Users can configure rate limits consistently across
> different
> > > > > sources
> > > > > > > using a simple SQL table option, such as WITH
> ('scan.rate.limit'
> > =
> > > > > > '1000').
> > > > > > > This provides a unified and user-friendly experience for
> pipeline
> > > > > tuning.
> > > > > > > > > The design is backward-compatible, and existing custom
> > sources
> > > > will
> > > > > > > continue to work without any modification.
> > > > > > > > > I believe this feature will be a valuable addition for
> Flink
> > > > users
> > > > > > > operating in complex, multi-tenant environments. I'm looking
> > > forward
> > > > to
> > > > > > > your feedback, suggestions, and any potential concerns you
> might
> > > > have.
> > > > > > > > > Thanks,
> > > > > > > > > Zexian Wu
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
