Hi Girish, replies inline.
On Thu, 9 Nov 2023 at 00:29, Girish Sharma <scrapmachi...@gmail.com> wrote:

> While dual-rate dual token bucket looks promising, there is still some
> challenge with respect to allowing a certain peak burst for/up to a bigger
> duration. I am explaining it below:
> Assume a 10MBps topic. Bursting support of 1.5x upto 2 minutes, once every
> 10 minute interval.

There are many ways to model dual token buckets. When there are tokens in the bucket, they are consumed as fast as possible. This is why a second token bucket is needed to limit the traffic to the absolute maximum rate; technically, the second bucket limits the average rate over a short time window.

I'd pick the first bucket for handling the 10MBps rate. The capacity of the first bucket would be 15MB * 120 = 1800MB. The fill would happen in a special way, and I'm not sure if Bucket4J supports this at all. Describing the way tokens are added to the bucket: the token count would remain the same while the rate is below 10MBps, because as many tokens are added to the bucket as are consumed by the actual traffic. The leftover tokens (10MBps minus the actual rate) would go to a separate "filling bucket" that gets poured into the actual bucket every 10 minutes. The first bucket together with this filling bucket would handle bursting up to 1800MB. The second bucket would solely enforce the 1.5x limit (the 15MBps rate) with a small-capacity bucket that enforces the average rate over a short time window.

There's one nuance here: bursting is only possible if the average rate has been below 10MBps long enough for usable burst tokens to accumulate. It would also be possible that, for example, 50% of the tokens are made available immediately and 50% go to the "filling bucket" that gets poured into the actual bucket every 10 minutes.

Without having some way to "earn" the burst, I don't think that there's a reasonable way to make things usable.
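To make the idea concrete, here is a rough sketch of the first bucket with the separate "filling bucket". All names and the per-second accounting are illustrative assumptions of mine, not Bucket4J or Pulsar APIs, and the second bucket (enforcing the 15MBps absolute max) is left out:

```java
// Illustrative sketch of the "earned burst" first bucket described above.
// Class and method names are hypothetical, not Bucket4J or Pulsar APIs.
public class EarnedBurstBucket {
    private final long ratePerSec;     // configured rate, e.g. 10MB per second
    private final long maxBurstTokens; // e.g. 15MB/s * 120s = 1800MB
    private long mainTokens;           // tokens usable right now; may go negative
    private long fillingTokens;        // "earned" tokens, poured in every 10 minutes

    public EarnedBurstBucket(long ratePerSec, long maxBurstTokens) {
        this.ratePerSec = ratePerSec;
        this.maxBurstTokens = maxBurstTokens;
    }

    // Called once per elapsed second with the bytes actually published.
    // While traffic is below the configured rate, as many tokens are added
    // as are consumed, so the main bucket level stays the same and the
    // leftover (rate - actual) accumulates in the filling bucket.
    public void onSecondElapsed(long consumedBytes) {
        long added = Math.min(consumedBytes, ratePerSec);
        mainTokens += added - consumedBytes; // negative delta while bursting
        fillingTokens = Math.min(fillingTokens + (ratePerSec - added), maxBurstTokens);
    }

    // Every 10 minutes the earned tokens are poured into the main bucket.
    public void pourFillingBucket() {
        mainTokens = Math.min(mainTokens + fillingTokens, maxBurstTokens);
        fillingTokens = 0;
    }

    public boolean shouldThrottle() {
        return mainTokens < 0;
    }

    public long mainTokens() {
        return mainTokens;
    }
}
```

With a rate of 10 units/s, for example, two seconds at 4 units/s earn 12 units of burst tokens; after the pour, traffic above the configured rate draws the main bucket down until the earned tokens run out.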
The 10MBps limit wouldn't have an actual meaning unless it is used to "earn" the tokens to be used for the burst.

One other detail of topic publish throttling in Pulsar is that the actual throttling happens after the limit has been exceeded. This is because Pulsar's network handling uses Netty, where you cannot block. In token bucket terms, the tokens are always consumed first, and only after that is there a chance to pause message publishing. In code, you can find this at https://github.com/apache/pulsar/blob/c0eec1e46edeb46c888fa28f27b199ea7e7a1574/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1794 and by digging down from there.

The current rate limiter implementations in Pulsar are not optimized for how Pulsar uses rate limiting. There's no need to use a scheduler for adding "permits" (as they're called in the current rate limiter); the new tokens to add can be calculated from the elapsed time. A scheduler is only needed for resuming from the blocked state (auto read disabled -> enabled). The duration of the pause should be calculated from the rate and the average message size. Because of the asynchronous nature of topic publish throttling in Pulsar, the token count can go negative, and it does; the pause calculation could also take this into account. Rate limiting done this way is very accurate and efficient, since the scheduler is only needed when the token bucket runs out of tokens and there's a real need to throttle.

This is the change I would make to the current implementation as an experiment, to see how things behave with the revisited solution. Eliminating the precise rate limiter and having just a single rate limiter would be part of this. The code base has multiple usages of the rate limiter, and the dispatch rate limiting might require some variation. IIRC, rate limiting is also used for some other purposes in the code base in a blocking manner.
For example, unloading of bundles is rate limited. Working on the code base will reveal this.

> While the number of events in the system topic would be fairly low per
> namespace, the issue is that this system topic lies on the same broker
> where the actual topic/partitions exist and those partitions are leading to
> degradation of this particular broker.

Yes, that could be possible, but it's perhaps unlikely.

> Agreed, there is a challenge, as it's not as straightforward as I've
> demonstrated in the example above.

Yes, it might require some rounds of experimentation. In general, though, I think it's a fairly straightforward problem to solve, as long as the requirements can be adapted to some small details that make sense for bursting in the context of rate limiting. The key detail is that the long-term average rate shouldn't go over the configured rate even with bursts; that's why the tokens usable for a burst should be "earned". I'm not sure it's even necessary to enforce that a burst can happen only once every 10 minutes. Why does it matter, if the average rate limit is controlled?

> There is clearly a gap with respect to understanding what the common
> requirements are. I am here speaking only of my requirements. Some of these
> may contradict requirements of others who haven't spoken up so far. Would
> we be changing the rate limiter once again when someone else comes up with
> new requirements? For instance, the "exponential decay" mentioned by Rajan
> is completely different from any token based approach.

I'll just repeat that Pulsar isn't so special that it would require very special rate limiting. The basics should be sufficient, as they are for most other similar projects.

"Exponential decay" is usually mentioned in the context of rate limiting retries, for example exponential backoff. I'd like to see examples of "exponential decay" in the context of actual messaging / traffic rate limiting before we FOMO about this.
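Going back to the revisited implementation described earlier (tokens calculated from elapsed time, consume-first semantics, a balance that can go negative, and a pause derived from the deficit), a rough illustration could look like the following. This is a hypothetical sketch under my own naming, not Pulsar's actual code:

```java
// Hypothetical sketch of scheduler-free token accounting: tokens are added
// lazily from the elapsed time, consumption happens before throttling (as
// Netty's non-blocking handlers require), and the pause duration is derived
// from the deficit and the configured rate.
public class AsyncTokenBucket {
    private final long rateBytesPerSec;
    private final long capacityBytes;
    private long tokens;          // may go negative: tokens are consumed first
    private long lastRefillNanos;

    public AsyncTokenBucket(long rateBytesPerSec, long capacityBytes, long nowNanos) {
        this.rateBytesPerSec = rateBytesPerSec;
        this.capacityBytes = capacityBytes;
        this.tokens = capacityBytes;
        this.lastRefillNanos = nowNanos;
    }

    // New tokens are calculated from the elapsed time; no scheduler needed.
    private void refill(long nowNanos) {
        long elapsed = nowNanos - lastRefillNanos;
        long newTokens = elapsed * rateBytesPerSec / 1_000_000_000L;
        if (newTokens > 0) {
            tokens = Math.min(tokens + newTokens, capacityBytes);
            // Advance by the time actually converted into tokens, so that
            // fractional remainders are not lost to rounding.
            lastRefillNanos += newTokens * 1_000_000_000L / rateBytesPerSec;
        }
    }

    // Tokens are always consumed first; the caller disables auto-read on the
    // channel only when this returns true.
    public boolean consumeAndCheckThrottle(long bytes, long nowNanos) {
        refill(nowNanos);
        tokens -= bytes;
        return tokens < 0;
    }

    // A scheduler is only needed to re-enable auto-read: pause long enough
    // for the deficit (the negative balance) to be paid back at the rate.
    public long pauseNanos() {
        if (tokens >= 0) {
            return 0;
        }
        return -tokens * 1_000_000_000L / rateBytesPerSec;
    }
}
```

The point of the sketch is that the hot path is just arithmetic on two longs; the scheduler is touched only in the already-throttled case.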
> I think there is a need for a new protocol message from server to client
> indicating rate-limiting (say, a 429 response, with an optional
> retry-after). That should handle a lot of these things. For instance, we
> can then build configurable logic in the client to stop producing all
> together after a sufficient number of 429 received from the server. There
> can also be a "BLOCKED" protocol that the server could indicate to the
> client.

Yes. I think Kafka has something like that; it's mentioned in the Kafka Quotas doc [1]. I believe the needed explicit producer flow control could cover this. In Pulsar, we need to add explicit producer flow control to address the connection multiplexing problem.

1 - https://docs.confluent.io/kafka/design/quotas.html

> From our experience, inferring these things via client-broker clock sync
> assumption would not hold true. We regularly notice clock differences
> between clients and brokers in our setup.

The Pulsar binary protocol could have a way to calculate the approximate clock difference between the client and the broker, so that end-to-end latencies could be calculated on the broker side from client-side timestamps.

> When you say - adding a permit-based flow control - even if this is
> implemented, multiplexing is still an issue as the tcp/ip channel itself is
> put on pause at the netty level. Is there any other way of rate limiting
> and rejecting packets from a channel selectively so as to contain the rate
> limiting effect only to the specific partition out of all the partitions
> being shared in the channel?

With permit-based flow control, the TCP/IP channel wouldn't be put on pause as the primary means of flow control. So yes, there is an alternative: the server would hand out a manageable number of permits at a time. There are challenges in making this work efficiently with low amounts of memory; that's one of the tradeoffs.
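On the clock-difference point above: a request/response pair carrying timestamps could estimate the offset NTP-style. Nothing like this exists in the Pulsar protocol today; this is purely a hypothetical sketch of the well-known calculation:

```java
// Hypothetical sketch of estimating the client-broker clock offset from one
// round trip (the classic NTP offset formula), so broker-side code could map
// client-side timestamps onto the broker clock.
public final class ClockOffsetEstimator {
    // t1: client send time and t4: client receive time (client clock);
    // t2: broker receive time and t3: broker send time (broker clock).
    // A positive result means the broker clock is ahead of the client clock.
    public static long estimateOffsetMillis(long t1, long t2, long t3, long t4) {
        return ((t2 - t1) + (t3 - t4)) / 2;
    }

    // With the offset known, end-to-end latency can be computed on the
    // broker side from a client-side publish timestamp.
    public static long brokerSideLatencyMillis(long clientPublishTime,
                                               long offsetMillis,
                                               long brokerNowMillis) {
        return brokerNowMillis - (clientPublishTime + offsetMillis);
    }
}
```

The estimate assumes roughly symmetric network delay in both directions, which is the usual caveat with this formula.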
> When I was doing poller vs precise testing wrt CPU, network, broker
> latencies etc, one of the reason precise was much more CPU efficient and
> had minimal impact on broker latencies was due to the fact that the netty
> channel was being paused precisely.

Yes, I have similar experiences, and this has also been reported. Btw, the precise rate limiter had some bugs in the past, but contributions from the community made it usable. I also contributed to that area in reviews and with some minor PRs. One of the big challenges to overcome was understanding how the solution works. That's why I think the concepts should match common reference points, so that the solution stays maintainable.

> > control for producers and revisiting the broker side backpressure/flow
> > control and therefore it won't happen quickly.
> > Please go ahead and create a GH issue and share your context. That
> > will be very helpful.
>
> I will do so. Does this need a PIP? To reiterate, I will be opening a GH
> issue on the lines of "don't share connection to the broker across producer
> objects"

I think it could start with an issue that describes the problem. A PIP is eventually needed to add the new option to the producer builder and consumer builder interfaces.

> Are you suggesting that we first go ahead and convert the rate limiter in
> pulsar to a simple, single-token based approach?
> I personally do not see any benefit in this apart from code refactoring.
> The precise rate limiter is basically doing that already- all be it
> refilling only every 1 second, rather than distributing the tokens across
> that second.

I have described the benefits in earlier comments. The code is not maintainable at the moment, other than by the people who have worked on it. Another benefit is getting rid of the separate "precise" option, which leaks implementation details all the way to Pulsar users, who shouldn't need to know what a "precise rate limiter" is.
The default Pulsar core rate limiter should do what it promises to do; that would be the starting point, matching the existing feature set. After that, it's possible to make it a dual token bucket internally, so that a separate max rate can be set. This is needed because in a single token bucket with a large capacity, bursts have no rate limit of their own (a large bucket allows bursts whenever traffic has temporarily dropped below the configured rate). And yes, the precise rate limiter is something like a bucket with capacity for 1 second of traffic, which is why it doesn't allow bursts.

> > algorithm is. That could be a starting point until we start covering
> > the advanced cases which require a dual token bucket algorithm.
> > If someone has the bandwidth, it would be fine to start experimenting
> > in this area with code to learn more.
>
> I think this is a big assumption. Since this is a critical use case in my
> organisation, I will have to contribute everything here myself. Now I do
> understand that reviews can take time in the OSS world, but this can't be
> left at "simple token based approach and then letting anyone pick and
> explore to extend/enhance it to dual token approach". This probably is one
> of the main reasons why pluggability is important here :)

If you check many other projects, most of them use token bucket based approaches. That's why I'm confident it is a feasible solution. It's always possible to be creative and make a variation that covers the majority of Pulsar use cases. Pulsar is OSS, and if there's a gap in the solution, you don't even need pluggability. :)

We are soon at the point where it's time to deliver. :) As mentioned before, I am busy with other things in the upcoming weeks, so I'll leave the exact schedule for my contributions open for now.

Thanks for the great conversations. They have been helpful in many ways in making progress. Looking forward to the next step and what that could possibly be.

-Lari