Hello Lari, replies inline. It's the festive season here, so I might be
late with my next reply.


On Fri, Nov 10, 2023 at 4:51 PM Lari Hotari <lhot...@apache.org> wrote:

> Hi Girish,
>
> >
> >
> > > tokens would be added to the bucket as are consumed by the actual
> > > traffic. The left over tokens 10MB - actual rate would go to a
> > > separate filling bucket that gets poured into the actual bucket every
> > > 10 minutes.
> > > This first bucket with this separate "filling bucket" would handle the
> > > bursting up to 1800MB.
> > >
> >
> > But this isn't the requirement? Let's assume that the actual traffic has
> > been 5MB for a while and this 1800MB capacity bucket is all filled up
> now..
> > What's the real use here for that at all?
>
> How would the rate limiter know if 5MB traffic is degraded traffic
> which would need to be allowed to burst?
>

That's not what I was implying. I was trying to question the need for
1800MB worth of capacity. I am assuming this is to allow a 2-minute burst
at 15MBps (15MBps x 120s = 1800MB)? But isn't this bucket only taking care
of the delta beyond 10MBps (in which case 600MB would cover 2 minutes)?
Moreover, once the 2 minutes have elapsed, which bucket is ensuring that
the rate is only allowed to go up to 10MBps?


> > I think this approach of thinking about rate limiter - "earning the right
> > to burst by letting tokens remain into the bucket, (by doing lower than
> > 10MB for a while)" doesn't fit well in a messaging use case in real
> > world, or theoretic.
>
> It might feel like it doesn't fit well, but this is how most rate
> limiters work. The model works very well in practice.
> Without "earning the right to burst", there would have to be some
> other way to detect whether there's a need to burst.
>

Why does the rate limiter need to decide if there is a need to burst? That
depends on the incoming message rate. Since the rate limiter has no
knowledge of what is going to happen in the future, it cannot assume that
messages beyond the fixed rate (10MBps in our example) can be held/paused
until the next second - and thereby decide that there is no need to burst
right now.

While I understand that the earning-the-right-to-burst model works well in
practice, another way to think about this is that the bucket starts out
filled. After all, the tokens here represent the available disk, CPU and
network bandwidth. The general approach of requiring tokens to be earned
first might help with cold starts, but beyond that, a topic having to do
5MBps for a while in order to accumulate enough tokens to burst for a few
minutes in the future doesn't really translate to the physical world, does
it? The 1:1 translation here is to the SSD where the topic's data is
actually being written - my bursting up to 15MBps doesn't really depend on
the fact that I was only doing 5MBps in the last few minutes (and thus
accumulating the remaining 5MBps worth of tokens towards the burst), does
it? The SSD won't gain the ability to absorb a burst just because there was
low throughput in the last few minutes - not even from a space point of
view.
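
To make the alternative concrete, here is a rough sketch of what I mean (in
Java; the class and field names are hypothetical, not references to any
existing Pulsar code): a bucket that starts full and refills lazily based
on elapsed time, so a freshly provisioned topic can burst immediately.

    // Hypothetical sketch: a token bucket that starts FULL, so a topic can
    // burst right after provisioning instead of first "earning" the right.
    class StartFullTokenBucket {
        private final long capacityBytes;  // burst allowance, e.g. 1800MB
        private final long ratePerSecond;  // steady-state refill, e.g. 10MB/s
        private long tokens;
        private long lastRefillNanos;

        StartFullTokenBucket(long capacityBytes, long ratePerSecond) {
            this.capacityBytes = capacityBytes;
            this.ratePerSecond = ratePerSecond;
            this.tokens = capacityBytes;   // start filled: burst allowed at t=0
            this.lastRefillNanos = System.nanoTime();
        }

        synchronized boolean tryAcquire(long bytes) {
            refill();
            if (tokens < bytes) {
                return false;              // caller disables auto-read
            }
            tokens -= bytes;
            return true;
        }

        private void refill() {
            long now = System.nanoTime();
            // double math is good enough for a sketch and avoids overflow
            long add = (long) ((now - lastRefillNanos) / 1e9 * ratePerSecond);
            tokens = Math.min(capacityBytes, tokens + add);
            lastRefillNanos = now;
        }
    }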



> The need to burst could be detected by calculating the time that the
> message to be sent has been in queues until it's about to be sent.
> In other words, from the end-to-end latency.
>

In practice, this level of cross-component coordination would never result
in a responsive and spontaneous system - unless each message carries an "I
waited in the queue for this long" timestamp and, on the server side, we
read the Netty channel, parse the message and figure this out before
checking the rate limit. At that point, rate limiting isn't really doing
anything, since the broker is reading every message anyway. This may also
require prioritization of messages, as some messages from a different
producer to the same partition may have waited longer than others. That is
out of scope for a rate limiter at this point.

> Imagine a 100 of such topics going around with similar configuration of
> > fixed+burst limits and were doing way lower than the fixed rate for the
> > past couple of hours. Now that they've earned enough tokens, if they all
> > start bursting, this will bring down the system, which is probably not
> > capable of supporting simultaneous peaks of all possible topics at all.
>
> This is the challenge of capacity management and end-to-end flow
> control and backpressure.
>

The rate limiter is a major facilitator and guardian of capacity planning,
so it does relate here.


> With proper system wide capacity management and end-to-end back
> pressure, the system won't collapse.
>

At no point am I trying to go beyond the purview of a single broker here -
unless, by system, you meant a single broker itself, in which case I talk
about a broker-level rate limiter further below in the example.


> In Pulsar, we have "PIP 82: Tenant and namespace level rate limiting"
> [4] which introduced the "resource group" concept. There is the
> resourcegroup resource in the Admin REST API [5]. There's also a
> resource called "resource-quota" in the Admin REST API [6], but that
> seems to be abandoned.
>
I am well aware of resource groups. I personally do not want to go into
discussions about resource groups here, as I am only talking about rate
limiting at the partition level, which does not need intra-broker
communication or the resource group feature in order to work fully.

Just as a side note: in practice, resource group level rate limiting is
very imprecise due to the inherent need to sync broker-level data between
multiple brokers at some frequency. The data is thus always stale, and
produce goes way beyond the set limit, even though resource groups use the
precise rate limiter today.


> > Now of course we can utilize a broker level fixed rate limiter to not
> allow
> > the overall throughput of the system to go beyond a number, but at that
> > point - all the earning semantic goes for a toss anyway since the
> behavior
> > would be unknown wrt which topics are now going through with bursting and
> > which are being blocked due to the broker level fixed rate limiting.
> >
> > As such, letting topics loose would not sit well with any sort of SLA
> > guarantees to the end user.
> >
> > Moreover, contrary to the earning tokens logic, in reality a topic
> _should_
> > be allowed to burst upto the SOP/SLA as soon as produce starts in the
> > topic. It shouldn't _have_ to wait for tokens to fill up as it does
> > below-fixed-rate for a while before it is allowed to burst. This is
> because
> > there is no real benefit or reason to not let the topic do such as the
> > hardware is already present and the topic is already provisioned
> > (partitions, broker spread) accordingly, assuming the burst.
> >
> > In an algorithmic/academic/literature setting, token bucket sounds really
> > promising.. but a platform with SLA to users would not run like that.
>
> AWS DynamoDB uses token bucket based algorithms. Please check the
> video I referred in one of the previous comments.
> at 10:26 [1] "The simplest way to do this workload isolation is the
> token bucket"
> at 13:05 [2] "partitions will get throttled because they're constantly
> running out of tokens from their bucket even though they had burst
> capacity because after the burst burst capacity is over you had to
> still take time to fill capacity in the token bucket while other
> partitions actually had capacity in their token buckets so..."
>

In the very next sentence it also says "so a key takeaway from this was
though bursting was great it solved some of the customers problems um we
had it wasn't sufficient enough and we had tightly coupled how partition
level capacity to admission control right so we realized that removing the
admission control from partition from partition level would be the most
beneficial thing for customers and for us as well and letting the partition
burst always so we implemented something called as a global admission
control where we moved our mission control up"
It looks like the fact that partitions couldn't make progress for lack of
tokens was a problem for them? I admit I haven't gone through the full
video yet.


> at 13:53 [3] "GAC (Global Admission Control) is a service that builds
> on the same token bucket mechanism so when a request router receives a
> request it basically requests the GAC to kind of get some tokens and
> once request router uses the local tokens to make admission control
> decisions if it runs out of tokens it can request for more tokens"
>
> 1 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=10m26s
> 2 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m05s
> 3 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m54s
>
> Token bucket based solutions are widely used in the industry. I don't
> see a reason why we should be choosing some other conceptual model as
> the basis.
>

Not another conceptual model - but we should be using the token bucket in a
way that works for real-world situations.


> >
> >
> >
> > > In the current rate limiters in Pulsar, the implementation is not
> > > optimized to how Pulsar uses rate limiting. There's no need to use a
> > > scheduler for adding "permits" as it's called in the current rate
> > > limiter. The new tokens to add can be calculated based on the elapsed
> > > time. Resuming from the blocking state (auto read disabled -> enabled)
> > > requires a scheduler. The duration of the pause should be calculated
> > >
> >
> > The precise one already does this. There is no scheduler for adding the
> > tokens, only scheduler to enable back the auto-read on netty channel.
>
> Please check the source code again [1]. The scheduler is used to do
> something that is equivalent of "adding the tokens".
> The "createTask" method schedules the task to call the "renew" method.
>
> 1 -
> https://github.com/apache/pulsar/blob/3c067ce28025e116146977118312a1471ba284f5/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java#L260-L278
>
> > It looks like your inclination is to simply remove the default "poller"
> > implementation and just keep the precise one in the code base?
>
> No it's not. It's about replacing the solution with an implementation
> that is using the conceptual model of the token bucket. This will make
> the implementation more maintainable and we will be able to clean up
> the abstractions. Concepts and abstractions are the key of interface
> design.
> In the current state, the rate limiter is very hard to reason about.
> There's no reason to keep it in the code base. I'll explain more about
> that later.
> Besides the maintainability, another clear problem in the current rate
> limiter is the heavy usage of synchronization and the already
> mentioned unnecessary use of the scheduler.
>

I agree with you here. That's why I mentioned the ideal state of the rate
limiter in the Pulsar OSS version. I see that you did not comment on that
in your reply, though.
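
For what it's worth, the scheduler-free version could be quite small. Here
is a sketch under my own assumptions (not a reference to any existing
class): tokens live in an AtomicLong, get refilled lazily from elapsed
time, and may briefly go negative - no synchronized blocks and no periodic
renew() task.

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch: lazy, lock-free token refill. Whichever thread first observes
    // the elapsed time wins the CAS and credits the new tokens.
    class LazyRefillLimiter {
        private final long capacity;
        private final long ratePerSecond;
        private final AtomicLong tokens;
        private final AtomicLong lastRefillNanos;

        LazyRefillLimiter(long capacity, long ratePerSecond) {
            this.capacity = capacity;
            this.ratePerSecond = ratePerSecond;
            this.tokens = new AtomicLong(capacity);
            this.lastRefillNanos = new AtomicLong(System.nanoTime());
        }

        boolean tryAcquire(long bytes) {
            long last = lastRefillNanos.get();
            long now = System.nanoTime();
            if (now > last && lastRefillNanos.compareAndSet(last, now)) {
                long add = (long) ((now - last) / 1e9 * ratePerSecond);
                tokens.accumulateAndGet(add,
                        (cur, inc) -> Math.min(capacity, cur + inc));
            }
            // Going negative is fine: it signals how far over the limit we are.
            return tokens.addAndGet(-bytes) >= 0;
        }
    }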


>
> > This part is a bit unclear to me. When are you suggesting to enable back
> > the auto-read on the channel? Ideally, it should open back up the next
> > second. Time to the next second could vary from 1ms to 999ms.. I do not
> > think that starting an on demand scheduler of delta ms would be accurate
> > enough to achieve this.. A newly created scheduled task is anyways not
> this
> > accurate.. With Java virtual threads coming up, I believe having one, or
> N,
> > constantly running scheduled tasks of 1 second interval would not be
> heavy
> > at all.
> > In fact, even in Java 8, as per my benchmarks, it doesn't seem to take up
> > much frames at all.
>
> Optimally the pause would be such a duration that there's enough
> tokens to send the next message.
> One possible way to estimate the size of the next message is to use
> the average size or a rolling average size as the basis of the
> calculation.
> It might be as short as the minimum time to pause for the timer.
> Instead of using a generic scheduler for pausing, Netty's scheduler
> should be used.
> It is very efficient for high volumes. If there's a bottleneck, it's
> possible to have some minimum duration to pause.
>

I still don't see any advantage here - what are we trying to save?
Honestly, any kind of guessing inside the code will lead to weird anomalies
when running in a big system with various different use cases.
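
For reference, if I've understood the suggestion correctly, the estimate
would look roughly like the following (a sketch; the rolling-average input
and the minimum pause are assumptions, not existing APIs):

    import io.netty.channel.ChannelHandlerContext;
    import java.util.concurrent.TimeUnit;

    // Sketch of the discussed pause estimate: disable auto-read, then
    // re-enable it on the channel's own event loop once enough tokens for
    // one average-sized message should have accumulated.
    void pauseUntilTokensAvailable(ChannelHandlerContext ctx, long currentTokens,
                                   long avgMsgBytes, long ratePerSecond,
                                   long minPauseNanos) {
        ctx.channel().config().setAutoRead(false);
        long deficit = Math.max(0L, avgMsgBytes - currentTokens);
        long pauseNanos = Math.max(deficit * 1_000_000_000L / ratePerSecond,
                                   minPauseNanos);
        ctx.channel().eventLoop().schedule(
                () -> ctx.channel().config().setAutoRead(true),
                pauseNanos, TimeUnit.NANOSECONDS);
    }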


>
> > "10 minutes" was just an example. Part of the problem is explained
> above..
> > If a topic is allowed to just burst forever (irrespective of the fact
> that
> > it has earned the tokens or not) - it will start competing with other
> > topics that are also bursting. This is a problem from a capacity planning
> > point of view. Generally this is how capacity planning works
> >
> >    1. We will first do redline testing to determine how much MBps a set
> of
> >    single broker and an ensemble of bookies can achieve given a hardware
> >    configuration of broker and bookies.
> >    2. Assuming linear horizontal scaling, we then setup enough brokers
> and
> >    bookies in order to sustain a required throughput in the cluster.
> Suppose,
> >    for this example, the cluster can now do 300MBps
> >
> > Now, assuming 50% bursting is allowed in a topic - does that mean the sum
> > of fixed rate of all topics in the cluster should not exceed 200MBps? -
> If
> > yes - then there is literally no difference in this situation vs just
> > having a 50% elevated fixed rate of all the topic with 0 bursting
> support.
> > To actually take advantage of this bursting feature, the capacity
> reserved
> > to handle topic bursts would not be equal to the allowed bursting, but
> much
> > less. For example, in this cluster that can do 300MBps, we can let the
> sum
> > of fixed rate of all topics go up to 270MBps and keep just 10% of the
> > hardware (30MBps) for bursting needs. This way, we are able to sustain
> more
> > topics in the same cluster, with support of burst. This would only work
> if
> > all topics do not burst together at the same time - and that's where the
> > need to restrict duration and frequency of burst comes into play.
> >
> > Ofcourse, there can be other ways to model this problem and requirement..
> > and additional checks in place, but I believe this approach is the most
> > deterministic approach with a good balance between efficient use of
> > hardware and not being completely stringent wrt throughput on the topics.
>
> I commented on these capacity management aspects in one of my previous
> emails as well as in an earlier comment in this email.
> Based on what you are describing as the scenario, I think you are
> looking for system wide capacity management.
> The Pulsar Resource Groups (PIP-82) [1] already has solutions in this
> area and could be developed further.
> AWS DynamoDb's Global Admission Control is a good example from the
> industry of how this problem was solved there [2].
> It might be worth improving and updating the PIP-82 design based on
> the learnings from Confluent Kora paper and AWS DynamoDB Admission
> control paper and presentation.
> PIP-82 focuses a lot on rate limiting. However, there's also a need
> for something what Confluent Kora paper explains in "5.2.2 Dynamic
> Quota Management" with  "The tenant quotas are auto-tuned
> proportionally to their total quota allocation on the broker. This
> mechanism ensures fair sharing of resources among tenants during
> temporary overload and re-uses the quota enforcement mechanism for
> backpressure."
>
> 1 -
> https://github.com/apache/pulsar/wiki/PIP-82:-Tenant-and-namespace-level-rate-limiting
> 2 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m54s
>
>
I did not want to confuse you here by bringing more than one broker into
the picture. In any case, resource groups are not a solution here. What you
are suggesting is that we put a check on a namespace or tenant via resource
group level rate limiting - all that does is restrict the scope of capacity
contention to that tenant or namespace. Moreover, resource group level rate
limiting is not precise. In another internal discussion about finalizing
the requirements on our side, we found that in the example I provided -
multiple topics bursting at the same time and contending for capacity
(globally or within a resource group) - there will be situations where the
contention leads to throttling of a topic that is not even trying to burst,
since the broker-level/resource-group-level rate limiter will just block
any and all produce requests once the permits have been exhausted. The
bursting topics could very well have exhausted those permits in the first
800ms of a second, while a non-bursting topic actually should have had the
right to produce in the remaining 200ms of that second.

That is one of the reasons for putting a strict check on the amount,
duration and frequency of bursting.
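
To put rough numbers on the capacity example quoted above (illustrative
only):

    // Back-of-the-envelope math from the example above (illustrative):
    long clusterCapacity = 300;  // MBps the cluster can sustain
    long fixedRateSum    = 270;  // sum of all topics' fixed rates
    long burstHeadroom   = clusterCapacity - fixedRateSum;  // 30 MBps, 10%
    // With 50% bursting allowed, a topic with a 10 MBps fixed rate may burst
    // to 15 MBps, i.e. 5 MBps of extra demand. The 30 MBps headroom covers
    // only 30 / 5 = 6 such topics bursting at once - hence the need to cap
    // the amount, duration and frequency of bursts.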


> It's not a breaking change to replace the existing rate limiters.
> The rate limiter implementation is an internal implementation detail.
> It would become a breaking change if the feature "contract" would be
> changed drastically.
> In this case, it's possible to make the "precise" option "precise" and
> perhaps simulate the
> way how the default rate limiter works.
> I think that besides CPU consumption, the problem with the default
> rate limiter is simply that it
> contains a bug. The bug is in the tryAcquire method [1]. The default
> handling should simply match what there is for
> isDispatchOrPrecisePublishRateLimiter.
> After this, the only difference between precise and default is in the
> renew method where there's this calculation:
>

That's not the only difference, even ignoring the bug. The default one has
a scheduler running at an interval of less than 1 second, and it is only in
that scheduled method that it checks whether the limit has been breached
and disables auto-read. So if a producer manages to produce whatever it
likes before the first trigger of this scheduler, there is practically no
rate limiting. Now, I don't know if we can call this a contract - it's more
of an implicit one. Or rather, if there is any rate limiting contract in
the documentation, this default one certainly breaks it.
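
To illustrate the failure mode (a simplification, not the actual Pulsar
code):

    // Simplified illustration of why a purely scheduled check under-enforces:
    // between t=0 and the first renew() at t=interval, every tryAcquire()
    // succeeds no matter how many bytes have already been admitted.
    class ScheduledCheckLimiter {
        private long acquiredInWindow;
        private final long permitsPerWindow = 10 * 1024 * 1024; // e.g. 10MB

        boolean tryAcquire(long bytes) {
            acquiredInWindow += bytes;
            return true;  // limit is only inspected later, inside renew()
        }

        void renew() {  // runs on a scheduler every `interval`
            if (acquiredInWindow > permitsPerWindow) {
                // too late: the excess was already admitted
                disableAutoRead();
            }
            acquiredInWindow = 0;
        }

        private void disableAutoRead() { /* pause the Netty channel */ }
    }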


> > I know and almost all OSS projects around the messaging world have very
> > very basic rate limiters. That is exactly why I am originally requesting
> a
> > pluggable rate limiter to support our use case as that's not a common
> thing
> > to have (the level of complexity) in the OSS project itself. Although, I
> > also acknowledge the need of a lot of refactoring and changes in the
> based
> > design as we have been discussing here.
>
> I appreciate your efforts on initiating PIP-310 and getting the ball
> rolling. Since you are operating Pulsar at scale, your contributions
> and feedback are very valuable in improving Pulsar's capacity management.
> I happen to have a different view of how a custom rate limiter
> implemented with the possible pluggable interface could help with
> overall capacity management in Pulsar.
> We need to go beyond PIP-310 in solving multi-tenant capacity
> management/SOP/SLA challenges with Pulsar. The resource groups work
> started with PIP-81 is a good start point, but there's a need to
> improve and revisit the design to be able to meet the competition, the
> closed source Confluent Kora.
>

As mentioned in the comment above, resource groups shouldn't be discussed
here - they are out of scope for a discussion about partition-level rate
limiting, and I did not intend to bring them in. I would like to see a
comment on my proposal about the ideal state of the Pulsar rate limiter. I
believe that's the true "meeting in the middle".


>
> Thanks for providing such detailed and useful feedback! I think that
> this has already been a valuable interaction.
>

Thank you for painstakingly replying to these long emails. This is probably
the longest thread on the Pulsar ML in recent times :)


>
> The improvements happen one step at a time. We can make things happen
> when we work together. I'm looking forward to that!
>

Would love to hear a plan from your point of view and what you envision.

>
> -Lari
>


-- 
Girish Sharma
