Thanks Mickael.

> 1) if you set rate.limiters to MyRateLimiter you would
> not have to set these 2 configs.
>

True. That may be motivation to adopt Tom's suggestion re config.


> 2) Should we have a
> ByteRateLimiter?
>

That would be ideal, but it is difficult in practice, since ConnectRecord
doesn't have a concept of size. And what would the size be, anyway? The
true size over the wire is different for each source/sink system. We could
use the size of the Kafka record as a universal approximation, but that is
only indirectly related to the true size, esp if there are SMTs in play.

My thinking is that a good solution here would need to wait for a way for
Connectors to report custom metrics, including bytes in/out. Even so, we've
found that Connectors/Tasks often have no idea how many bytes are going
over the wire. So we'd need a solution that provides a default
approximation (e.g. Kafka record size) but lets connectors refine that, if
they are able.
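
To make the "default approximation that connectors can refine" idea concrete, here is a rough sketch. Everything in it is hypothetical: SimpleRecord stands in for ConnectRecord, and RecordSizeEstimator and FramedSizeEstimator are made-up names, not part of Connect or the KIP.

```java
// Hypothetical minimal record; stands in for ConnectRecord, which has no size concept.
final class SimpleRecord {
    final byte[] key;
    final byte[] value;

    SimpleRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical plug-in point (an assumption for illustration, not a real Connect API).
interface RecordSizeEstimator {
    // Default approximation: serialized key length plus value length.
    default long estimateBytes(SimpleRecord record) {
        long keyLen = record.key == null ? 0 : record.key.length;
        long valueLen = record.value == null ? 0 : record.value.length;
        return keyLen + valueLen;
    }
}

// A connector that knows something about its wire format could refine the default.
final class FramedSizeEstimator implements RecordSizeEstimator {
    // Made-up per-record framing overhead, purely for illustration.
    private static final long FRAME_OVERHEAD_BYTES = 16;

    @Override
    public long estimateBytes(SimpleRecord record) {
        return RecordSizeEstimator.super.estimateBytes(record) + FRAME_OVERHEAD_BYTES;
    }
}
```

A connector with no better information would simply inherit the default; one that knows its framing overrides it.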


> 3) what happens if multiple RateLimiters are enabled?
>

The impl waits for the max throttle time. We were using custom SMTs to
implement rate limiting, but delay from multiple such SMTs would be
additive. The idea behind the KIP is to have an SMT-like plug-in that would
not be additive.
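
A minimal sketch of the difference, not the actual impl: ThrottleSource is a hypothetical, stripped-down stand-in for the KIP's RateLimiter, and ThrottleCombiner is a made-up helper.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the KIP's RateLimiter interface.
interface ThrottleSource {
    long throttleTimeMs();
}

final class ThrottleCombiner {
    // Non-additive: wait only as long as the most restrictive limiter asks.
    static long maxThrottleMs(List<ThrottleSource> limiters) {
        long max = 0L;
        for (ThrottleSource limiter : limiters) {
            max = Math.max(max, limiter.throttleTimeMs());
        }
        return max;
    }

    // Additive: the effective delay when each chained SMT sleeps on its own.
    static long additiveThrottleMs(List<ThrottleSource> limiters) {
        long sum = 0L;
        for (ThrottleSource limiter : limiters) {
            sum += limiter.throttleTimeMs();
        }
        return sum;
    }
}
```

With two limiters asking for 100 ms and 250 ms, the max approach waits 250 ms, whereas chained SMTs would wait 350 ms.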

> 4) reusing the transformation/config
> provider syntax for configurations may be nicer for consistency.
>

I have both approaches impl'd and had a slight preference for the one
presented. Happy to defer to consensus.


> 6) Is the rate applied before transformations?
>

For source connectors, we should apply the limits before SMTs, and for sink
connectors, after. This way we get as close to the external systems as
possible, which is ultimately what we're trying to protect.

I don't recall how my POC handles this, but we'd need to get this right for
it to make sense.

Ryanne


> Thanks,
> Mickael
>
> On Fri, Jun 4, 2021 at 9:23 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> >
> > Hey Tom, thanks for taking a look.
> >
> > > It's a bit weird that there's a separate start(Time) method
> >
> > Good call, I think we can use a second constructor instead.
> >
> > > No metrics for batch rates?
> >
> > Good call. TBH I assumed there would already be put/poll rates, but looking
> > again I don't see them. Will add to the KIP.
> >
> > > I think it might be nicer to have a consistent configuration mechanism
> >
> > I had previously implemented this as you propose (same as SMTs), but found
> > it to be a little heavy for the common use-cases. I didn't like how users
> > needed to specify the classnames in order to use the built-in rate limiters.
> >
> > But thinking again about this, if we include default values for
> > rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd
> > get the same effect. Namely, most users would just need to
> > specify rate.limiter.record.limit or rate.limiter.batch.limit.
> >
> > So I think you're right -- the common use-cases don't necessarily suffer,
> > and custom rate limiters would definitely benefit. I'll fix.
> >
> > > hard.rate.limiters [..vs..] rate.limiters
> >
> > I think the difference may be immaterial. As implemented currently,
> > RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
> > don't define a window of time in which a max number of records or batches
> > can be processed. Instead, they just tap the brakes when the instantaneous
> > rate is observed to be too high. But a "hard" rate limiter could be
> > implemented with the same interface, e.g. by sleeping until the end of the
> > current window.
> >
> > Ryanne
> >
> > On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tbent...@redhat.com> wrote:
> >
> > > Hi Ryanne,
> > >
> > > Thanks for the KIP. I can see this would be useful.
> > >
> > > 1. Can you elaborate on the life cycle of the RateLimiter interface (in the
> > > Javadoc)? In particular it's not clear to me how calls to accumulate() and
> > > throttleTime() can be interleaved (I assume arbitrarily).
> > >
> > > 2. It's a bit weird that there's a separate start(Time) method in addition
> > > to the configure() inherited from Configurable. Perhaps passing the Time to
> > > accumulate() would be simpler than needing a two stage configuration step,
> > > even if it would be the same instance on every call. If start() really is
> > > needed you should document that it's called after configure().
> > >
> > > 3. Maybe including the unit in the method name, i.e. throttleTimeMs(), to
> > > avoid any ambiguity about how the result is interpreted?
> > >
> > > 4. The metrics: Are they windowed over some time period, if so, what?
> > >
> > > 5. No metrics for batch rates?
> > >
> > > 6. It doesn't seem to be stated, but I assume the throttle time used is the
> > > maximum of the throttleTime() returned by all the limiters.
> > >
> > > 7. The configuration uses a different mechanism than for SMTs and also
> > > requires adding three common configs (with a risk of collision with any
> > > connector which already defines configs with these names). I think it might
> > > be nicer to have a consistent configuration mechanism, so for example
> > >   rate.limiters=record,batch
> > >   rate.limiter.record.type=RecordRateLimiter
> > >   rate.limiter.record.limit=123
> > >   rate.limiter.batch.type=RecordBatchRateLimiter
> > >   rate.limiter.batch.limit=456
> > > This means there's only a single new common config, as the others depend on
> > > the aliases used, so further collisions can be avoided.
> > >
> > > 8. A cluster where every connector has a quota could end up being
> > > underutilised, yet a subset of connectors could be running at their limit.
> > > While this makes sense for the firehose problem it seems to be problematic
> > > for the noisy neighbour case, where the spare capacity could be shared
> > > between all the throttled tasks on the worker. While I'm not suggesting you
> > > need to implement this as part of the KIP, maybe the API could accommodate
> > > it being added later. Perhaps this could be as simple as using
> > > hard.rate.limiters rather than just rate.limiters, so that
> > > soft.rate.limiters could be added later, though maybe there are use cases
> > > where a single limiter needs to supply both soft and hard limits.
> > >
> > > Thanks again,
> > >
> > > Tom
> > >
> > > On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > > > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > > > pluggable interface, RateLimiter.
> > > >
> > > > After implementing this a few different ways, it's clear that the
> > > > configuration story is actually simpler with a pluggable model.
> > > > Out-of-the-box, we have just two configuration properties to tweak:
> > > > record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> > > > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > > > impls.
> > > >
> > > > From there, additional custom RateLimiters can be enabled with whatever
> > > > configuration they need. This is essentially the same pattern taken with
> > > > MetricsReporters and others.
> > > >
> > > > I had originally envisioned that the set of built-in limits would expand
> > > > over time, eg individual put/poll/commit/flush limits. However, these can
> > > > all be throttled adequately with the proposed API by limiting overall
> > > > record and batch thruput.
> > > >
> > > > Please let me know what you think. The voting thread is open.
> > > >
> > > > Ryanne
> > > >
> > > > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey y'all, I'd like to draw your attention to a new KIP:
> > > > >
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > > > >
> > > > > Lemme know what you think. Thanks!
> > > > >
> > > > > Ryanne
> > > > >
> > > >
> > >
>
