Hi Girish,

replies inline.

On Thu, 9 Nov 2023 at 00:29, Girish Sharma <scrapmachi...@gmail.com> wrote:
> While dual-rate dual token bucket looks promising, there is still some
> challenge with respect to allowing a certain peak burst for/up to a bigger
> duration. I am explaining it below:

> Assume a 10MBps topic. Bursting support of 1.5x upto 2 minutes, once every
> 10 minute interval.

There are many ways to model dual token buckets.
When there are tokens in the bucket, they are consumed as fast as
possible. This is why a second token bucket is needed to rate limit
the traffic to the absolute maximum rate. Technically, the second
bucket rate limits the average rate over a short time window.

I'd pick the first bucket for handling the 10 MBps rate.
The capacity of the first bucket would be 15 MBps * 120 s = 1800 MB.
The fill would happen in a special way; I'm not sure if Bucket4J
supports this at all. Describing the way of adding tokens to the
bucket: the tokens in the bucket would remain the same while the rate
is below 10 MBps, because as many tokens would be added to the bucket
as are consumed by the actual traffic. The leftover tokens (10 MBps
minus the actual rate) would go to a separate "filling bucket" that
gets poured into the actual bucket every 10 minutes.
This first bucket with its separate "filling bucket" would handle
bursting up to 1800 MB.
The second bucket would solely enforce the 1.5x limit of 15MB rate
with a small capacity bucket which enforces the average rate for a
short time window.
There's one nuance here: bursting is only possible if the average
rate has stayed below 10 MBps long enough to earn the tokens that the
burst consumes.
It would also be possible, for example, to make 50% of the tokens
immediately available and put the remaining 50% into the "filling
bucket" that gets poured into the actual bucket every 10 minutes.
Without some way to earn the burst, I don't think there's a
reasonable way to make things usable. The 10 MBps limit wouldn't have
an actual meaning unless it is used to "earn" the tokens spent on the
burst.
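To make the idea concrete, here's a rough, hypothetical sketch of such
an "earned burst" bucket (the class and its behavior are my own
illustration, not an existing Pulsar or Bucket4J API; timestamps are
injected instead of read from a clock to keep the example
deterministic):

```python
class EarnedBurstBucket:
    """Token bucket where unused capacity is "earned" into a separate
    filling bucket that is poured into the main bucket periodically.
    Units are illustrative: MB and seconds."""

    def __init__(self, rate, capacity, pour_interval, now=0.0):
        self.rate = rate                    # configured rate, e.g. 10 MBps
        self.capacity = capacity            # max burst, e.g. 1800 MB
        self.pour_interval = pour_interval  # e.g. 600 s
        self.tokens = 0.0                   # main bucket (usable for bursts)
        self.filling = 0.0                  # earned, but not yet usable
        self.last_update = now
        self.last_pour = now

    def consume(self, amount, now):
        """Record `amount` MB produced at timestamp `now`.
        Returns True while within limits, False when throttling is needed."""
        earned = (now - self.last_update) * self.rate
        self.last_update = now
        # Tokens matching actual traffic go straight back into the main
        # bucket, so its level is unchanged while traffic <= rate.
        refund = min(amount, earned)
        # Leftover tokens (rate minus actual traffic) are earned into the
        # filling bucket, capped so tokens + filling never exceeds capacity.
        self.filling = min(self.filling + (earned - refund),
                           self.capacity - self.tokens)
        # The main bucket only drains when traffic exceeds the rate.
        self.tokens -= (amount - refund)
        # Periodically, earned tokens become usable for bursting.
        if now - self.last_pour >= self.pour_interval:
            self.tokens += self.filling
            self.filling = 0.0
            self.last_pour = now
        return self.tokens >= 0
```

Note that during a 15 MBps burst the main bucket drains at only
15 - 10 = 5 MBps, because tokens keep being earned at the configured
rate while the burst runs.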

One other detail of topic publishing throttling in Pulsar is that the
actual throttling happens after the limit has been exceeded.
This is due to the fact that Pulsar's network handling uses Netty
where you cannot block. When using the token bucket concepts,
the tokens are always first consumed and after that there's a chance
to pause message publishing.
In code, you can find this at
https://github.com/apache/pulsar/blob/c0eec1e46edeb46c888fa28f27b199ea7e7a1574/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1794
and digging down from there.

The current rate limiters in Pulsar are not optimized for how Pulsar
uses rate limiting. There's no need to use a scheduler for adding
"permits", as they are called in the current rate limiter. The new
tokens to add can be calculated from the elapsed time. Only resuming
from the blocked state (auto read disabled -> enabled) requires a
scheduler, and the duration of the pause can be calculated from the
rate and the average message size. Because of the asynchronous nature
of rate limiting in Pulsar topic publish throttling, the token count
can go negative, and in practice it does. The calculation of the
pause could also take this into account. Rate limiting done this way
is very accurate and efficient, since the scheduler is only needed
when the token bucket runs out of tokens and there's a real need to
throttle. This is the change I would make to the current
implementation as an experiment, to see how things behave with the
revisited solution. Eliminating the precise rate limiter and having
just a single rate limiter would be part of this.
The code base has multiple usages of the rate limiter. The
dispatching rate limiting might require some variation. IIRC, rate
limiting is also used elsewhere in the code base in a blocking
manner; for example, unloading of bundles is rate limited. Working on
the code base will reveal this.
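As a sketch of the scheduler-free approach described above (my own
simplified illustration, not the actual Pulsar implementation):
tokens are recalculated lazily from the elapsed time, consumption is
unconditional so the count can go negative, and the pause length is
derived from the deficit:

```python
class AsyncTokenBucket:
    """Token bucket without a refill scheduler: tokens are added lazily
    from the elapsed time whenever the bucket is touched. Consumption is
    unconditional (the non-blocking Netty path has already read the
    message), so the token count can go negative."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = rate              # tokens (e.g. bytes) per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_update = now

    def consume(self, amount, now):
        # Lazy refill based on elapsed time, capped at capacity.
        self.tokens = min(float(self.capacity),
                          self.tokens + (now - self.last_update) * self.rate)
        self.last_update = now
        self.tokens -= amount         # always consume; may go negative
        return self.tokens >= 0       # False => disable auto-read

    def pause_seconds(self, now):
        """How long to keep auto-read disabled: the time for the deficit
        to be paid back at the configured rate. A real implementation
        could also factor in the average message size, as noted above."""
        self.consume(0.0, now)        # bring the token count up to date
        return max(0.0, -self.tokens / self.rate)
```

The scheduler is only engaged when `consume` returns False, which
matches the point made above: no periodic work is needed while there
are tokens left.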

> While the number of events in the system topic would be fairly low per
> namespace, the issue is that this system topic lies on the same broker
> where the actual topic/partitions exist and those partitions are leading to
> degradation of this particular broker.

Yes, that could be possible but perhaps unlikely.

> Agreed, there is a challenge, as it's not as straightforward as I've
> demonstrated in the example above.

Yes, it might require some rounds of experimentation. In general,
though, I think it's a fairly straightforward problem to solve, as
long as the requirements can be adapted to some small details that
make sense for bursting in the context of rate limiting. The key
detail is that the long-term average rate shouldn't exceed the
configured rate even with the bursts. That's why the tokens usable
for the burst should be "earned". I'm not sure it's even necessary to
enforce that a burst can happen only once every 10 minutes. Why does
it matter, if the average rate is controlled?

> There is clearly a gap with respect to understanding what the common
> requirements are. I am here speaking only of my requirements. Some of these
> may contradict requirements of others who haven't spoken up so far. Would
> we be changing the rate limiter once again when someone else comes up with

I'll just repeat that Pulsar isn't so special that it would require
very special rate limiting. The basics should be sufficient, as they
are for most other similar projects.

> new requirements? For instance, the "exponential decay" mentioned by Rajan
> is completely different from any token based approach.

"Exponential decay" is usually mentioned in the context of rate
limiting retries, for example exponential backoff.
I'd like to see examples of "exponential decay" in the context of
actual messaging / traffic rate limiting before we FOMO about this.

> I think there is a need for a new protocol message from server to client
> indicating rate-limiting (say, a 429 response, with an optional
> retry-after). That should handle a lot of these things. For instance, we
> can then build configurable logic in the client to stop producing all
> together after a sufficient number of 429 received from the server. There
> can also be a "BLOCKED" protocol that the server could indicate to the
> client.

Yes. I think Kafka has something like that. It's mentioned in the
Kafka Quotas [1] doc.
I believe that the needed explicit producer flow control could cover
this. In Pulsar, we need to add explicit producer flow control to
address the connection multiplexing problem.

1 - https://docs.confluent.io/kafka/design/quotas.html

> From our experience, inferring these things via client-broker clock sync
> assumption would not hold true. We regularly notice clock differences
> between clients and brokers in our setup.

The Pulsar binary protocol could have a way to calculate the
approximate clock difference between the client and the broker so that
end-to-end latencies could be calculated on the broker side from
client side timestamps.

> When you say - adding a permit-based flow control - even if this is
> implemented, multiplexing is still an issue as the tcp/ip channel itself is
> put on pause at the netty level. Is there any other way of rate limiting
> and rejecting packets from a channel selectively so as to contain the rate
> limiting effect only to the specific partition out of all the partitions
> being shared in the channel?

In that solution, pausing the TCP/IP channel wouldn't be the primary
way to handle flow control.
Yes, there are alternatives; permit-based flow control is one. The
server would hand out a manageable number of permits at a time. There
are challenges in making this work efficiently with low amounts of
memory. That's one of the tradeoffs.
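To illustrate the permit-based idea (a hypothetical client-side
sketch; none of these names exist in the Pulsar protocol today): the
broker grants a window of permits per producer, and the client only
writes to the shared connection while it holds permits, so throttling
one producer doesn't pause the whole multiplexed channel:

```python
class ProducerFlowControl:
    """Hypothetical per-producer permit window. The broker controls the
    rate by deciding when and how many permits to grant; the pending
    queue is where the memory tradeoff mentioned above shows up."""

    def __init__(self):
        self.permits = 0
        self.pending = []   # messages waiting for permits (memory cost)

    def grant(self, n):
        """Broker grants n more permits; returns messages now sendable."""
        self.permits += n
        sendable = []
        while self.pending and self.permits > 0:
            sendable.append(self.pending.pop(0))
            self.permits -= 1
        return sendable

    def send(self, msg):
        """Client-side send: write immediately if a permit is available,
        otherwise queue locally without touching the shared connection."""
        if self.permits > 0:
            self.permits -= 1
            return [msg]
        self.pending.append(msg)
        return []
```

The key property is that backpressure is expressed per producer
(permits withheld) rather than per connection (auto-read disabled).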

> When I was doing poller vs precise testing wrt CPU, network, broker
> latencies etc, one of the reason precise was much more CPU efficient and
> had minimal impact on broker latencies was due to the fact that the netty
> channel was being paused precisely.

Yes, I have similar experiences, and this has also been reported.
Btw, the precise rate limiter had some bugs in the past, but
contributions from the community made it usable. I also contributed
to that area in reviews and some minor PRs. One of the big challenges
to overcome was understanding how the solution works. That's why I
think the concepts should match common reference points, so that the
code stays maintainable.

> > control for producers and revisiting the broker side backpressure/flow
> > control and therefore it won't happen quickly.
> > Please go ahead and create a GH issue and share your context. That
> > will be very helpful.
> >
> >
> I will do so. Does this need a PIP? To reiterate, I will be opening a GH
> issue on the lines of "don't share connection to the broker across producer
> objects"

I think it could start with an issue that describes the problem. The
PIP is eventually needed to add the new option to the producer builder
and consumer builder interfaces.


> Are you suggesting that we first go ahead and convert the rate limiter in
> pulsar to a simple, single-token based approach?
> I personally do not see any benefit in this apart from code refactoring.
> The precise rate limiter is basically doing that already- all be it
> refilling only every 1 second, rather than distributing the tokens across
> that second.

I have described the benefits in some former comments. The code is
currently not maintainable by anyone other than the people who have
worked on it. Another point is getting rid of the separate "precise"
option, which leaks implementation details all the way to Pulsar
users, who shouldn't need to know what a "precise rate limiter" is.
The default Pulsar core rate limiter should do what it promises to
do. That would be the starting point, matching the existing feature
set. After that, it's possible to make it a dual token bucket
internally, so that a separate max rate can be set. With a single
token bucket, a larger capacity means the bursts themselves have no
rate limit: a bucket with a large capacity allows bursts whenever
traffic temporarily drops below the configured rate. Yes, the precise
rate limiter is something like a bucket with capacity for 1 second,
and that's why it doesn't allow bursts.
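A sketch of that dual token bucket (illustrative only, with injected
timestamps): the first, large bucket enforces the long-term average
and holds the burst capacity, while the second, small bucket caps the
instantaneous rate at the separate max rate, which a single large
bucket cannot do:

```python
class TokenBucket:
    """Plain token bucket with lazy, elapsed-time-based refill."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate, self.capacity = rate, capacity
        self.tokens, self.last_update = float(capacity), now

    def refill(self, now):
        self.tokens = min(float(self.capacity),
                          self.tokens + (now - self.last_update) * self.rate)
        self.last_update = now


class DualTokenBucket:
    """Traffic must pass BOTH buckets. The average bucket allows large
    bursts; the peak bucket limits how fast those bursts may go."""

    def __init__(self, avg_rate, burst_capacity, max_rate, now=0.0):
        self.average = TokenBucket(avg_rate, burst_capacity, now)
        # Small capacity => enforces max_rate over roughly a 1 s window.
        self.peak = TokenBucket(max_rate, max_rate, now)

    def try_consume(self, amount, now):
        self.average.refill(now)
        self.peak.refill(now)
        # Check both buckets before consuming from either, so a refusal
        # by one bucket doesn't drain the other.
        if self.average.tokens >= amount and self.peak.tokens >= amount:
            self.average.tokens -= amount
            self.peak.tokens -= amount
            return True
        return False
```

With avg_rate=10 and max_rate=15, a full first bucket alone would let
a producer send its entire 1800 MB burst capacity at wire speed; the
second bucket is what holds the burst to roughly 15 MB per second.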

> > algorithm is. That could be a starting point until we start covering
> > the advanced cases which require a dual token bucket algorithm.
> > If someone has the bandwidth, it would be fine to start experimenting
> > in this area with code to learn more.
> >
> >
> I think this is a big assumption. Since this is a critical use case in my
> organisation, I will have to contribute everything here myself. Now I do
> understand that reviews can take time in the OSS world, but this can't be
> left at "simple token based approach and then letting anyone pick and
> explore to extend/enhance it to dual token approach". This probably is one
> of the main reasons why pluggability is important here :)

If you check many other projects, most of them use token bucket based
approaches. That's why I'm confident that it is a feasible solution.
It's always possible to be creative and make a variation that covers
the majority of Pulsar use cases. Pulsar is OSS, and if there's a gap
in the solution, you don't even need pluggability. :)

We are soon at the point where it's time to deliver. :) As mentioned
before, I am busy with other things in the upcoming weeks, so I'll
leave the exact schedule for my contributions open for now.
Thanks for the great conversations; they have been helpful in many
ways in making progress. Looking forward to the next step and
whatever that could be.


-Lari
