Unsubscribe

魏宪党
1210798...@qq.com

Original message
From: Zexian WU <wzx3122351...@gmail.com>
Sent: August 5, 2025 14:34
To: dev <dev@flink.apache.org>
Subject: Re: Re: [DISCUSS] FLIP-535: Introduce RateLimiter to Source

Hi Jiangang,

Thank you very much for your valuable feedback and insightful questions.
The two scenarios you've distilled from real-world production experience
are extremely helpful for improving this feature.

1. Regarding the load imbalance issue between source tasks:

You are absolutely right that load imbalance between source tasks can make
the rate limiting less effective. Ideally, a mechanism that dynamically
allocates the rate based on the number of splits assigned to each subtask
would be a better solution.
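
A minimal sketch of the two allocation policies, assuming a job-wide budget expressed in records per second. `RateAllocation` and its methods are hypothetical names used only for illustration; they are not part of FLIP-535's API:

```java
import java.util.Arrays;

/** Hypothetical helper: divides a job-wide records-per-second budget among source subtasks. */
final class RateAllocation {

    private RateAllocation() {}

    /** Parallelism-based allocation: every subtask receives the same share. */
    static double[] evenShares(double globalRate, int parallelism) {
        double[] shares = new double[parallelism];
        Arrays.fill(shares, globalRate / parallelism);
        return shares;
    }

    /** Split-based allocation: each subtask's share is proportional to the splits it owns. */
    static double[] splitWeightedShares(double globalRate, int[] splitsPerSubtask) {
        int totalSplits = Arrays.stream(splitsPerSubtask).sum();
        if (totalSplits == 0) {
            // No splits assigned yet: fall back to an even division so the budget is not lost.
            return evenShares(globalRate, splitsPerSubtask.length);
        }
        double[] shares = new double[splitsPerSubtask.length];
        for (int i = 0; i < splitsPerSubtask.length; i++) {
            shares[i] = globalRate * splitsPerSubtask[i] / totalSplits;
        }
        return shares;
    }
}
```

For a 1000 records/s budget and two subtasks owning 3 and 1 splits, `evenShares` yields 500/500 while `splitWeightedShares` yields 750/250. The weighted version needs every subtask's split count, i.e. a global view that a single SourceReader does not have on its own.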

In fact, the current design already introduces the notifyAddingSplit
interface, which allows some degree of local dynamic adjustment. However,
implementing a globally unified, on-demand allocation would require a new
communication channel between the SourceReader and the existing Coordinator,
which would add implementation complexity and potential performance overhead.
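
As a rough illustration of what purely local adjustment could look like, the sketch below derives a subtask's limit from a per-split rate and the splits it has been told about through a `notifyAddingSplit`-style callback. The class, its generic split-id parameter, and the per-split rate are assumptions, not the FLIP's actual `RateLimiter` interface:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical subtask-local budget: the limit grows with the splits this reader has been
 * assigned, as reported through a notifyAddingSplit-style hook. No Coordinator round trip.
 */
final class LocalSplitRateBudget<SplitIdT> {
    private final double recordsPerSecondPerSplit;
    private final Set<SplitIdT> assignedSplits = new HashSet<>();

    LocalSplitRateBudget(double recordsPerSecondPerSplit) {
        this.recordsPerSecondPerSplit = recordsPerSecondPerSplit;
    }

    /** Called when the reader receives a split; a split id seen twice is counted once. */
    void notifyAddingSplit(SplitIdT splitId) {
        assignedSplits.add(splitId);
    }

    /** Current local limit in records per second. */
    double currentRate() {
        return recordsPerSecondPerSplit * assignedSplits.size();
    }
}
```

Because each subtask only sees its own splits, the job-wide rate becomes the per-split rate times the total number of splits rather than a fixed cap; enforcing a fixed global budget would still require the Coordinator round trip described above.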

Furthermore, a more direct optimization is to have the Enumerator in the
Coordinator distribute splits as evenly as possible. The community is already
working on this in the ongoing FLIP-537: Enumerator with Global Split
Assignment Distribution for Balanced Split assignment. Improving the split
distribution logic seems a simpler and more direct fix for the root problem
than introducing a complex coordination mechanism just for rate limiting.
Therefore, at this stage, we have opted for a simpler, self-contained
approach as a robust first step.
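
To illustrate the idea of distributing splits evenly (this is not FLIP-537's algorithm, whose details are on the wiki page linked below), a least-loaded policy gives each new split to the reader currently holding the fewest splits. `LeastLoadedAssigner` is a hypothetical name:

```java
/** Hypothetical enumerator-side helper: hands each new split to the least-loaded reader. */
final class LeastLoadedAssigner {
    private final int[] splitCounts;

    LeastLoadedAssigner(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.splitCounts = new int[parallelism];
    }

    /** Picks the subtask with the fewest splits (lowest index wins ties) and records it. */
    int assign() {
        int target = 0;
        for (int i = 1; i < splitCounts.length; i++) {
            if (splitCounts[i] < splitCounts[target]) {
                target = i;
            }
        }
        splitCounts[target]++;
        return target;
    }

    int splitCount(int subtask) {
        return splitCounts[subtask];
    }
}
```

Assigning seven splits to three readers yields counts of 3, 2 and 2. With splits spread this evenly, dividing the rate by parallelism already approximates a split-weighted allocation.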

https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

2. Regarding the generic, operator-level configuration:

The generic configuration method you mentioned, using a ConfigOption<Map> to
configure the QPS of any operator, is indeed a very flexible and powerful
design.
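
A minimal sketch of resolving such a map-typed setting on the reader side, assuming the map arrives as a `key:qps,key:qps` string and is keyed by an identifier the user controls (as Jiangang notes, obtaining the real operatorId before submission can be hard). `OperatorQpsConfig` and the `-1` "no limit" default are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical resolver for a per-operator QPS map such as "kafka-source:1000,jdbc-source:200". */
final class OperatorQpsConfig {
    private final Map<String, Double> qpsByOperator = new HashMap<>();
    private final double defaultQps; // in this sketch, -1.0 is used to mean "no limit configured"

    OperatorQpsConfig(String raw, double defaultQps) {
        this.defaultQps = defaultQps;
        if (raw == null || raw.isBlank()) {
            return;
        }
        for (String entry : raw.split(",")) {
            String[] keyAndValue = entry.split(":", 2);
            if (keyAndValue.length != 2) {
                throw new IllegalArgumentException("Malformed entry '" + entry + "', expected key:qps");
            }
            qpsByOperator.put(keyAndValue[0].trim(), Double.parseDouble(keyAndValue[1].trim()));
        }
    }

    /** Returns the configured QPS for the operator key, or the default when none is set. */
    double qpsFor(String operatorKey) {
        return qpsByOperator.getOrDefault(operatorKey, defaultQps);
    }
}
```

With `new OperatorQpsConfig("kafka-source:1000, jdbc-source:200", -1.0)`, `qpsFor("jdbc-source")` returns 200.0 and any unlisted key falls back to the default. Keeping the lookup generic is what would let a framework-level hook serve every connector without per-connector code.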

I'd like to confirm with you: is the main purpose of your proposed solution
also to make dynamic rate limiting more convenient (for example, adjusting
the rate at runtime via external signals)? This FLIP currently focuses on the
source side, since that is the most immediate pain point. Your idea is very
insightful, and if our goals are aligned, it will be an excellent reference
for the future evolution of this feature.

Thanks again for your insightful contributions; they are crucial for
refining and evolving this feature.

Best,
Zexian

On Mon, Aug 4, 2025 at 23:13, Jiangang Liu <liujiangangp...@gmail.com> wrote:

> Hello, Zexian. This is great work for job stability during failover or
> when catching up on lag. I just have some questions:
>
>    1. Some source tasks may consume more splits than others, so a simple
>    QPS calculation based on parallelism may cause imbalance. Can we support
>    a calculation based on the number of splits as a common approach?
>    2. In our company, we support configuring each operator's QPS via a
>    ConfigOption<Map>. This is convenient if we can easily get the
>    operatorId before submitting the job, but in open source it may be hard
>    to do so. I wonder whether there is a similar approach so that we do not
>    need to implement it in each connector.
>
> Best,
> Jiangang Liu
>
> On Mon, Aug 4, 2025 at 14:35, Zexian WU <wzx3122351...@gmail.com> wrote:
>
> > Hi Shengkai,
> >
> > Thank you for your thoughtful feedback and excellent questions. These are
> > all great points. Here are my thoughts:
> >
> > Regarding the requestSize parameter and its behavior: requestSize is not a
> > request for permission but a post-facto count of records that have already
> > been emitted. If this count (e.g., 3) exceeds the available rate credit
> > (e.g., for 2 records), RateLimiter#acquire will return a CompletionStage
> > that completes in the future. This creates a non-blocking pause in the
> > SourceReader, allowing it to pay back the "time debt" and ensuring the
> > average rate is maintained.
> >
> > Regarding notifyCheckpointAborted: You are absolutely right; this is a
> > critical point for preventing deadlocks. The specific plan is to add a
> > notifyCheckpointAborted method to the RateLimiter interface. Then, within
> > the SourceReader implementation, I will handle the checkpoint abort
> > callback and invoke this new method to reset the rate limiter's internal
> > state.
> >
> > Regarding configuration and unsupported connectors: You've made an
> > excellent point about this being a connector-specific feature.
> >
> > I agree that scan.rate-limit.record-per-second is a great name, and I will
> > propose it as a best practice. I plan to use it in our initial reference
> > implementation, likely for the Kafka connector, and it can serve as a
> > reference for other connectors in the future.
> > You are correct that the decision to support this feature and how to name
> > the option lies with each connector. The framework only provides the
> > underlying mechanism (the RateLimiter integration). The mechanism for
> > providing clear error feedback works perfectly with this model: a connector
> > that implements rate limiting will declare the option in its
> > DynamicTableFactory. Connectors that don't support it will not declare the
> > option. Consequently, Flink's built-in validation will automatically throw
> > a ValidationException for an unknown option, providing the desired user
> > feedback.
> >
> > Best,
> > Zexian
> >
> > On Mon, Jul 28, 2025 at 14:57, Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi, Zexian.
> > >
> > > Thanks for your FLIP. I think rate limiting is a very important feature
> > > for our users. But I have some questions about the FLIP:
> > >
> > > 1. How do I determine the input parameter `requestSize` for
> > > `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining
> > > requests that can be emitted, but the `requestSize` is 3, what is the
> > > behavior here?
> > > 2. CheckpointListener also has a method named notifyCheckpointAborted. I
> > > think RateLimiter also needs this. If the checkpoint aborts, please clear
> > > the state of the rate limiter.
> > > 3. I think `scan.rate-limit.record-per-second` is better than
> > > `scan.rate.limit.record-per-second`. It seems only FLIP-27 sources
> > > support rate limiting, so it would be better to throw an exception to
> > > notify users if the source doesn't support this feature.
> > >
> > > Best,
> > > Shengkai
> > >
> > > On Mon, Jul 28, 2025 at 11:52, Zexian WU <wzx3122351...@gmail.com> wrote:
> > >
> > > > Hi Leonard,
> > > >
> > > > Thanks a lot for your support and positive feedback! I'm glad to hear
> > > > you think the design meets the needs of most scenarios.
> > > >
> > > > Indeed, rate limiting is a fundamental and important feature, and I'm
> > > > excited to help fill this gap.
> > > >
> > > > I'm also looking forward to more ideas and potential improvements from
> > > > other community members.
> > > >
> > > > Thanks again!
> > > >
> > > > Best,
> > > > Zexian
> > > >
> > > > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > > > Thanks Zexian for driving this work.
> > > > >
> > > > > Rate limiting is a common requirement; to be honest, we should have
> > > > > supported it at an earlier stage. The proposed design integrates it
> > > > > into the source operator lifecycle and can already meet the vast
> > > > > majority of scenarios. Looks good from my side.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > > > > >
> > > > > > Hi everyone,
> > > > > > I would like to start a discussion on a new Flink Improvement
> > > > > > Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > > > > The full proposal can be found on the wiki:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Motivation
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;In&nbsp;many&nbsp;production&nbsp;environments,&nbsp;Flink&nbsp;sources&nbsp;read&nbsp;from&nbsp;shared
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;external&nbsp;systems&nbsp;(like&nbsp;Kafka,&nbsp;Pulsar,&nbsp;or&nbsp;databases)&nbsp;where&nbsp;resources
&gt;&nbsp;are
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limited.&nbsp;An&nbsp;uncontrolled&nbsp;data&nbsp;ingestion&nbsp;rate&nbsp;can&nbsp;lead&nbsp;to&nbsp;critical
&gt;&nbsp;&gt;&nbsp;issues:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Resource&nbsp;Contention:&nbsp;A&nbsp;high-throughput&nbsp;Flink&nbsp;job&nbsp;can&nbsp;saturate&nbsp;a
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;shared
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Kafka&nbsp;cluster's&nbsp;bandwidth,&nbsp;starving&nbsp;other&nbsp;essential&nbsp;services.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;System&nbsp;Instability:&nbsp;Aggressive&nbsp;polling&nbsp;from&nbsp;a&nbsp;database&nbsp;(e.g.,
&gt;&nbsp;&gt;&nbsp;MySQL)
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;can&nbsp;overwhelm&nbsp;its&nbsp;IOPS,&nbsp;degrading&nbsp;performance&nbsp;for&nbsp;transactional
&gt;&nbsp;&gt;&nbsp;queries.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;proposal&nbsp;aims&nbsp;to&nbsp;provide&nbsp;a&nbsp;built-in,&nbsp;precise,&nbsp;and
&gt;&nbsp;easy-to-use
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;rate-limiting&nbsp;mechanism&nbsp;for&nbsp;Flink&nbsp;Sources&nbsp;to&nbsp;ensure&nbsp;system&nbsp;stability
&gt;&nbsp;&gt;&nbsp;and
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;fair&nbsp;resource&nbsp;sharing.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Proposed&nbsp;Solution
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;core&nbsp;idea&nbsp;is&nbsp;to&nbsp;integrate&nbsp;a&nbsp;flexible,&nbsp;record-based
&gt;&nbsp;&gt;&nbsp;rate-limiting
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;mechanism&nbsp;directly&nbsp;into&nbsp;the&nbsp;SourceReaderBase,&nbsp;making&nbsp;it&nbsp;available&nbsp;to
&gt;&nbsp;&gt;&nbsp;all
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;connectors&nbsp;built&nbsp;on&nbsp;the&nbsp;new&nbsp;source&nbsp;interface.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Key&nbsp;Changes:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Seamless&nbsp;Integration&nbsp;via&nbsp;SourceReaderBase:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;New&nbsp;constructors&nbsp;accepting&nbsp;a&nbsp;RateLimiterStrategy&nbsp;will&nbsp;be&nbsp;added
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;directly&nbsp;to&nbsp;SourceReaderBase.&nbsp;This&nbsp;allows&nbsp;any&nbsp;connector&nbsp;built&nbsp;on&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;modern
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;source&nbsp;interface&nbsp;(like&nbsp;Kafka&nbsp;or&nbsp;Pulsar)&nbsp;to&nbsp;enable&nbsp;rate&nbsp;limiting&nbsp;with
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;minimal&nbsp;code&nbsp;changes.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Accurate,&nbsp;Post-Emission&nbsp;Throttling:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;To&nbsp;support&nbsp;sources&nbsp;with&nbsp;unpredictable&nbsp;batch&nbsp;sizes&nbsp;(e.g.,&nbsp;Kafka),
&gt;&nbsp;&gt;&nbsp;rate
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limiting&nbsp;is&nbsp;applied&nbsp;after&nbsp;records&nbsp;are&nbsp;emitted.&nbsp;The&nbsp;reader&nbsp;counts&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records&nbsp;after&nbsp;each&nbsp;recordEmitter.emitRecord&nbsp;method&nbsp;call,&nbsp;counts&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;records,&nbsp;and&nbsp;only&nbsp;then&nbsp;consults&nbsp;the&nbsp;RateLimiter.&nbsp;This&nbsp;ensures
&gt;&nbsp;&gt;&nbsp;throttling
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;based&nbsp;on&nbsp;the&nbsp;precise&nbsp;number&nbsp;of&nbsp;records&nbsp;processed.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Fully&nbsp;Non-Blocking&nbsp;Operation:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;entire&nbsp;mechanism&nbsp;is&nbsp;asynchronous&nbsp;and&nbsp;non-blocking.&nbsp;If&nbsp;a&nbsp;rate
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;limit
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;is&nbsp;hit,&nbsp;the&nbsp;reader&nbsp;pauses&nbsp;by&nbsp;returning&nbsp;InputStatus.MORE_AVAILABLE.
&gt;&nbsp;This
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;yields&nbsp;control&nbsp;to&nbsp;the&nbsp;Flink&nbsp;task's&nbsp;event&nbsp;loop&nbsp;without&nbsp;blocking&nbsp;the
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;processing&nbsp;thread,&nbsp;ensuring&nbsp;that&nbsp;critical&nbsp;operations&nbsp;like
&gt;&nbsp;checkpointing
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;are
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;never&nbsp;delayed.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Unified&nbsp;Configuration&nbsp;via&nbsp;SQL/Table&nbsp;API:
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Users&nbsp;can&nbsp;configure&nbsp;rate&nbsp;limits&nbsp;consistently&nbsp;across&nbsp;different
&gt;&nbsp;&gt;&nbsp;sources
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;using&nbsp;a&nbsp;simple&nbsp;SQL&nbsp;table&nbsp;option,&nbsp;such&nbsp;as&nbsp;WITH&nbsp;('scan.rate.limit'&nbsp;=
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;'1000').
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;This&nbsp;provides&nbsp;a&nbsp;unified&nbsp;and&nbsp;user-friendly&nbsp;experience&nbsp;for&nbsp;pipeline
&gt;&nbsp;&gt;&nbsp;tuning.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;The&nbsp;design&nbsp;is&nbsp;backward-compatible,&nbsp;and&nbsp;existing&nbsp;custom&nbsp;sources
&gt;&nbsp;will
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;continue&nbsp;to&nbsp;work&nbsp;without&nbsp;any&nbsp;modification.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;I&nbsp;believe&nbsp;this&nbsp;feature&nbsp;will&nbsp;be&nbsp;a&nbsp;valuable&nbsp;addition&nbsp;for&nbsp;Flink
&gt;&nbsp;users
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;operating&nbsp;in&nbsp;complex,&nbsp;multi-tenant&nbsp;environments.&nbsp;I'm&nbsp;looking&nbsp;forward
&gt;&nbsp;to
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;your&nbsp;feedback,&nbsp;suggestions,&nbsp;and&nbsp;any&nbsp;potential&nbsp;concerns&nbsp;you&nbsp;might
&gt;&nbsp;have.
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Thanks,
&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;&gt;&nbsp;Zexian&nbsp;Wu
> > > > >
> > > > >
> > >
> >
>
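
A minimal, self-contained sketch of the post-emission semantics described in this thread: `requestSize` is a count of records already emitted rather than a permission request, `acquire` returns a `CompletionStage` that completes once the resulting "time debt" is repaid, and `notifyCheckpointAborted` resets the limiter as Shengkai requested. The limiter is modeled here as a token bucket that is allowed to go into debt; the class name, the burst-capacity parameter, the injectable clock, and the exact reset behavior (drop the debt and release a paused reader) are assumptions for illustration, not the FLIP's RateLimiter interface:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical post-emission limiter: acquire(n) is called AFTER n records were emitted,
 * so it never rejects them; it only says how long to pause to keep the average rate.
 */
final class PostEmissionRateLimiter {
    private final double recordsPerSecond;
    private final double burstCapacity;
    private final LongSupplier nanoClock;
    private double credit; // may become negative: that is the "time debt"
    private long lastRefillNanos;
    private CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);

    PostEmissionRateLimiter(double recordsPerSecond, double burstCapacity, LongSupplier nanoClock) {
        if (recordsPerSecond <= 0 || burstCapacity <= 0) {
            throw new IllegalArgumentException("rate and burst capacity must be positive");
        }
        this.recordsPerSecond = recordsPerSecond;
        this.burstCapacity = burstCapacity;
        this.nanoClock = nanoClock;
        this.credit = burstCapacity;
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Charges requestSize emitted records and returns the pause (in ns) needed to repay debt. */
    synchronized long pauseNanos(int requestSize) {
        refill();
        credit -= requestSize;
        return credit >= 0 ? 0L : (long) Math.ceil(-credit / recordsPerSecond * 1e9);
    }

    /** Non-blocking variant: the returned stage completes when the pause is over. */
    synchronized CompletionStage<Void> acquire(int requestSize) {
        long pause = pauseNanos(requestSize);
        if (pause == 0L) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(pause, TimeUnit.NANOSECONDS)
                .execute(() -> future.complete(null));
        pending = future;
        return future;
    }

    /** Assumed reset semantics: forget outstanding debt and release any paused reader. */
    synchronized void notifyCheckpointAborted(long checkpointId) {
        credit = burstCapacity;
        lastRefillNanos = nanoClock.getAsLong();
        pending.complete(null);
    }

    /** Adds credit for the time elapsed since the last refill, capped at the burst capacity. */
    private void refill() {
        long now = nanoClock.getAsLong();
        double elapsedSeconds = Math.max(0L, now - lastRefillNanos) / 1e9;
        credit = Math.min(burstCapacity, credit + elapsedSeconds * recordsPerSecond);
        lastRefillNanos = now;
    }
}
```

With 2 records/s and credit for 2 records, charging 3 emitted records leaves a debt of one record, so the pause is 0.5 s, matching the "pay back the time debt" behavior discussed above. Expressing the pause as a `CompletionStage` rather than sleeping lets the reader return control to the task while it waits, which is the non-blocking property the proposal relies on.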
