Hi everyone,

I would like to start a discussion on a new Flink Improvement Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.

The full proposal can be found on the wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

Motivation
In many production environments, Flink sources read from shared external systems (like Kafka, Pulsar, or databases) where resources are limited. An uncontrolled data ingestion rate can lead to critical issues:

Resource Contention: A high-throughput Flink job can saturate a shared Kafka cluster's bandwidth, starving other essential services.

System Instability: Aggressive polling of a database (e.g., MySQL) can overwhelm its IOPS, degrading performance for transactional queries.

This proposal aims to provide a built-in, precise, and easy-to-use rate-limiting mechanism for Flink Sources to ensure system stability and fair resource sharing.

Proposed Solution

The core idea is to integrate a flexible, record-based rate-limiting mechanism directly into SourceReaderBase, making it available to all connectors built on the new source interface.

Key Changes:

Seamless Integration via SourceReaderBase: New constructors accepting a RateLimiterStrategy will be added directly to SourceReaderBase. This allows any connector built on the modern source interface (like Kafka or Pulsar) to enable rate limiting with minimal code changes.

Accurate, Post-Emission Throttling: To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting is applied after records are emitted. The reader counts the records produced by each recordEmitter.emitRecord call and only then consults the RateLimiter. This ensures throttling is based on the precise number of records processed.

Fully Non-Blocking Operation: The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control to the Flink task's event loop without blocking the processing thread, so throttling does not delay critical operations like checkpointing.

Unified Configuration via SQL/Table API: Users can configure rate limits consistently across different sources using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This provides a unified and user-friendly experience for pipeline tuning.

The design is backward-compatible, and existing custom sources will continue to work without any modification.

I believe this feature will be a valuable addition for Flink users operating in complex, multi-tenant environments.

I'm looking forward to your feedback, suggestions, and any potential concerns you might have.

Thanks,
Zexian Wu
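
For illustration, here is a rough, self-contained sketch of the post-emission, non-blocking flow described under Key Changes. It deliberately does not use the Flink API: Status stands in for InputStatus, and TickRateLimiter, Reader, tick() and emitted() are hypothetical names invented for this sketch, not the classes or signatures proposed in the FLIP. The permit budget and manual refill are also assumptions made to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class PostEmissionThrottleSketch {

    /** Stand-in for Flink's InputStatus (assumption: only two values are modeled). */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** Hypothetical non-blocking limiter: a permit budget that is refilled by tick(). */
    static final class TickRateLimiter {
        private final int permitsPerTick;
        private int available;
        private CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);

        TickRateLimiter(int permitsPerTick) {
            this.permitsPerTick = permitsPerTick;
            this.available = permitsPerTick;
        }

        /** Charges n already-emitted records; the future completes once reading may resume. */
        CompletableFuture<Void> acquire(int n) {
            available -= n;
            if (available <= 0 && pending.isDone()) {
                pending = new CompletableFuture<>();
            }
            return pending;
        }

        /** Refill; a real limiter would be driven by a timer, here it is called by hand. */
        void tick() {
            available = Math.min(permitsPerTick, available + permitsPerTick);
            if (available > 0) {
                pending.complete(null);
            }
        }
    }

    /** Hypothetical reader: each queue entry is the record count of one emit call. */
    static final class Reader {
        private final TickRateLimiter limiter;
        private final Queue<Integer> batches = new ArrayDeque<>();
        private CompletableFuture<Void> gate = CompletableFuture.completedFuture(null);
        private int emitted;

        Reader(TickRateLimiter limiter, int... batchSizes) {
            this.limiter = limiter;
            for (int size : batchSizes) {
                batches.add(size);
            }
        }

        Status pollNext() {
            if (!gate.isDone()) {
                // Throttled: return immediately instead of blocking the thread.
                return Status.MORE_AVAILABLE;
            }
            Integer batch = batches.poll();
            if (batch == null) {
                return Status.NOTHING_AVAILABLE;
            }
            emitted += batch;              // stands in for recordEmitter.emitRecord(...)
            gate = limiter.acquire(batch); // limiter is consulted only after emission
            return Status.MORE_AVAILABLE;
        }

        int emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        TickRateLimiter limiter = new TickRateLimiter(5);
        Reader reader = new Reader(limiter, 3, 4, 2);
        for (int call = 1; call <= 5; call++) {
            if (call == 4) {
                limiter.tick(); // simulate the next rate-limit window opening
            }
            Status status = reader.pollNext();
            System.out.println("call " + call + ": " + status + ", emitted=" + reader.emitted());
        }
    }
}
```

In this sketch the second batch overdraws the budget, since the batch size is only known after emission, so the third call returns without emitting anything and reading resumes only after the refill. How the actual implementation schedules the refill and wakes the reader is left to the FLIP itself.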