Hi Ryanne,

Thanks for the KIP. I can see this would be useful.

1. Can you elaborate on the life cycle of the RateLimiter interface (in the
Javadoc)? In particular, it's not clear to me how calls to accumulate() and
throttleTime() can be interleaved (I assume arbitrarily).
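
To make the question concrete, here's roughly the lifecycle I'd like to see
spelled out. This is only a sketch pieced together from the method names in
the KIP, not the actual interface, and the accumulate() signature is my guess:

  import org.apache.kafka.common.Configurable;
  import org.apache.kafka.common.utils.Time;

  public interface RateLimiter extends Configurable {
      // configure(Map) is inherited from Configurable and presumably called first.

      // Called once after configure(), before any records flow?
      void start(Time time);

      // Called as records pass through the task; the signature here is my guess.
      void accumulate(int count);

      // How long the task should pause before continuing; can this be called
      // arbitrarily interleaved with accumulate(), from the same thread?
      long throttleTime();
  }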

2. It's a bit weird that there's a separate start(Time) method in addition
to the configure() inherited from Configurable. Perhaps passing the Time to
accumulate() would be simpler than needing a two-stage configuration step,
even if it would be the same instance on every call. If start() really is
needed, you should document that it's called after configure().
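
For example (again just a sketch, not the KIP's API), the two-stage setup
goes away if the clock arrives with every call:

  // Alternative sketch: no start(); the Time is passed on each call instead.
  void accumulate(Time time, int count);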

3. How about including the unit in the method name, i.e. throttleTimeMs(),
to avoid any ambiguity about how the result is interpreted?

4. The metrics: are they windowed over some time period, and if so, what is it?

5. No metrics for batch rates?

6. It doesn't seem to be stated, but I assume the throttle time used is the
maximum of the throttleTime() values returned by all the limiters.
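
In other words, I'm picturing something like the following in the task loop
(my assumption, not something the KIP states; rateLimiters and time are
placeholder names):

  // Assumed behaviour: pause for the largest throttle time any limiter requests.
  long throttleMs = 0;
  for (RateLimiter limiter : rateLimiters) {
      throttleMs = Math.max(throttleMs, limiter.throttleTime());
  }
  if (throttleMs > 0) {
      time.sleep(throttleMs);
  }

If that's the intent, it would be good to state it in the KIP.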

7. The configuration uses a different mechanism than the one for SMTs and
also requires adding three new common configs (with a risk of collision with
any connector which already defines configs with these names). I think it
would be nicer to have a consistent configuration mechanism, for example:
  rate.limiters=record,batch
  rate.limiter.record.type=RecordRateLimiter
  rate.limiter.record.limit=123
  rate.limiter.batch.type=RecordBatchRateLimiter
  rate.limiter.batch.limit=456
This means there's only a single new common config, as the others depend on
the aliases used, so further collisions can be avoided.
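
For comparison, that mirrors the existing SMT convention, where only the
transforms list is a common config and everything else hangs off the alias,
e.g.:

  transforms=mask
  transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
  transforms.mask.fields=ssn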

8. A cluster where every connector has a quota could end up being
underutilised, even while a subset of connectors is running at its limit.
While this makes sense for the firehose problem, it seems problematic for
the noisy neighbour case, where the spare capacity could be shared between
all the throttled tasks on the worker. I'm not suggesting you need to
implement this as part of the KIP, but maybe the API could accommodate it
being added later. Perhaps this could be as simple as using
hard.rate.limiters rather than just rate.limiters, so that
soft.rate.limiters could be added later, though maybe there are use cases
where a single limiter needs to supply both soft and hard limits.
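
Purely as an illustration, that could look like:

  # enforced per-task limits, as proposed in the KIP
  hard.rate.limiters=record,batch
  # hypothetical later addition: limits that may be exceeded while the
  # worker has spare capacity
  soft.rate.limiters=record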

Thanks again,

Tom

On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Hey y'all, I've expanded the scope of this KIP slightly to include a
> pluggable interface, RateLimiter.
>
> After implementing this a few different ways, it's clear that the
> configuration story is actually simpler with a pluggable model.
> Out-of-the-box, we have just two configuration properties to tweak:
> record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> impls.
>
> From there, additional custom RateLimiters can be enabled with whatever
> configuration they need. This is essentially the same pattern taken with
> MetricsReporters and others.
>
> I had originally envisioned that the set of built-in limits would expand
> over time, eg individual put/poll/commit/flush limits. However, these can
> all be throttled adequately with the proposed API by limiting overall
> record and batch thruput.
>
> Please let me know what you think. The voting thread is open.
>
> Ryanne
>
> On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > Hey y'all, I'd like to draw your attention to a new KIP:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> >
> > Lemme know what you think. Thanks!
> >
> > Ryanne
> >
>
