Hey

You should send a message to user-unsubscr...@flink.apache.org if you
want to unsubscribe the mail from u...@flink.apache.org.

You should send a message to dev-unsubscr...@flink.apache.org if you
want to unsubscribe the mail from dev@flink.apache.org.


如果需要取消订阅 u...@flink.apache.org 邮件组,请发送任意内容的邮件到
user-unsubscr...@flink.apache.org,取消订阅后不再收到邮件组发送的邮件

如果需要取消订阅 dev@flink.apache.org 邮件组,请发送任意内容的邮件到
dev-unsubscr...@flink.apache.org ,取消订阅后不再收到邮件组发送的邮件

Best,

Zexian

魏宪党 <1210798...@qq.com.invalid> 于2025年8月6日周三 09:17写道:

> Unsubscribe
>
>
> 魏宪党
> 1210798...@qq.com
>
>
>
>
>
>
>
>          原始邮件
>
>
> 发件人:Arvid Heise <ahe...@confluent.io.invalid&gt;
> 发件时间:2025年8月5日 17:48
> 收件人:dev <dev@flink.apache.org&gt;
> 主题:Re: Re: [DISCUSS] FLIP-535:Introduce RateLimiter to Source
>
>
>
>        Hi&nbsp;Zexian,
>
>
> the&nbsp;general&nbsp;idea&nbsp;and&nbsp;approach&nbsp;LGTM.&nbsp;A&nbsp;couple&nbsp;of&nbsp;questions:
>
> *&nbsp;If&nbsp;we&nbsp;don't&nbsp;want&nbsp;to&nbsp;provide&nbsp;RateLimiter&nbsp;on&nbsp;operator&nbsp;level&nbsp;(seems&nbsp;to&nbsp;be
>
> out&nbsp;of&nbsp;scope&nbsp;for&nbsp;this&nbsp;FLIP),&nbsp;can&nbsp;we&nbsp;still&nbsp;make&nbsp;the&nbsp;RateLimiter&nbsp;a&nbsp;property
>
> of&nbsp;the&nbsp;Source&nbsp;similar&nbsp;to&nbsp;Watermark&nbsp;strategy&nbsp;and&nbsp;pass&nbsp;it&nbsp;through&nbsp;the
>
> implementation?&nbsp;I'm&nbsp;guessing&nbsp;that&nbsp;we&nbsp;will&nbsp;then&nbsp;only&nbsp;have&nbsp;very&nbsp;coarse-grain
>
> control&nbsp;because&nbsp;it's&nbsp;basically&nbsp;implemented&nbsp;on&nbsp;SourceOperator&nbsp;and&nbsp;hence&nbsp;can
>
> limit&nbsp;batches&nbsp;only.&nbsp;The&nbsp;upside&nbsp;is&nbsp;that&nbsp;we&nbsp;don't&nbsp;need&nbsp;to&nbsp;touch&nbsp;each&nbsp;and
>
> every&nbsp;connector&nbsp;and&nbsp;get&nbsp;something&nbsp;"for&nbsp;free"&nbsp;that&nbsp;probably&nbsp;works&nbsp;good
> enough&nbsp;for&nbsp;many&nbsp;cases.
>
> *&nbsp;We&nbsp;could&nbsp;additionally&nbsp;introduce&nbsp;some&nbsp;kind&nbsp;of&nbsp;mixin&nbsp;to&nbsp;the&nbsp;SourceReader
>
> interface&nbsp;that&nbsp;would&nbsp;be&nbsp;able&nbsp;to&nbsp;push&nbsp;down&nbsp;the&nbsp;RateLimiter&nbsp;into&nbsp;the
>
> connector.&nbsp;That&nbsp;would&nbsp;enable&nbsp;fine-grain&nbsp;control&nbsp;as&nbsp;you&nbsp;proposed.
>
> *&nbsp;Note&nbsp;that&nbsp;each&nbsp;and&nbsp;every&nbsp;change&nbsp;to&nbsp;the&nbsp;public&nbsp;APIs&nbsp;will&nbsp;break&nbsp;connectors
>
> in&nbsp;a&nbsp;way&nbsp;(your&nbsp;migration&nbsp;section&nbsp;is&nbsp;a&nbsp;bit&nbsp;slim&nbsp;on&nbsp;that&nbsp;part).&nbsp;In&nbsp;this&nbsp;case,
>
> if&nbsp;you&nbsp;add&nbsp;a&nbsp;new&nbsp;overload&nbsp;to&nbsp;the&nbsp;ctor&nbsp;of&nbsp;SourceReaderBase,&nbsp;implementing
>
> connectors&nbsp;will&nbsp;not&nbsp;run&nbsp;on&nbsp;Flink&nbsp;2.1-&nbsp;anymore.&nbsp;Unfortunately,&nbsp;mixins&nbsp;have
>
> the&nbsp;same&nbsp;issue.&nbsp;I&nbsp;have&nbsp;yet&nbsp;to&nbsp;find&nbsp;a&nbsp;good&nbsp;solution&nbsp;to&nbsp;support&nbsp;features
>
> optionally,&nbsp;such&nbsp;that&nbsp;the&nbsp;source&nbsp;runs&nbsp;fine&nbsp;on&nbsp;older&nbsp;versions&nbsp;(without&nbsp;rate
>
> limiting)&nbsp;and&nbsp;on&nbsp;newer&nbsp;versions.&nbsp;I&nbsp;guess&nbsp;the&nbsp;only&nbsp;way&nbsp;would&nbsp;be&nbsp;some
>
> reflection&nbsp;magic&nbsp;and&nbsp;I'd&nbsp;rather&nbsp;have&nbsp;less&nbsp;of&nbsp;that&nbsp;in&nbsp;Flink&nbsp;than&nbsp;more.
>
> *&nbsp;I'm&nbsp;not&nbsp;sold&nbsp;on&nbsp;the&nbsp;idea&nbsp;that&nbsp;we&nbsp;need&nbsp;to&nbsp;pass&nbsp;notifyCheckpointAborted&nbsp;to
>
> the&nbsp;RateLimiter&nbsp;for&nbsp;the&nbsp;sources.&nbsp;Can&nbsp;you&nbsp;expand&nbsp;on&nbsp;why&nbsp;this&nbsp;is&nbsp;needed?
>
> Checkpoint&nbsp;barrier&nbsp;is&nbsp;injected&nbsp;differently&nbsp;into&nbsp;the&nbsp;sources&nbsp;than&nbsp;other
>
> operators.&nbsp;notifyCheckpointAborted&nbsp;is&nbsp;also&nbsp;not&nbsp;guaranteed&nbsp;to&nbsp;be&nbsp;called,&nbsp;so
>
> if&nbsp;there&nbsp;is&nbsp;a&nbsp;risk&nbsp;of&nbsp;deadlock,&nbsp;we&nbsp;should&nbsp;find&nbsp;other&nbsp;options.
>
> Best,
>
> Arvid
>
>
> On&nbsp;Tue,&nbsp;Aug&nbsp;5,&nbsp;2025&nbsp;at&nbsp;8:48 
> AM&nbsp;Jiangang&nbsp;Liu&nbsp;<
> liujiangangp...@gmail.com&gt;
> wrote:
>
> &gt;&nbsp;Thanks&nbsp;for&nbsp;your&nbsp;explanation.
>
> &gt;&nbsp;1.&nbsp;It&nbsp;makes&nbsp;sense&nbsp;to&nbsp;me&nbsp;to&nbsp;solve&nbsp;the&nbsp;imbalance&nbsp;in&nbsp;other&nbsp;issues.
>
> &gt;&nbsp;2.&nbsp;In&nbsp;fact,&nbsp;we&nbsp;support&nbsp;to&nbsp;adjust&nbsp;the&nbsp;source's&nbsp;qps&nbsp;dynamically&nbsp;in&nbsp;our&nbsp;inner
>
> &gt;&nbsp;flink.&nbsp;We&nbsp;can&nbsp;support&nbsp;this&nbsp;feature&nbsp;later.
> &gt;
> &gt;&nbsp;Best,
> &gt;&nbsp;Jiangang&nbsp;Liu
> &gt;
> &gt;&nbsp;Zexian&nbsp;WU&nbsp;<wzx3122351...@gmail.com
> &gt;&nbsp;于2025年8月5日周二&nbsp;14:35写道:
> &gt;
> &gt;&nbsp;&gt;&nbsp;Hi&nbsp;Jiangang,
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;Thank&nbsp;you&nbsp;very&nbsp;much&nbsp;for&nbsp;your&nbsp;valuable&nbsp;feedback&nbsp;and&nbsp;insightful&nbsp;questions.
>
> &gt;&nbsp;&gt;&nbsp;The&nbsp;two&nbsp;scenarios&nbsp;you've&nbsp;distilled&nbsp;from&nbsp;real-world&nbsp;production&nbsp;experience
>
> &gt;&nbsp;&gt;&nbsp;are&nbsp;extremely&nbsp;helpful&nbsp;for&nbsp;improving&nbsp;this&nbsp;feature.
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;1.&nbsp;Regarding&nbsp;the&nbsp;load&nbsp;imbalance&nbsp;issue&nbsp;between&nbsp;source&nbsp;tasks:
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;Your&nbsp;point&nbsp;about&nbsp;load&nbsp;imbalance&nbsp;between&nbsp;source&nbsp;tasks&nbsp;potentially&nbsp;leading
> &gt;&nbsp;to
>
> &gt;&nbsp;&gt;&nbsp;a&nbsp;suboptimal&nbsp;rate-limiting&nbsp;effect&nbsp;is&nbsp;absolutely&nbsp;correct.&nbsp;Ideally,&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;mechanism&nbsp;that&nbsp;can&nbsp;dynamically&nbsp;allocate&nbsp;the&nbsp;rate&nbsp;based&nbsp;on&nbsp;the&nbsp;number&nbsp;of
>
> &gt;&nbsp;&gt;&nbsp;splits&nbsp;assigned&nbsp;to&nbsp;each&nbsp;subtask&nbsp;would&nbsp;certainly&nbsp;be&nbsp;a&nbsp;superior&nbsp;solution.
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;In&nbsp;fact,&nbsp;in&nbsp;the&nbsp;current&nbsp;design,&nbsp;we've&nbsp;introduced&nbsp;the&nbsp;notifyAddingSplit
>
> &gt;&nbsp;&gt;&nbsp;interface,&nbsp;which&nbsp;provides&nbsp;the&nbsp;possibility&nbsp;for&nbsp;some&nbsp;level&nbsp;of&nbsp;local&nbsp;dynamic
>
> &gt;&nbsp;&gt;&nbsp;adjustment.&nbsp;However,&nbsp;to&nbsp;implement&nbsp;a&nbsp;globally&nbsp;unified,&nbsp;on-demand&nbsp;dynamic
>
> &gt;&nbsp;&gt;&nbsp;allocation,&nbsp;it&nbsp;would&nbsp;indeed&nbsp;require&nbsp;introducing&nbsp;a&nbsp;new&nbsp;communication
> &gt;&nbsp;channel
>
> &gt;&nbsp;&gt;&nbsp;between&nbsp;the&nbsp;SourceReader&nbsp;and&nbsp;the&nbsp;existing&nbsp;Coordinator,&nbsp;which&nbsp;would&nbsp;add
>
> &gt;&nbsp;&gt;&nbsp;implementation&nbsp;complexity&nbsp;and&nbsp;potential&nbsp;performance&nbsp;overhead.
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;Furthermore,&nbsp;a&nbsp;more&nbsp;direct&nbsp;optimization&nbsp;is&nbsp;to&nbsp;have&nbsp;the&nbsp;Enumerator&nbsp;in&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;Coordinator&nbsp;distribute&nbsp;splits&nbsp;as&nbsp;evenly&nbsp;as&nbsp;possible.&nbsp;I've&nbsp;noticed&nbsp;that
> &gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;community&nbsp;is&nbsp;already&nbsp;working&nbsp;on&nbsp;this&nbsp;with&nbsp;the&nbsp;ongoing&nbsp;FLIP-537:
> &gt;&nbsp;Enumerator
>
> &gt;&nbsp;&gt;&nbsp;with&nbsp;Global&nbsp;Split&nbsp;Assignment&nbsp;Distribution&nbsp;for&nbsp;Balanced&nbsp;Split&nbsp;assignment.
>
> &gt;&nbsp;&gt;&nbsp;Improving&nbsp;the&nbsp;split&nbsp;distribution&nbsp;logic&nbsp;seems&nbsp;like&nbsp;a&nbsp;simpler&nbsp;and&nbsp;more
> &gt;&nbsp;direct
>
> &gt;&nbsp;&gt;&nbsp;solution&nbsp;to&nbsp;the&nbsp;root&nbsp;problem&nbsp;than&nbsp;introducing&nbsp;a&nbsp;complex&nbsp;coordination
>
> &gt;&nbsp;&gt;&nbsp;mechanism&nbsp;just&nbsp;for&nbsp;rate&nbsp;limiting.&nbsp;Therefore,&nbsp;at&nbsp;this&nbsp;stage,&nbsp;we&nbsp;have&nbsp;opted
>
> &gt;&nbsp;&gt;&nbsp;for&nbsp;a&nbsp;simpler,&nbsp;self-contained&nbsp;approach&nbsp;as&nbsp;a&nbsp;robust&nbsp;first&nbsp;step.
> &gt;&nbsp;&gt;
> &gt;&nbsp;&gt;
> &gt;&nbsp;&gt;
> &gt;&nbsp;
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> &gt;&nbsp;&gt
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment&gt;&nbsp;&gt>
> ;
>
> &gt;&nbsp;&gt;&nbsp;2.&nbsp;Regarding&nbsp;the&nbsp;generic,&nbsp;operator-level&nbsp;configuration:
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;Regarding&nbsp;the&nbsp;generic&nbsp;configuration&nbsp;method&nbsp;you&nbsp;mentioned—using&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;ConfigOption<Map&gt;&nbsp;to&nbsp;configure&nbsp;QPS&nbsp;for&nbsp;any&nbsp;operator—that&nbsp;is&nbsp;indeed&nbsp;a&nbsp;very
> &gt;&nbsp;&gt;&nbsp;flexible&nbsp;and&nbsp;powerful&nbsp;design.
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;I'd&nbsp;like&nbsp;to&nbsp;confirm&nbsp;with&nbsp;you:&nbsp;is&nbsp;the&nbsp;main&nbsp;purpose&nbsp;of&nbsp;your&nbsp;proposed
> &gt;&nbsp;solution
>
> &gt;&nbsp;&gt;&nbsp;also&nbsp;to&nbsp;enable&nbsp;dynamic&nbsp;rate&nbsp;limiting&nbsp;more&nbsp;conveniently&nbsp;(for&nbsp;example,
>
> &gt;&nbsp;&gt;&nbsp;adjusting&nbsp;the&nbsp;rate&nbsp;at&nbsp;runtime&nbsp;via&nbsp;external&nbsp;signals)?&nbsp;This&nbsp;FLIP&nbsp;currently
>
> &gt;&nbsp;&gt;&nbsp;focuses&nbsp;primarily&nbsp;on&nbsp;the&nbsp;source&nbsp;side,&nbsp;as&nbsp;it&nbsp;addresses&nbsp;the&nbsp;most&nbsp;immediate
>
> &gt;&nbsp;&gt;&nbsp;pain&nbsp;point.&nbsp;Your&nbsp;idea&nbsp;is&nbsp;very&nbsp;insightful,&nbsp;and&nbsp;if&nbsp;our&nbsp;goals&nbsp;are&nbsp;aligned,
> &gt;&nbsp;it
>
> &gt;&nbsp;&gt;&nbsp;serves&nbsp;as&nbsp;an&nbsp;excellent&nbsp;reference&nbsp;for&nbsp;the&nbsp;future&nbsp;evolution&nbsp;of&nbsp;this
> &gt;&nbsp;feature.
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;Thanks&nbsp;again&nbsp;for&nbsp;your&nbsp;insightful&nbsp;contributions;&nbsp;they&nbsp;are&nbsp;crucial&nbsp;for
> &gt;&nbsp;&gt;&nbsp;refining&nbsp;and&nbsp;evolving&nbsp;this&nbsp;feature.
> &gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;Best,
> &gt;&nbsp;&gt;&nbsp;Zexian
> &gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;Jiangang&nbsp;Liu&nbsp;<liujiangangp...@gmail.com
> &gt;&nbsp;于2025年8月4日周一&nbsp;23:13写道:
> &gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hello,&nbsp;Zexian.&nbsp;This&nbsp;is&nbsp;a&nbsp;great&nbsp;work&nbsp;for&nbsp;job's&nbsp;stability&nbsp;when&nbsp;failover
> &gt;&nbsp;or
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;catching&nbsp;the&nbsp;lag.&nbsp;I&nbsp;just&nbsp;have&nbsp;some&nbsp;questions:
> &gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;1.&nbsp;For&nbsp;some&nbsp;source&nbsp;tasks,&nbsp;their&nbsp;consuming&nbsp;splits&nbsp;may&nbsp;be&nbsp;more&nbsp;than
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;others.&nbsp;The&nbsp;simple&nbsp;calculation&nbsp;QPS&nbsp;based&nbsp;on&nbsp;parallelism&nbsp;may&nbsp;cause
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;imbalance.&nbsp;Can&nbsp;we&nbsp;support&nbsp;a&nbsp;calculation&nbsp;based&nbsp;on&nbsp;the&nbsp;split&nbsp;number
> &gt;&nbsp;as&nbsp;a
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;common&nbsp;way?
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;2.&nbsp;In&nbsp;our&nbsp;company,&nbsp;we&nbsp;support&nbsp;to&nbsp;config&nbsp;each&nbsp;operator's&nbsp;qps&nbsp;by
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;ConfigOption<Map&gt;.&nbsp;This&nbsp;is&nbsp;convenient&nbsp;if&nbsp;we&nbsp;can&nbsp;get&nbsp;the&nbsp;operatorId
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;easily
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;before&nbsp;submitting.&nbsp;But&nbsp;in&nbsp;open&nbsp;source,&nbsp;it&nbsp;may&nbsp;be&nbsp;hard&nbsp;to&nbsp;do&nbsp;so.&nbsp;I
> &gt;&nbsp;&gt;&nbsp;wonder
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;whether&nbsp;there&nbsp;exist&nbsp;similar&nbsp;way&nbsp;so&nbsp;that&nbsp;we&nbsp;do&nbsp;not&nbsp;need&nbsp;to&nbsp;implement
> &gt;&nbsp;&gt;&nbsp;each
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&nbsp;&nbsp;&nbsp;connector.
> &gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Jiangang&nbsp;Liu
> &gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;WU&nbsp;<wzx3122351...@gmail.com
> &gt;&nbsp;于2025年8月4日周一&nbsp;14:35写道:
> &gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Shengkai,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thank&nbsp;you&nbsp;for&nbsp;your&nbsp;thoughtful&nbsp;feedback&nbsp;and&nbsp;excellent&nbsp;questions.&nbsp;These
> &gt;&nbsp;&gt;&nbsp;are
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;all&nbsp;great&nbsp;points.&nbsp;Here&nbsp;are&nbsp;my&nbsp;thoughts:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;the&nbsp;requestSize&nbsp;parameter&nbsp;and&nbsp;its&nbsp;behavior:&nbsp;requestSize&nbsp;is
> &gt;&nbsp;&gt;&nbsp;not
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;request&nbsp;for&nbsp;permission&nbsp;but&nbsp;a&nbsp;post-facto&nbsp;count&nbsp;of&nbsp;records&nbsp;that&nbsp;have
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;already
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;been&nbsp;emitted.&nbsp;If&nbsp;this&nbsp;count&nbsp;(e.g.,&nbsp;3)&nbsp;exceeds&nbsp;the&nbsp;available&nbsp;rate
> &gt;&nbsp;credit
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;(e.g.,&nbsp;for&nbsp;2&nbsp;records),&nbsp;RateLimiter#acquire&nbsp;will&nbsp;return&nbsp;a
> &gt;&nbsp;&gt;&nbsp;CompletionStage
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;that&nbsp;completes&nbsp;in&nbsp;the&nbsp;future.&nbsp;This&nbsp;creates&nbsp;a&nbsp;non-blocking&nbsp;pause&nbsp;in
> &gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;SourceReader,&nbsp;allowing&nbsp;it&nbsp;to&nbsp;pay&nbsp;back&nbsp;the&nbsp;"time&nbsp;debt"&nbsp;and&nbsp;ensuring
> &gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;average&nbsp;rate&nbsp;is&nbsp;maintained.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;notifyCheckpointAborted:&nbsp;You&nbsp;are&nbsp;absolutely&nbsp;right;&nbsp;this&nbsp;is
> &gt;&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;critical&nbsp;point&nbsp;for&nbsp;preventing&nbsp;deadlocks.&nbsp;The&nbsp;specific&nbsp;plan&nbsp;is&nbsp;to&nbsp;add
> &gt;&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;notifyCheckpointAborted&nbsp;method&nbsp;to&nbsp;the&nbsp;RateLimiter&nbsp;interface.&nbsp;Then,
> &gt;&nbsp;&gt;&nbsp;within
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;SourceReader&nbsp;implementation,&nbsp;I&nbsp;will&nbsp;handle&nbsp;the&nbsp;checkpoint&nbsp;abort
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;callback&nbsp;and&nbsp;invoke&nbsp;this&nbsp;new&nbsp;method&nbsp;to&nbsp;reset&nbsp;the&nbsp;rate&nbsp;limiter's
> &gt;&nbsp;&gt;&nbsp;internal
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;state.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Regarding&nbsp;configuration&nbsp;and&nbsp;unsupported&nbsp;connectors:&nbsp;You've&nbsp;made&nbsp;an
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;excellent&nbsp;point&nbsp;about&nbsp;this&nbsp;being&nbsp;a&nbsp;connector-specific&nbsp;feature.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;agree&nbsp;that&nbsp;scan.rate-limit.record-per-second&nbsp;is&nbsp;a&nbsp;great&nbsp;name,&nbsp;and&nbsp;I
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;will
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;propose&nbsp;it&nbsp;as&nbsp;a&nbsp;best&nbsp;practice.&nbsp;I&nbsp;plan&nbsp;to&nbsp;use&nbsp;it&nbsp;in&nbsp;our&nbsp;initial
> &gt;&nbsp;&gt;&nbsp;reference
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;implementation,&nbsp;likely&nbsp;for&nbsp;the&nbsp;Kafka&nbsp;connector,&nbsp;and&nbsp;it&nbsp;can&nbsp;serve&nbsp;as&nbsp;a
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;reference&nbsp;for&nbsp;other&nbsp;connectors&nbsp;in&nbsp;the&nbsp;future.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;You&nbsp;are&nbsp;correct&nbsp;that&nbsp;the&nbsp;decision&nbsp;to&nbsp;support&nbsp;this&nbsp;feature&nbsp;and&nbsp;how&nbsp;to
> &gt;&nbsp;&gt;&nbsp;name
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;option&nbsp;lies&nbsp;with&nbsp;each&nbsp;connector.&nbsp;The&nbsp;framework&nbsp;only&nbsp;provides&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;underlying&nbsp;mechanism&nbsp;(the&nbsp;RateLimiter&nbsp;integration).&nbsp;The&nbsp;mechanism&nbsp;for
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;providing&nbsp;clear&nbsp;error&nbsp;feedback&nbsp;works&nbsp;perfectly&nbsp;with&nbsp;this&nbsp;model:&nbsp;a
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;connector
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;that&nbsp;implements&nbsp;rate&nbsp;limiting&nbsp;will&nbsp;declare&nbsp;the&nbsp;option&nbsp;in&nbsp;its
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;DynamicTableFactory.&nbsp;Connectors&nbsp;that&nbsp;don't&nbsp;support&nbsp;it&nbsp;will&nbsp;not
> &gt;&nbsp;declare
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;option.&nbsp;Consequently,&nbsp;Flink’s&nbsp;built-in&nbsp;validation&nbsp;will&nbsp;automatically
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;throw
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;a&nbsp;ValidationException&nbsp;for&nbsp;an&nbsp;unknown&nbsp;option,&nbsp;providing&nbsp;the&nbsp;desired
> &gt;&nbsp;user
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;feedback.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Shengkai&nbsp;Fang&nbsp;<
> fskm...@gmail.com&gt;&nbsp;于2025年7月28日周一&nbsp;14:57写道:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi,&nbsp;Zexian.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;for&nbsp;your&nbsp;FLIP.&nbsp;I&nbsp;think&nbsp;rate&nbsp;litmit&nbsp;is&nbsp;very&nbsp;important&nbsp;feature
> &gt;&nbsp;&gt;&nbsp;for
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;our
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;users.&nbsp;But&nbsp;I&nbsp;have&nbsp;some&nbsp;questions&nbsp;about&nbsp;the&nbsp;FLIP:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;1.&nbsp;How&nbsp;do&nbsp;I&nbsp;determine&nbsp;the&nbsp;input&nbsp;parameter&nbsp;`requestSize`&nbsp;for
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;`RateLimiter#acquire`?&nbsp;If&nbsp;the&nbsp;rate&nbsp;limiter&nbsp;indicates&nbsp;there&nbsp;are&nbsp;2
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;remaining
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;requests&nbsp;that&nbsp;can&nbsp;be&nbsp;emitted,&nbsp;but&nbsp;the&nbsp;`requestSize`&nbsp;is&nbsp;3,&nbsp;what&nbsp;is
> &gt;&nbsp;the
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;behavior&nbsp;here?
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;2.&nbsp;CheckpointListener&nbsp;also&nbsp;has&nbsp;a&nbsp;method&nbsp;named
> &gt;&nbsp;&gt;&nbsp;notifyCheckpointAborted,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;think&nbsp;RateLimiter&nbsp;also&nbsp;needs&nbsp;this.&nbsp;If&nbsp;the&nbsp;checkpoint&nbsp;aborts,&nbsp;please
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;clear
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the&nbsp;status&nbsp;of&nbsp;the&nbsp;rate&nbsp;limiter.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;3.&nbsp;I&nbsp;think&nbsp;`scan.rate-limit.record-per-second`&nbsp;is&nbsp;better&nbsp;than
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;`scan.rate.limit.record-per-second`.&nbsp;It&nbsp;seems&nbsp;only&nbsp;FLIP-27&nbsp;source
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;supports
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limit,&nbsp;it's&nbsp;better&nbsp;we&nbsp;can&nbsp;throw&nbsp;an&nbsp;exception&nbsp;to&nbsp;notify&nbsp;users
> &gt;&nbsp;if
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source&nbsp;doesn't&nbsp;support&nbsp;this&nbsp;feature.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Shengkai
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;WU&nbsp;<
> wzx3122351...@gmail.com&gt;&nbsp;于2025年7月28日周一&nbsp;11:52写道:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;Leonard,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;a&nbsp;lot&nbsp;for&nbsp;your&nbsp;support&nbsp;and&nbsp;positive&nbsp;feedback!&nbsp;I'm&nbsp;glad&nbsp;to
> &gt;&nbsp;&gt;&nbsp;hear
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;you
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;think&nbsp;the&nbsp;design&nbsp;meets&nbsp;the&nbsp;needs&nbsp;of&nbsp;most&nbsp;scenarios.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Indeed,&nbsp;rate&nbsp;limiting&nbsp;is&nbsp;a&nbsp;fundamental&nbsp;and&nbsp;important&nbsp;feature,&nbsp;and
> &gt;&nbsp;&gt;&nbsp;I'm
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;excited&nbsp;to&nbsp;help&nbsp;fill&nbsp;this&nbsp;gap.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I'm&nbsp;also&nbsp;looking&nbsp;forward&nbsp;to&nbsp;more&nbsp;ideas&nbsp;and&nbsp;potential&nbsp;improvements
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;from
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;other&nbsp;community&nbsp;members.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;again!
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;On&nbsp;2025/07/24&nbsp;12:19:39&nbsp;Leonard&nbsp;Xu&nbsp;wrote:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks&nbsp;Zexian&nbsp;for&nbsp;driving&nbsp;this&nbsp;work.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Rate&nbsp;limiting&nbsp;is&nbsp;a&nbsp;common&nbsp;requirement,&nbsp;TBH,&nbsp;we&nbsp;should&nbsp;have
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;supported
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;it
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;in&nbsp;earlier&nbsp;stage,&nbsp;and&nbsp;the&nbsp;proposed&nbsp;design&nbsp;integrating&nbsp;it&nbsp;into&nbsp;the
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;operator&nbsp;lifecycle,&nbsp;it&nbsp;is&nbsp;already&nbsp;able&nbsp;to&nbsp;meet&nbsp;the&nbsp;vast&nbsp;majority
> &gt;&nbsp;of
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;scenarios,&nbsp;looks&nbsp;good&nbsp;from&nbsp;my&nbsp;side.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Best,
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Leonard
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;2025&nbsp;7月&nbsp;18&nbsp;12:01,Zexian&nbsp;WU&nbsp;<
> wz...@gmail.com&gt;&nbsp;写道:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Hi&nbsp;everyone,
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;would&nbsp;like&nbsp;to&nbsp;start&nbsp;a&nbsp;discussion&nbsp;on&nbsp;a&nbsp;new&nbsp;Flink&nbsp;Improvement
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Proposal
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;(FLIP),&nbsp;FLIP-535:&nbsp;Introduce&nbsp;RateLimiter&nbsp;to&nbsp;Source.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;full&nbsp;proposal&nbsp;can&nbsp;be&nbsp;found&nbsp;on&nbsp;the&nbsp;wiki:
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;
> &gt;&nbsp;
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Motivation
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Motivation&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt>
> ;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;In&nbsp;many&nbsp;production&nbsp;environments,&nbsp;Flink&nbsp;sources&nbsp;read&nbsp;from
> &gt;&nbsp;shared
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;external&nbsp;systems&nbsp;(like&nbsp;Kafka,&nbsp;Pulsar,&nbsp;or&nbsp;databases)&nbsp;where
> &gt;&nbsp;resources
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;are
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limited.&nbsp;An&nbsp;uncontrolled&nbsp;data&nbsp;ingestion&nbsp;rate&nbsp;can&nbsp;lead&nbsp;to&nbsp;critical
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;issues:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Resource&nbsp;Contention:&nbsp;A&nbsp;high-throughput&nbsp;Flink&nbsp;job&nbsp;can
> &gt;&nbsp;saturate&nbsp;a
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;shared
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Kafka&nbsp;cluster's&nbsp;bandwidth,&nbsp;starving&nbsp;other&nbsp;essential&nbsp;services.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;System&nbsp;Instability:&nbsp;Aggressive&nbsp;polling&nbsp;from&nbsp;a&nbsp;database&nbsp;(e.g.,
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;MySQL)
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;can&nbsp;overwhelm&nbsp;its&nbsp;IOPS,&nbsp;degrading&nbsp;performance&nbsp;for&nbsp;transactional
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;queries.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;proposal&nbsp;aims&nbsp;to&nbsp;provide&nbsp;a&nbsp;built-in,&nbsp;precise,&nbsp;and
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;easy-to-use
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limiting&nbsp;mechanism&nbsp;for&nbsp;Flink&nbsp;Sources&nbsp;to&nbsp;ensure&nbsp;system
> &gt;&nbsp;&gt;&nbsp;stability
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;and
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;fair&nbsp;resource&nbsp;sharing.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Proposed&nbsp;Solution
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;core&nbsp;idea&nbsp;is&nbsp;to&nbsp;integrate&nbsp;a&nbsp;flexible,&nbsp;record-based
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limiting
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;mechanism&nbsp;directly&nbsp;into&nbsp;the&nbsp;SourceReaderBase,&nbsp;making&nbsp;it&nbsp;available
> &gt;&nbsp;&gt;&nbsp;to
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;all
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;connectors&nbsp;built&nbsp;on&nbsp;the&nbsp;new&nbsp;source&nbsp;interface.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Key&nbsp;Changes:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Seamless&nbsp;Integration&nbsp;via&nbsp;SourceReaderBase:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;New&nbsp;constructors&nbsp;accepting&nbsp;a&nbsp;RateLimiterStrategy&nbsp;will&nbsp;be
> &gt;&nbsp;added
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;directly&nbsp;to&nbsp;SourceReaderBase.&nbsp;This&nbsp;allows&nbsp;any&nbsp;connector&nbsp;built&nbsp;on
> &gt;&nbsp;&gt;&nbsp;the
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;modern
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source&nbsp;interface&nbsp;(like&nbsp;Kafka&nbsp;or&nbsp;Pulsar)&nbsp;to&nbsp;enable&nbsp;rate&nbsp;limiting
> &gt;&nbsp;&gt;&nbsp;with
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;minimal&nbsp;code&nbsp;changes.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Accurate,&nbsp;Post-Emission&nbsp;Throttling:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;To&nbsp;support&nbsp;sources&nbsp;with&nbsp;unpredictable&nbsp;batch&nbsp;sizes&nbsp;(e.g.,
> &gt;&nbsp;&gt;&nbsp;Kafka),
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limiting&nbsp;is&nbsp;applied&nbsp;after&nbsp;records&nbsp;are&nbsp;emitted.&nbsp;The&nbsp;reader&nbsp;counts
> &gt;&nbsp;&gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records&nbsp;after&nbsp;each&nbsp;recordEmitter.emitRecord&nbsp;method&nbsp;call,&nbsp;counts
> &gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records,&nbsp;and&nbsp;only&nbsp;then&nbsp;consults&nbsp;the&nbsp;RateLimiter.&nbsp;This&nbsp;ensures
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;throttling
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;based&nbsp;on&nbsp;the&nbsp;precise&nbsp;number&nbsp;of&nbsp;records&nbsp;processed.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Fully&nbsp;Non-Blocking&nbsp;Operation:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;entire&nbsp;mechanism&nbsp;is&nbsp;asynchronous&nbsp;and&nbsp;non-blocking.&nbsp;If&nbsp;a
> &gt;&nbsp;&gt;&nbsp;rate
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limit
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is&nbsp;hit,&nbsp;the&nbsp;reader&nbsp;pauses&nbsp;by&nbsp;returning
> &gt;&nbsp;InputStatus.MORE_AVAILABLE.
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;yields&nbsp;control&nbsp;to&nbsp;the&nbsp;Flink&nbsp;task's&nbsp;event&nbsp;loop&nbsp;without&nbsp;blocking
> &gt;&nbsp;the
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;processing&nbsp;thread,&nbsp;ensuring&nbsp;that&nbsp;critical&nbsp;operations&nbsp;like
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;checkpointing
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;are
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;never&nbsp;delayed.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Unified&nbsp;Configuration&nbsp;via&nbsp;SQL/Table&nbsp;API:
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Users&nbsp;can&nbsp;configure&nbsp;rate&nbsp;limits&nbsp;consistently&nbsp;across&nbsp;different
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;sources
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;using&nbsp;a&nbsp;simple&nbsp;SQL&nbsp;table&nbsp;option,&nbsp;such&nbsp;as&nbsp;WITH&nbsp;('scan.rate.limit'
> &gt;&nbsp;=
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;'1000').
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;provides&nbsp;a&nbsp;unified&nbsp;and&nbsp;user-friendly&nbsp;experience&nbsp;for&nbsp;pipeline
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;tuning.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;design&nbsp;is&nbsp;backward-compatible,&nbsp;and&nbsp;existing&nbsp;custom
> &gt;&nbsp;sources
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;will
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;continue&nbsp;to&nbsp;work&nbsp;without&nbsp;any&nbsp;modification.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;believe&nbsp;this&nbsp;feature&nbsp;will&nbsp;be&nbsp;a&nbsp;valuable&nbsp;addition&nbsp;for&nbsp;Flink
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;users
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;operating&nbsp;in&nbsp;complex,&nbsp;multi-tenant&nbsp;environments.&nbsp;I'm&nbsp;looking
> &gt;&nbsp;&gt;&nbsp;forward
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;to
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;your&nbsp;feedback,&nbsp;suggestions,&nbsp;and&nbsp;any&nbsp;potential&nbsp;concerns&nbsp;you&nbsp;might
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;have.
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks,
>
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;Wu
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;&nbsp;&gt;
> &gt;&nbsp;&gt;
> &gt;

Reply via email to