Hi Ryanne,

Thanks for the KIP. I can see this would be useful.
1. Can you elaborate on the life cycle of the RateLimiter interface (in the Javadoc)? In particular, it's not clear to me how calls to accumulate() and throttleTime() can be interleaved (I assume arbitrarily).

2. It's a bit weird that there's a separate start(Time) method in addition to the configure() inherited from Configurable. Perhaps passing the Time to accumulate() would be simpler than needing a two-stage configuration step, even if it would be the same instance on every call. If start() really is needed, you should document that it's called after configure(). (There's a rough sketch of what I mean below my sign-off.)

3. Maybe include the unit in the method name, i.e. throttleTimeMs(), to avoid any ambiguity about how the result is interpreted?

4. The metrics: are they windowed over some time period, and if so, over what?

5. No metrics for batch rates?

6. It doesn't seem to be stated, but I assume the throttle time used is the maximum of the throttleTime() returned by all the limiters.

7. The configuration uses a different mechanism than for SMTs and also requires adding three common configs (with a risk of collision with any connector which already defines configs with these names). I think it might be nicer to have a consistent configuration mechanism, so for example:

    rate.limiters=record,batch
    rate.limiter.record.type=RecordRateLimiter
    rate.limiter.record.limit=123
    rate.limiter.batch.type=RecordBatchRateLimiter
    rate.limiter.batch.limit=456

This means there's only a single new common config, as the others depend on the aliases used, so further collisions can be avoided. (See the SMT comparison below my sign-off.)

8. A cluster where every connector has a quota could end up being underutilised, yet a subset of connectors could be running at their limit. While this makes sense for the firehose problem, it seems problematic for the noisy-neighbour case, where the spare capacity could be shared between all the throttled tasks on the worker. While I'm not suggesting you need to implement this as part of the KIP, maybe the API could accommodate it being added later. Perhaps this could be as simple as using hard.rate.limiters rather than just rate.limiters, so that soft.rate.limiters could be added later, though maybe there are use cases where a single limiter needs to supply both soft and hard limits.

Thanks again,

Tom
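To make point 2 concrete, here's a rough sketch of the shape I have in mind, with the Time supplied on each call rather than via a separate start() step, and the unit in the method name per point 3. The generic parameter, the Collection argument and the exact signatures are just my guesses for illustration, not what the KIP currently specifies:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.connect.connector.ConnectRecord;

    // Illustrative shape only -- not the interface as currently written in the KIP.
    public interface RateLimiter<R extends ConnectRecord<R>> extends Configurable {

        // Inherited from Configurable; called once before anything else,
        // so no separate start(Time) step is needed.
        @Override
        void configure(Map<String, ?> configs);

        // Account for a batch of records; the Time is supplied on every call
        // (it would be the same instance each time).
        void accumulate(Collection<R> records, Time time);

        // How long the framework should pause before the next batch, in
        // milliseconds (unit in the name, per point 3).
        long throttleTimeMs(Time time);
    }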
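And to make the comparison in point 7 explicit, the alias-based scheme I'm suggesting is the same one SMTs already use, e.g. something like:

    # Existing SMT pattern: one common config (transforms) plus per-alias configs.
    transforms=insertSource
    transforms.insertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.insertSource.static.field=data_source
    transforms.insertSource.static.value=my-connector

(The alias and field values are just placeholders.) Limiters configured under rate.limiter.<alias>.* would get the same benefit: connector-specific settings without adding more common config names that could collide.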
On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Hey y'all, I've expanded the scope of this KIP slightly to include a pluggable interface, RateLimiter.
>
> After implementing this a few different ways, it's clear that the configuration story is actually simpler with a pluggable model. Out-of-the-box, we have just two configuration properties to tweak: record.rate.limit and record.batch.rate.limit (subj to change ofc). These are provided by built-in RecordRateLimiter and RecordBatchRateLimiter impls.
>
> From there, additional custom RateLimiters can be enabled with whatever configuration they need. This is essentially the same pattern taken with MetricsReporters and others.
>
> I had originally envisioned that the set of built-in limits would expand over time, eg individual put/poll/commit/flush limits. However, these can all be throttled adequately with the proposed API by limiting overall record and batch throughput.
>
> Please let me know what you think. The voting thread is open.
>
> Ryanne
>
> On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > Hey y'all, I'd like to draw your attention to a new KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> >
> > Lemme know what you think. Thanks!
> >
> > Ryanne