Hello Lari, replies inline.

On Thu, Nov 9, 2023 at 6:50 AM Lari Hotari <lhot...@apache.org> wrote:
> Hi Girish,
>
> replies inline.
>
> On Thu, 9 Nov 2023 at 00:29, Girish Sharma <scrapmachi...@gmail.com> wrote:
> > While dual-rate dual token bucket looks promising, there is still some
> > challenge with respect to allowing a certain peak burst for/up to a
> > bigger duration. I am explaining it below:
> >
> > Assume a 10MBps topic. Bursting support of 1.5x up to 2 minutes, once
> > every 10 minute interval.
>
> It's possible to model dual token buckets in many ways.
> When there are tokens in the bucket, they are consumed as fast as
> possible. This is why there is a need for the second token bucket,
> which is used to rate limit the traffic to the absolute maximum rate.
> Technically, the second bucket rate limits the average rate for a short
> time window.
>
> I'd pick the first bucket for handling the 10MB rate.
> The capacity of the first bucket would be 15MB * 120 = 1800MB. The fill
> would happen in a special way. I'm not sure if Bucket4J has this at all.
> So, describing the way of adding tokens to the bucket: the tokens in
> the bucket would remain the same when the rate is <10MB. As many

How is this special behavior (the tokens in the bucket remaining the same when the rate is <10MB) achieved? I would assume that to even figure out that the rate is less than 10MB, there has to be some counter running?

> tokens would be added to the bucket as are consumed by the actual
> traffic. The leftover tokens (10MB - actual rate) would go to a
> separate filling bucket that gets poured into the actual bucket every
> 10 minutes.
> This first bucket with this separate "filling bucket" would handle the
> bursting up to 1800MB.

But isn't this different from the requirement? Let's assume that the actual traffic has been 5MB for a while and this 1800MB-capacity bucket is all filled up now. What is the real use of that?

> The second bucket would solely enforce the 1.5x limit of 15MB rate
> with a small-capacity bucket which enforces the average rate for a
> short time window.
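If I'm following the description correctly, the "filling bucket" behavior could be sketched roughly like this. This is a minimal sketch under my own assumptions - the class and field names are made up, and it is neither Bucket4J nor Pulsar code:

```java
// Sketch of the described scheme: the main bucket only refills what traffic
// actually consumed (so it "remains the same" when the rate is below the
// configured one), and the surplus accrues in a side "filling bucket" that
// is poured into the main bucket periodically.
class FillingTokenBucket {
    final long ratePerSec;          // configured rate, e.g. 10MB/s
    final long capacity;            // burst capacity, e.g. 1800MB
    final long pourIntervalNanos;   // e.g. 10 minutes
    long tokens;                    // tokens usable right now
    long fillingBucket;             // "earned" surplus tokens
    long lastNanos, lastPourNanos;

    FillingTokenBucket(long ratePerSec, long capacity, long pourIntervalNanos, long nowNanos) {
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.pourIntervalNanos = pourIntervalNanos;
        // Starting full is a policy choice; Lari's 50/50 idea would start lower.
        this.tokens = capacity;
        this.lastNanos = nowNanos;
        this.lastPourNanos = nowNanos;
    }

    boolean tryConsume(long amount, long nowNanos) {
        long accrued = (nowNanos - lastNanos) * ratePerSec / 1_000_000_000L;
        if (accrued > 0) {
            lastNanos += accrued * 1_000_000_000L / ratePerSec;
            // Refill only the deficit left by past consumption; the rest of
            // the accrued tokens are "earned" into the filling bucket.
            long toMain = Math.min(accrued, capacity - tokens);
            tokens += toMain;
            fillingBucket += accrued - toMain;
        }
        if (nowNanos - lastPourNanos >= pourIntervalNanos) {
            long pour = Math.min(fillingBucket, capacity - tokens);
            tokens += pour;
            fillingBucket -= pour;
            lastPourNanos = nowNanos;
        }
        if (tokens >= amount) {
            tokens -= amount;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FillingTokenBucket b = new FillingTokenBucket(10, 1800, 600_000_000_000L, 0L);
        System.out.println(b.tryConsume(5, 1_000_000_000L)); // true: bucket starts full
    }
}
```

Note that the second (15MB max-rate) bucket is not modeled here; this only shows the first bucket and the periodic pour.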
> There's one nuance here. The bursting support will only allow bursting
> if the average rate has been lower than 10MBps, so that there are
> tokens available for the burst.
> It would be possible that, for example, 50% of the tokens would be
> immediately available and 50% of the tokens are made available in the
> "filling bucket" that gets poured into the actual bucket every 10
> minutes. Without having some way to earn the burst, I don't think that
> there's a reasonable way to make things usable. The 10MB limit
> wouldn't have an actual meaning unless it is used to "earn" the
> tokens to be used for the burst.

I think this way of thinking about the rate limiter - "earning the right to burst by letting tokens remain in the bucket (by staying below 10MB for a while)" - doesn't fit well in a real-world messaging use case, or even a theoretical one. For a 10MB topic, if the actual produce rate has been, say, 5MB for a long while, that shouldn't give the topic the right to burst to 15MB for as long as tokens are present. This is purely because the burst will start stressing the network and the bookie disks.

Imagine a hundred such topics with a similar configuration of fixed+burst limits, all doing way less than their fixed rate for the past couple of hours. Now that they've earned enough tokens, if they all start bursting at once, this will bring down the system, which is probably not capable of supporting simultaneous peaks across all topics. Of course, we can use a broker-level fixed rate limiter to keep the overall throughput of the system below a ceiling, but at that point all the earning semantics go for a toss anyway, since it becomes unpredictable which topics get to burst and which are blocked by the broker-level fixed rate limiting. As such, letting topics loose like this would not sit well with any sort of SLA guarantees to the end user.
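To put rough numbers on the simultaneous-burst concern (the figures are illustrative, matching the example above):

```java
// Back-of-envelope check: if every topic "earns" its burst and then all
// topics burst at once, the aggregate demand far exceeds a cluster sized
// for the sum of the fixed rates.
class BurstMath {
    static long worstCaseDemandMBps(int topics, long fixedMBps, double burstFactor) {
        return Math.round(topics * fixedMBps * burstFactor);
    }

    public static void main(String[] args) {
        long fixedSum = 100 * 10;                          // 100 topics at 10MBps fixed
        long worstCase = worstCaseDemandMBps(100, 10, 1.5); // all bursting at 1.5x
        System.out.println(fixedSum + " vs " + worstCase);  // 1000 vs 1500
    }
}
```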
Moreover, contrary to the token-earning logic, in reality a topic _should_ be allowed to burst up to the SOP/SLA as soon as produce starts on the topic. It shouldn't _have_ to run below the fixed rate for a while, filling up tokens, before it is allowed to burst. There is no real benefit in holding the topic back: the hardware is already present and the topic is already provisioned (partitions, broker spread) accordingly, assuming the burst. In an algorithmic/academic/literature setting, the token bucket sounds really promising, but a platform with an SLA to its users would not operate like that.

> In the current rate limiters in Pulsar, the implementation is not
> optimized to how Pulsar uses rate limiting. There's no need to use a
> scheduler for adding "permits", as it's called in the current rate
> limiter. The new tokens to add can be calculated based on the elapsed
> time. Resuming from the blocked state (auto read disabled -> enabled)
> requires a scheduler. The duration of the pause should be calculated

The precise rate limiter already does this. There is no scheduler for adding the tokens, only a scheduler to re-enable auto-read on the netty channel. It sounds like your inclination is to simply remove the default "poller" implementation and keep only the precise one in the code base?

> based on the rate and the average message size. Because of the nature
> of asynchronous rate limiting in Pulsar topic publish throttling, the
> tokens can go to a negative value, and they do. The calculation of
> the pause could also take this into account. The rate limiting will be
> very accurate and efficient in this way since the scheduler will only

This part is a bit unclear to me. When are you suggesting re-enabling auto-read on the channel? Ideally, it should open back up at the next second boundary, and the time to the next second could vary from 1ms to 999ms.
I do not think that starting an on-demand scheduled task of delta ms would be accurate enough to achieve this; a newly created scheduled task is not that accurate anyway. With Java virtual threads coming up, I believe having one, or N, constantly running scheduled tasks at a 1-second interval would not be heavy at all. In fact, even on Java 8, as per my benchmarks, it doesn't seem to take up many frames at all.

> be needed when the token bucket runs out of tokens and there's really
> a need to throttle. This is the change that I would make to the current
> implementation in an experiment and see how things behave with the
> revisited solution. Eliminating the precise rate limiter and having
> just a single rate limiter would be part of this.
>
> I think that the code base has multiple usages of the rate limiter.
> The dispatching rate limiting might require some variation. IIRC, rate
> limiting is also used for some other reasons in the code base in a
> blocking manner. For example, unloading of bundles is rate limited.
> Working on the code base will reveal this.

Yes, I believe we should only touch the publish paths right now. Yes, it might require some rounds of experimentation.

> Although in general, I think it's a fairly straightforward problem to
> solve as long as the requirements can be adapted to some small details
> that make sense for bursting in the context of rate limiting. The
> detail is that the long-term average rate shouldn't go over the
> configured rate even with the bursts. That's why the tokens usable for
> the burst should be "earned". I'm not sure if it is even necessary to
> enforce that a burst can happen once in every 10 minutes. Why does it
> matter if the average rate limit is controlled?

"10 minutes" was just an example. Part of the problem is explained above: if a topic is allowed to just burst forever (irrespective of whether it has earned the tokens or not), it will start competing with other topics that are also bursting.
This is a problem from a capacity planning point of view. Generally, this is how capacity planning works:

1. We first do redline testing to determine how many MBps a single broker and an ensemble of bookies can sustain, given a hardware configuration of brokers and bookies.
2. Assuming linear horizontal scaling, we then set up enough brokers and bookies to sustain the required throughput in the cluster.

Suppose, for this example, the cluster can now do 300MBps. Now, assuming 50% bursting is allowed on a topic - does that mean the sum of the fixed rates of all topics in the cluster should not exceed 200MBps? If yes, then there is literally no difference between this situation and just giving every topic a 50% elevated fixed rate with no bursting support.

To actually take advantage of the bursting feature, the capacity reserved for handling topic bursts should not equal the full allowed burst, but much less. For example, in this cluster that can do 300MBps, we can let the sum of the fixed rates of all topics go up to 270MBps and keep just 10% of the hardware (30MBps) for bursting needs. This way, we are able to sustain more topics in the same cluster, with support for bursts. This only works if all topics do not burst at the same time - and that is where the need to restrict the duration and frequency of bursts comes into play.

Of course, there can be other ways to model this problem and requirement, with additional checks in place, but I believe this approach is the most deterministic, with a good balance between efficient use of hardware and not being completely stringent with respect to topic throughput.

> > I think there is a need for a new protocol message from server to
> > client indicating rate-limiting (say, a 429 response, with an optional
> > retry-after). That should handle a lot of these things.
> > For instance, we can then build configurable logic in the client to
> > stop producing altogether after a sufficient number of 429s received
> > from the server. There can also be a "BLOCKED" protocol message that
> > the server could send to the client.
>
> Yes. I think Kafka has something like that. It's mentioned in the
> Kafka Quotas [1] doc.
> I believe that the needed explicit producer flow control could cover
> this. In Pulsar, we need to add explicit producer flow control to
> address the connection multiplexing problem.
>
> 1 - https://docs.confluent.io/kafka/design/quotas.html

Yup, similar to the way Kafka sends back a delay response to the clients.

> > When you say - adding permit-based flow control - even if this is
> > implemented, multiplexing is still an issue as the tcp/ip channel
> > itself is put on pause at the netty level. Is there any other way of
> > rate limiting and rejecting packets from a channel selectively, so as
> > to contain the rate limiting effect only to the specific partition
> > out of all the partitions being shared on the channel?
>
> The TCP/IP channel wouldn't be put on pause in that solution as the
> primary way to handle flow control.
> Yes, there are alternatives. It's the permit-based flow control. The
> server would send out a manageable amount of permits at a time. There
> are challenges in making this work efficiently with low amounts of
> memory. That's one of the tradeoffs.

Moreover, isn't this based on the assumption that clients respect the permits and actually behave well? If there is a client implementation that does not respect the permits (or, in this case, the lack of remaining permits) - very much possible, as it could be an older Java client itself - then wouldn't the only way to actually throttle that client be to pause the TCP/IP channel?

> > I will do so. Does this need a PIP?
> > To reiterate, I will be opening a GH issue on the lines of "don't
> > share connection to the broker across producer objects".
>
> I think it could start with an issue that describes the problem. The
> PIP is eventually needed to add the new option to the producer builder
> and consumer builder interfaces.

I will keep a note of this and take it up once I am free from the current PIP, whichever direction it takes.

> > Are you suggesting that we first go ahead and convert the rate limiter
> > in Pulsar to a simple, single-token based approach? I personally do
> > not see any benefit in this apart from code refactoring. The precise
> > rate limiter is basically doing that already - albeit refilling only
> > every 1 second, rather than distributing the tokens across that
> > second.
>
> I have described the benefits in some former comments. The code is not
> maintainable at the moment other than by the people that have worked
> with the code. Another point is getting rid of the separate "precise"
> option. That leaks implementation details all the way to Pulsar users
> that don't need to know what a "precise rate limiter" is. The default
> Pulsar core rate limiter should do what it promises to do. That would

Is this even achievable? i.e. to completely remove the poller implementation and take away the knowledge of the precise one from users? That would then be a breaking change, and would probably have to wait for the Pulsar 4.0.0 release?

> be the starting point to match the existing feature set. After that
> it's possible to make it a dual token bucket internally, so that it's
> possible to set a separate max rate, since in a single token bucket the
> bursts don't have a limit when the bucket is larger (a bucket with a
> large capacity allows bursts whenever traffic temporarily drops below
> the configured rate). Yes, the precise rate limiter is something like
> having a bucket capacity of 1 second, and that's why it doesn't allow
> bursts.
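For reference, the single token bucket being described - capacity worth one second at the configured rate, tokens allowed to go negative because of the asynchronous accounting, and the pause derived from the deficit - could look roughly like the sketch below. This is my reading of the proposal, not actual Pulsar code, and all names are mine:

```java
// Minimal single token bucket: capacity is one second's worth of tokens
// (hence no bursts, like the "precise" limiter), tokens may go negative,
// and the pause before re-enabling auto-read is computed from the deficit
// instead of waiting for a fixed 1-second tick.
class OneSecondTokenBucket {
    final long ratePerSec;
    long tokens;
    long lastNanos;

    OneSecondTokenBucket(long ratePerSec, long nowNanos) {
        this.ratePerSec = ratePerSec;
        this.tokens = ratePerSec;   // capacity == one second at the configured rate
        this.lastNanos = nowNanos;
    }

    private void refill(long nowNanos) {
        long accrued = (nowNanos - lastNanos) * ratePerSec / 1_000_000_000L;
        if (accrued > 0) {
            tokens = Math.min(ratePerSec, tokens + accrued);
            lastNanos += accrued * 1_000_000_000L / ratePerSec;
        }
    }

    // Record the publish unconditionally (asynchronous accounting): tokens
    // may go negative; the caller checks shouldThrottle() afterwards.
    void consume(long amount, long nowNanos) {
        refill(nowNanos);
        tokens -= amount;
    }

    boolean shouldThrottle() {
        return tokens < 0;
    }

    // Pause long enough for the deficit (plus one average message) to refill.
    long pauseNanos(long avgMsgSize) {
        if (tokens >= 0) return 0;
        return (-tokens + avgMsgSize) * 1_000_000_000L / ratePerSec;
    }

    public static void main(String[] args) {
        OneSecondTokenBucket b = new OneSecondTokenBucket(1000, 0L);
        b.consume(1500, 0L);                   // overshoot: tokens go to -500
        System.out.println(b.pauseNanos(100)); // 600000000
    }
}
```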
I believe here I (my organization) would have a difference in opinion, due to the difference in goals and priorities. I guess it's kind of a clash between your vision for the rate limiter in Pulsar and our goals right now, and I am not sure how to proceed at this point. What I can promise is that I (my org) will take this to completion with respect to making the internal rate limiter support our use cases, but the way you envision it might not sit well with what we want and when we want it.

> > > algorithm is. That could be a starting point until we start covering
> > > the advanced cases which require a dual token bucket algorithm.
> > > If someone has the bandwidth, it would be fine to start experimenting
> > > in this area with code to learn more.
> >
> > I think this is a big assumption. Since this is a critical use case in
> > my organisation, I will have to contribute everything here myself. Now
> > I do understand that reviews can take time in the OSS world, but this
> > can't be left at "simple token based approach and then letting anyone
> > pick and explore to extend/enhance it to a dual token approach". This
> > probably is one of the main reasons why pluggability is important
> > here :)
>
> If you check many other projects, most of them use token bucket based
> approaches. That's why I'm confident that it is a feasible solution.

I know, and almost all OSS projects in the messaging world have very basic rate limiters. That is exactly why I originally requested a pluggable rate limiter to support our use case, as that level of complexity is not a common thing to have in an OSS project itself. Although, I also acknowledge the need for a lot of refactoring and changes in the base design, as we have been discussing here.
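For concreteness, the kind of pluggable hook I have in mind might look something like the sketch below. The interface name and shape are entirely hypothetical - this is not Pulsar's existing API - and the trivial implementation is included only to show how an organization could plug in its own policy:

```java
// Hypothetical SPI for a pluggable publish rate limiter. Implementations
// could be a plain token bucket, a dual token bucket, or a custom
// fixed+burst policy maintained outside the Pulsar code base.
interface PluggablePublishRateLimiter {
    // Returns true if publishing `bytes` is admitted at time `nowNanos`.
    boolean tryAcquire(long bytes, long nowNanos);
}

// Trivial example implementation: a fixed 1-second window, no bursting.
class FixedWindowLimiter implements PluggablePublishRateLimiter {
    private final long bytesPerSec;
    private long windowStartNanos;
    private long usedInWindow;

    FixedWindowLimiter(long bytesPerSec, long nowNanos) {
        this.bytesPerSec = bytesPerSec;
        this.windowStartNanos = nowNanos;
    }

    @Override
    public boolean tryAcquire(long bytes, long nowNanos) {
        if (nowNanos - windowStartNanos >= 1_000_000_000L) {
            windowStartNanos = nowNanos; // start a fresh window
            usedInWindow = 0;
        }
        if (usedInWindow + bytes <= bytesPerSec) {
            usedInWindow += bytes;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FixedWindowLimiter l = new FixedWindowLimiter(1000, 0L);
        System.out.println(l.tryAcquire(800, 0L)); // true
    }
}
```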
Generally, this is how I see it ending up in the OSS Pulsar version: the code has been refactored, there are no implementation details leaked to the user, and the rate limiter is efficient and based on a single token bucket model without any bursting. But it also provides a pluggable rate limiter - and that is where individual complex use cases can be implemented and maintained internally by the users/organizations. Of course, they can choose to open source them, but I don't think pulling them back into Pulsar is the right approach with respect to future contributions and ease of maintaining the module.

--
Girish Sharma