Hey,

If you want to unsubscribe from u...@flink.apache.org, send a message with any content to user-unsubscr...@flink.apache.org.
If you want to unsubscribe from dev@flink.apache.org, send a message with any content to dev-unsubscr...@flink.apache.org. After unsubscribing you will no longer receive mail from the list.

Best,
Zexian

魏宪党 <1210798...@qq.com.invalid> wrote on Wed, Aug 6, 2025 at 09:17:

Unsubscribe

魏宪党
1210798...@qq.com

Original message
From: Arvid Heise <ahe...@confluent.io.invalid>
Sent: Tue, Aug 5, 2025 at 17:48
To: dev <dev@flink.apache.org>
Subject: Re: Re: [DISCUSS] FLIP-535: Introduce RateLimiter to Source

Hi Zexian,

The general idea and approach LGTM. A couple of questions:

* If we don't want to provide the RateLimiter at the operator level (which seems to be out of scope for this FLIP), can we still make the RateLimiter a property of the Source, similar to the watermark strategy, and pass it through the implementation? I'm guessing that we would then only have very coarse-grained control, because it is basically implemented on the SourceOperator and hence can limit batches only. The upside is that we don't need to touch each and every connector, and we get something "for free" that probably works well enough for many cases.
* We could additionally introduce some kind of mixin to the SourceReader interface that would be able to push the RateLimiter down into the connector. That would enable the fine-grained control you proposed (a rough sketch of such a mixin follows below).
* Note that each and every change to the public APIs will break connectors in some way (your migration section is a bit slim on that part). In this case, if you add a new overload to the constructor of SourceReaderBase, implementing connectors will no longer run on Flink 2.1 or earlier. Unfortunately, mixins have the same issue. I have yet to find a good solution for supporting features optionally, such that a source runs fine on older versions (without rate limiting) as well as on newer versions. I guess the only way would be some reflection magic, and I'd rather have less of that in Flink than more.
* I'm not sold on the idea that we need to pass notifyCheckpointAborted to the RateLimiter for sources. Can you expand on why this is needed? The checkpoint barrier is injected into sources differently than into other operators. notifyCheckpointAborted is also not guaranteed to be called, so if there is a risk of deadlock, we should find other options.

Best,
Arvid
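To make the mixin idea concrete, one possible shape for such an optional interface is sketched below. Neither interface exists in Flink today; the names are purely illustrative, and the nested RateLimiter is only a minimal stand-in for the interface proposed in FLIP-535.

    import java.util.concurrent.CompletionStage;

    /** Hypothetical mixin sketch: a SourceReader implementing this would let the
     *  runtime push a configured rate limiter down into the connector. */
    public interface RateLimiterAware {

        /** Minimal stand-in for the RateLimiter shape proposed in FLIP-535. */
        interface RateLimiter {
            /** Charge {@code requestSize} already-emitted records; the returned stage
             *  completes once emission may resume. */
            CompletionStage<Void> acquire(int requestSize);
        }

        /** Invoked by the runtime before polling starts, if a rate limit is configured
         *  for this source. Readers that never receive a limiter apply no throttling. */
        void setRateLimiter(RateLimiter rateLimiter);
    }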
On Tue, Aug 5, 2025 at 8:48 AM Jiangang Liu <liujiangangp...@gmail.com> wrote:

Thanks for your explanation.

1. It makes sense to me to solve the imbalance in other issues.
2. In fact, we support adjusting the source's QPS dynamically in our internal Flink. We can support this feature later.

Best,
Jiangang Liu

Zexian WU <wzx3122351...@gmail.com> wrote on Tue, Aug 5, 2025 at 14:35:

Hi Jiangang,

Thank you very much for your valuable feedback and insightful questions. The two scenarios you've distilled from real-world production experience are extremely helpful for improving this feature.

1. Regarding the load imbalance issue between source tasks:

Your point that load imbalance between source tasks can lead to a suboptimal rate-limiting effect is absolutely correct. Ideally, a mechanism that dynamically allocates the rate based on the number of splits assigned to each subtask would certainly be a superior solution.

In fact, the current design already introduces the notifyAddingSplit interface, which allows some degree of local dynamic adjustment. However, implementing a globally unified, on-demand dynamic allocation would require a new communication channel between the SourceReader and the existing Coordinator, which would add implementation complexity and potential performance overhead.

Furthermore, a more direct optimization is to have the Enumerator in the Coordinator distribute splits as evenly as possible. I've noticed that the community is already working on this in the ongoing FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split Assignment. Improving the split distribution logic seems like a simpler and more direct solution to the root problem than introducing a complex coordination mechanism just for rate limiting. Therefore, at this stage, we have opted for a simpler, self-contained approach as a robust first step.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

2. Regarding the generic, operator-level configuration:

The generic configuration method you mentioned, using a ConfigOption<Map> to configure QPS for any operator, is indeed a very flexible and powerful design.

I'd like to confirm one thing with you: is the main purpose of your proposed solution also to enable dynamic rate limiting more conveniently (for example, adjusting the rate at runtime via external signals)? This FLIP currently focuses primarily on the source side, as that addresses the most immediate pain point. Your idea is very insightful, and if our goals are aligned, it serves as an excellent reference for the future evolution of this feature.

Thanks again for your insightful contributions; they are crucial for refining and evolving this feature.

Best,
Zexian
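To spell out the two allocation schemes being compared here, the sketch below contrasts the simple per-parallelism share with a split-weighted share. It is purely illustrative; the FLIP itself only proposes the notifyAddingSplit hook, and these helper names are hypothetical.

    /** Illustrative comparison of per-subtask rate allocation strategies. */
    public final class LocalRateShare {

        /** Simple scheme discussed in the thread: every subtask gets the same slice,
         *  regardless of how many splits it actually owns. */
        public static double perParallelismShare(double totalRecordsPerSecond, int parallelism) {
            return totalRecordsPerSecond / parallelism;
        }

        /** Split-weighted scheme: a subtask owning more splits receives a proportionally
         *  larger slice. This needs the global split count, which is exactly the extra
         *  Coordinator communication the reply above wants to avoid for now. */
        public static double splitWeightedShare(
                double totalRecordsPerSecond, int localSplits, int totalSplits) {
            return totalSplits == 0 ? 0.0 : totalRecordsPerSecond * localSplits / totalSplits;
        }

        private LocalRateShare() {}
    }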
Jiangang Liu <liujiangangp...@gmail.com> wrote on Mon, Aug 4, 2025 at 23:13:

Hello, Zexian. This is great work for job stability during failover or when catching up on lag. I just have some questions:

1. Some source tasks may consume more splits than others, so simply deriving each task's QPS from the parallelism may cause imbalance. Can we support a calculation based on the split count as a common approach?
2. In our company, we support configuring each operator's QPS via a ConfigOption<Map>. This is convenient when the operatorId can be obtained easily before submitting, but in open source that may be hard to do. I wonder whether there is a similar way, so that we do not need to implement this in each connector.

Best,
Jiangang Liu

Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Aug 4, 2025 at 14:35:

Hi Shengkai,

Thank you for your thoughtful feedback and excellent questions. These are all great points. Here are my thoughts:

Regarding the requestSize parameter and its behavior: requestSize is not a request for permission but a post-facto count of records that have already been emitted. If this count (e.g., 3) exceeds the available rate credit (e.g., enough for 2 records), RateLimiter#acquire will return a CompletionStage that completes in the future. This creates a non-blocking pause in the SourceReader, allowing it to pay back the "time debt" and ensuring that the average rate is maintained.

Regarding notifyCheckpointAborted: You are absolutely right; this is a critical point for preventing deadlocks. The specific plan is to add a notifyCheckpointAborted method to the RateLimiter interface. Then, within the SourceReader implementation, I will handle the checkpoint-abort callback and invoke this new method to reset the rate limiter's internal state.

Regarding configuration and unsupported connectors: You've made an excellent point about this being a connector-specific feature.

I agree that scan.rate-limit.record-per-second is a great name, and I will propose it as a best practice. I plan to use it in our initial reference implementation, likely for the Kafka connector, and it can serve as a reference for other connectors in the future.

You are correct that the decision to support this feature, and how to name the option, lies with each connector. The framework only provides the underlying mechanism (the RateLimiter integration). The mechanism for providing clear error feedback works perfectly with this model: a connector that implements rate limiting will declare the option in its DynamicTableFactory, while connectors that don't support it will not declare the option. Consequently, Flink's built-in validation will automatically throw a ValidationException for an unknown option, providing the desired user feedback.

Best,
Zexian
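A minimal, self-contained sketch of the post-facto accounting described in the reply above: records are emitted first, the emitted count is then charged via acquire(requestSize), and if that count exceeds the remaining credit, the returned stage completes only after the time debt has been paid back. The class name, the token-bucket details, and the notifyCheckpointAborted behavior are illustrative assumptions, not the FLIP's reference implementation.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;

    /** Illustrative post-emission limiter; not part of Flink or the FLIP's code. */
    public class PostEmissionRateLimiter {

        private final double recordsPerSecond;
        private double credit;                 // remaining budget; goes negative after a large batch
        private long lastRefillNanos = System.nanoTime();

        public PostEmissionRateLimiter(double recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
            this.credit = recordsPerSecond;    // allow roughly one second of burst initially
        }

        /** Charge records that were already emitted; completes immediately while credit remains. */
        public synchronized CompletionStage<Void> acquire(int requestSize) {
            refill();
            credit -= requestSize;
            if (credit >= 0) {
                return CompletableFuture.completedFuture(null);
            }
            // Over budget (e.g. credit covered 2 records but 3 were emitted): complete the
            // stage only after the deficit has been earned back at the configured rate.
            long delayMillis = (long) Math.ceil(-credit * 1000.0 / recordsPerSecond);
            CompletableFuture<Void> resume = new CompletableFuture<>();
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)
                    .execute(() -> resume.complete(null));
            return resume;
        }

        /** Assumed abort handling: drop any accumulated debt so the reader cannot stall. */
        public synchronized void notifyCheckpointAborted(long checkpointId) {
            credit = Math.max(credit, 0);
        }

        private void refill() {
            long now = System.nanoTime();
            credit = Math.min(recordsPerSecond,
                    credit + (now - lastRefillNanos) / 1e9 * recordsPerSecond);
            lastRefillNanos = now;
        }
    }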
Shengkai Fang <fskm...@gmail.com> wrote on Mon, Jul 28, 2025 at 14:57:

Hi, Zexian.

Thanks for your FLIP. I think rate limiting is a very important feature for our users, but I have some questions about the FLIP:

1. How do I determine the input parameter `requestSize` for `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining requests that can be emitted, but the `requestSize` is 3, what is the behavior here?
2. CheckpointListener also has a method named notifyCheckpointAborted; I think RateLimiter needs this as well. If the checkpoint aborts, please clear the state of the rate limiter.
3. I think `scan.rate-limit.record-per-second` is better than `scan.rate.limit.record-per-second`. Also, since it seems only FLIP-27 sources support rate limiting, it would be better to throw an exception to notify users if the source doesn't support this feature.

Best,
Shengkai
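For the option naming and validation behavior discussed above, a connector that opts in might declare the option roughly as follows and return it from its factory's optionalOptions(); connectors that never declare it reject it through Flink's standard unknown-option validation. The constant name and description are illustrative.

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    /** Illustrative declaration of the proposed per-connector rate-limit option. */
    public final class RateLimitOptions {

        public static final ConfigOption<Long> SCAN_RATE_LIMIT_RECORD_PER_SECOND =
                ConfigOptions.key("scan.rate-limit.record-per-second")
                        .longType()
                        .noDefaultValue()
                        .withDescription(
                                "Maximum number of records per second emitted by this source. "
                                        + "No throttling is applied when the option is absent.");

        private RateLimitOptions() {}
    }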
Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Jul 28, 2025 at 11:52:

Hi Leonard,

Thanks a lot for your support and positive feedback! I'm glad to hear you think the design meets the needs of most scenarios.

Indeed, rate limiting is a fundamental and important feature, and I'm excited to help fill this gap.

I'm also looking forward to more ideas and potential improvements from other community members.

Thanks again!

Best,
Zexian

On 2025/07/24 12:19:39 Leonard Xu wrote:

Thanks Zexian for driving this work.

Rate limiting is a common requirement; TBH, we should have supported it at an earlier stage. The proposed design integrates it into the source operator lifecycle and is already able to meet the vast majority of scenarios, so it looks good from my side.

Best,
Leonard

On Jul 18, 2025 at 12:01, Zexian WU <wz...@gmail.com> wrote:

Hi everyone,

I would like to start a discussion on a new Flink Improvement Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source. The full proposal can be found on the wiki:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

Motivation

In many production environments, Flink sources read from shared external systems (like Kafka, Pulsar, or databases) where resources are limited. An uncontrolled data ingestion rate can lead to critical issues:

Resource Contention: A high-throughput Flink job can saturate a shared Kafka cluster's bandwidth, starving other essential services.

System Instability: Aggressive polling from a database (e.g., MySQL) can overwhelm its IOPS, degrading performance for transactional queries.

This proposal aims to provide a built-in, precise, and easy-to-use rate-limiting mechanism for Flink Sources to ensure system stability and fair resource sharing.

Proposed Solution

The core idea is to integrate a flexible, record-based rate-limiting mechanism directly into SourceReaderBase, making it available to all connectors built on the new source interface.

Key Changes:

Seamless Integration via SourceReaderBase: New constructors accepting a RateLimiterStrategy will be added directly to SourceReaderBase. This allows any connector built on the modern source interface (like Kafka or Pulsar) to enable rate limiting with minimal code changes.

Accurate, Post-Emission Throttling: To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting is applied after records are emitted. The reader counts the records emitted by each recordEmitter.emitRecord call and only then consults the RateLimiter. This ensures throttling is based on the precise number of records processed.

Fully Non-Blocking Operation: The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control to the Flink task's event loop without blocking the processing thread, ensuring that critical operations like checkpointing are never delayed.

Unified Configuration via SQL/Table API: Users can configure rate limits consistently across different sources using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This provides a unified and user-friendly experience for pipeline tuning.

The design is backward-compatible, and existing custom sources will continue to work without any modification.

I believe this feature will be a valuable addition for Flink users operating in complex, multi-tenant environments. I'm looking forward to your feedback, suggestions, and any potential concerns you might have.

Thanks,
Zexian Wu
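To illustrate the SQL-level configuration from the proposal, the snippet below creates a rate-limited table using the option key from the announcement's example. Whether a given connector accepts the option, and its final name ('scan.rate.limit' vs. 'scan.rate-limit.record-per-second'), depends on the connector adopting the FLIP, so treat this purely as a sketch.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class RateLimitedSourceExample {

        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Hypothetical Kafka-backed table capped at roughly 1000 records per second.
            env.executeSql(
                    "CREATE TABLE orders (\n"
                            + "  order_id STRING,\n"
                            + "  amount DOUBLE\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'orders',\n"
                            + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'json',\n"
                            + "  'scan.rate.limit' = '1000'\n"
                            + ")");

            env.executeSql("SELECT * FROM orders").print();
        }
    }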