Hi Girish, replies inline.
> Hello Lari, replies inline. It's festive season here so I might be late in
> the next reply.

I'll have limited availability next week so possibly not replying until the following week. We have the next Pulsar community meeting on November 23rd, so let's wrap up all of this preparation by then.

> > How would the rate limiter know if 5MB traffic is degraded traffic
> > which would need to be allowed to burst?
>
> That's not what I was implying. I was trying to question the need for
> 1800MB worth of capacity. I am assuming this is to allow a 2 minute burst
> of 15MBps? But isn't this bucket only taking care of the delta beyond 10MBps?
> Moreover, once 2 minutes are elapsed, which bucket is ensuring that the
> rate is only allowed to go upto 10MBps?

I guess there are multiple ways to model things. I was thinking of a model where the first bucket (and the "filling bucket" used to add "earned" tokens at some interval) handles all logic related to the average rate limiting of 10MBps, including the bursting capacity which a token bucket inherently has. As we know, the token bucket doesn't impose a rate limit as long as there are available tokens in the bucket. That's the reason why there's the separate, independent bucket with a relatively small token capacity to enforce the maximum rate limit of 15MBps. There need to be tokens in both buckets for traffic to flow freely. It's an AND, not an OR (in the literature there are also examples where tokens are taken from another bucket when the main one runs out).

> > Without "earning the right to burst", there would have to be some
> > other way to detect whether there's a need to burst.
>
> Why does the rate limiter need to decide if there is a need to burst? It is
> dependent on the incoming message rate. Since the rate limiter has no
> knowledge of what is going to happen in future, it cannot assume that the
> messages beyond a fixed rate (10MB in our example) can be held/paused until
> next second - thus deciding that there is no need to burst right now.

To explain this, I'll first take a quote from your email about the bursting use cases, sent on Nov 7th:

> Adding to what I wrote above, think of this pattern like the following: the
> produce rate slowly increases from ~2MBps at around 4 AM to a known peak of
> about 30MBps by 4 PM and stays around that peak until 9 PM, after which it
> again starts decreasing until it reaches ~2MBps around 2 AM.
> Now, due to some external triggers, maybe a scheduled sale event, at 10PM,
> the quota may spike up to 40MBps for 4-5 minutes and then again go back down
> to the usual ~20MBps.

Let's look at these cases. (When we get further, it will be helpful to have a set of well-defined, representative, concrete use case / scenario descriptions with concrete examples of the expected behavior. Failure / chaos scenarios and operational scenarios, such as software upgrades, should also be covered.)

> Why does the rate limiter need to decide if there is a need to burst? It is
> dependent on the incoming message rate. Since the rate limiter has no

This is a very good question. When we look at these examples, it feels that we'd need to define what the actual rules for bursting are and how bursting is decided. It seems that there are two types of approaches: "earning the right to burst" with the token bucket, so that bursting can happen whenever there are extra tokens in the bucket, or having a way to detect that there's a need to burst.
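To make the first approach more concrete, here's a rough sketch of the dual-bucket model described above. The class and method names are made up for this email (this is not existing Pulsar code), the capacities and rates follow the 10MBps / 15MBps / 1800MB example from this thread, and the refill is done lazily on access instead of by a separate "filling bucket" task, just to keep the sketch short.

import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a simple token bucket that is refilled lazily
// based on the elapsed time since the last refill.
class TokenBucket {
    private final long capacity;      // maximum number of tokens the bucket can hold
    private final long ratePerSecond; // tokens "earned" per second
    private long tokens;
    private long lastRefillNanos = System.nanoTime();

    TokenBucket(long capacity, long ratePerSecond) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = capacity; // starts filled; tokens could also be required to be earned first
    }

    boolean hasTokens(long permits) {
        refill();
        return tokens >= permits;
    }

    void consume(long permits) {
        tokens -= permits;
    }

    private void refill() {
        long now = System.nanoTime();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(now - lastRefillNanos);
        long earned = elapsedMillis * ratePerSecond / 1000;
        if (earned > 0) {
            tokens = Math.min(capacity, tokens + earned);
            lastRefillNanos = now;
        }
    }
}

// Traffic is allowed only when BOTH buckets have tokens: an AND, not an OR.
class DualTokenBucket {
    // Enforces the 10MBps average rate; the large capacity (1800MB) is the burst allowance.
    private final TokenBucket averageRateBucket =
            new TokenBucket(1800L * 1024 * 1024, 10L * 1024 * 1024);
    // Enforces the 15MBps maximum rate; capacity of roughly one second's worth of tokens.
    private final TokenBucket maxRateBucket =
            new TokenBucket(15L * 1024 * 1024, 15L * 1024 * 1024);

    synchronized boolean tryAcquire(long messageSizeBytes) {
        if (!averageRateBucket.hasTokens(messageSizeBytes)
                || !maxRateBucket.hasTokens(messageSizeBytes)) {
            return false;
        }
        averageRateBucket.consume(messageSizeBytes);
        maxRateBucket.consume(messageSizeBytes);
        return true;
    }
}

With this structure, the small bucket caps the instantaneous rate at roughly 15MBps even while the large bucket still holds plenty of earned tokens, and once the earned tokens run out, the large bucket caps the average rate at 10MBps.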
The latter approach, detecting a need to burst, relates directly to SLAs. If there's an SLA where message processing end-to-end latencies must be low, it usually would be violated if large backlogs pile up. I think you said something about avoiding getting into this at all. I agree that this would add complexity, but for the sake of explaining the possible cases for the need for bursting, I think that it's necessary to cover the end-to-end aspects.

> While I understand that the earning the right to burst model works well in
> practice, another approach to think about this is that the bucket,
> initially, starts filled. Moreover, the tokens here are being filled into
> the bucket due to the available disk, cpu and network bandwidth..

I completely agree. If it's fine to rely on the token bucket for bursting, this simplifies a lot of things. As you are mentioning, there could be multiple ways to fill/add tokens to the bucket.

> The general approach of where the tokens are initially required to be earned
> might be helpful to tackle cold starts, but beyond that, a topic doing
> 5MBps to accumulate enough tokens to burst for a few minutes in the future
> doesn't really translate to the physical world, does it? The 1:1
> translation here is to an SSD where the topic's data is actually being
> written to - so my bursting up to 15MBps doesn't really depend on the fact
> that I was only doing 5MBps in the last few minutes (and thus,
> accumulating the remaining 5MBps worth of tokens towards the burst) - now does
> it? The SSD won't really gain the ability to allow for a burst just because
> there was low throughput in the last few minutes. Not even from a space POV
> either.

This example of the SSD isn't concrete in terms of Pulsar and BookKeeper. The capacity of a single SSD should and must be significantly higher than the maximum write throughput of a single topic, so a single SSD shouldn't be a bottleneck. There will always be a need to overprovision resources. You usually don't want to go beyond 60% or 70% utilization of disk, CPU or network resources so that queues in the system don't start to grow and impact latencies.

In Pulsar/BookKeeper, the storage solution has very effective load balancing, especially for writes. In BookKeeper, each ledger (segment) of a topic selects the "ensemble" and the "write quorum", the set of bookies to write to, when the ledger is opened. The BookKeeper client can also change the ensemble in the middle of a ledger due to some event such as a bookie becoming read-only or unavailable. This effectively load balances the writes across all bookies.

> In practice this level of cross component coordination would never result
> in a responsive and spontaneous system.. unless each message comes along
> with a "I waited in queue for this long" time and on the server side, we

Yes, the total end-to-end waiting time could be calculated from the timestamp information when there's a way to estimate the clock difference between the client and broker clocks. Since we are dealing with queues, queuing theory could be applied to estimate queue sizes.

> now read the netty channel, parse the message and figure this out before
> checking for rate limiting.. At which point, rate limiting isn't really
> doing anything if the broker is reading every message anyway. This may also

This is how it works currently: the message is read, the rate limiter is updated, and the message is sent onwards regardless of the result of the rate limiter's "tryAcquire" call. In asynchronous flows this makes sense.
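As a simplified sketch of that asynchronous flow (again illustrative only, not the actual broker code; persistMessage() and scheduleResume() are placeholder names), reusing the DualTokenBucket from the sketch above:

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// The message has already been read from the wire, so it is processed
// regardless of the rate limiter result; exceeding the rate only pauses
// further reads from the connection.
class PublishThrottlingSketch {
    private final DualTokenBucket rateLimiter = new DualTokenBucket();

    void handlePublish(Channel channel, ByteBuf headersAndPayload) {
        boolean withinRate = rateLimiter.tryAcquire(headersAndPayload.readableBytes());
        persistMessage(headersAndPayload); // always processed, even when withinRate == false
        if (!withinRate) {
            // Back-pressure the producer by pausing reads on the connection
            // instead of rejecting the already-received message.
            channel.config().setAutoRead(false);
            scheduleResume(channel);
        }
    }

    private void persistMessage(ByteBuf headersAndPayload) {
        // placeholder: hand the message off to the persistence layer asynchronously
    }

    private void scheduleResume(Channel channel) {
        // placeholder: re-enable reads with channel.config().setAutoRead(true)
        // once tokens are available again (see the pause calculation later in this email)
    }
}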
The problem is more the naming of the methods, which makes the code hard to reason about.

> require prioritization of messages as some messages from a different
> producer to the same partition may have waited longer than others.. This is
> out of scope of a rate limiter at this point.

This is a good observation. Yes, I agree that this is not part of the scope at this point.

> Rate limiter is a major facilitator and guardian of capacity planning. So,
> it does relate here.

+1

> At no point am I trying to go beyond the purview of a single broker here.
> Unless, by system, you meant a single broker itself. For which, I talk
> about a broker level rate limiter further below in the example.

My point was that there's a need to take a holistic approach to capacity management. Pulsar is a distributed system and the storage system is separate, unlike Kafka's storage. There isn't an equivalent of Pulsar's load balancing in Kafka. In the Kafka world, you can calculate the capacity of a single broker. In Pulsar, the model is very different: if a broker is overloaded, the Pulsar load balancer can quickly shed load from that broker. I feel that focusing on a single broker and protecting the resources on it won't be helpful in the end. That's not to say that things cannot be improved from a single-broker perspective.

One essential role of rate limiters in capacity management is to prevent a single resource from becoming overloaded. The Pulsar load balancer / load manager plays a key role in this.

The other important role of rate limiters is about managing end applications' expectations of the service level. I'm not sure if someone remembers the example from the Google SRE book about a service level that was too good, which caused a lot of problems and outages when there finally was a service level degradation. The problem was that applications had become coupled to the behavior that there aren't any disruptions in the service. IIRC, the solution was to inject failures into production periodically to make the service consumers get used to service disruptions and design their applications to have the required resilience. I think it was Netflix that introduced the Chaos Monkey and also ran it in production to ensure that service consumers are sufficiently resilient to service disruptions. In the context of Pulsar, it is possible that messaging applications get coupled to, and depend on, a service level that isn't well defined.

From this perspective, the key role of rate limiters is to ensure that the service provider and the service consumer have a way to express the service level objective (SLO) in the system. If the rate isn't capped, there's the risk that service consumers become dependent on very high rates which might only be achievable in a highly overprovisioned system. I wonder if you would be interested in sharing your requirements or context around SLOs and how you see their relationship to rate limiters?

> Just as a side note, in practice, resource group level rate limiting is
> very very imprecise due to the inherent nature that it has to sync broker
> level data between multiple brokers at a frequency.. thus the data is
> always stale and the produce goes way beyond the set limit, even when
> resource groups use the precise rate limiter today.

I think that it's a good starting point for further improvements based on feedback; it's currently in its early stages. Based on what you have shared about your use cases for rate limiting, I'd assume that you would go beyond a single broker in the future.
> In the very next sentence it also says "so a key takeaway from this was
> though bursting was great it solved some of the customers problems um we
> had it wasn't sufficient enough and we had tightly coupled how partition
> level capacity to admission control right so we realized that removing the
> admission control from partition from partition level would be the most
> beneficial thing for customers and for us as well and letting the partition
> burst always so we implemented something called as a global admission
> control where we moved our mission control up"
> Looks like the fact that the partition wasn't able to work due to lack of
> tokens was a problem for them? I admit I haven't gone through the full
> video yet.

I think that it provides a good description of the kind of flexibility service consumers (tenants in a multi-tenant system) could be looking for. The particular bursting solution could be thought of as a sort of serverless autoscaling solution from the service consumer's perspective: the service consumer pays for what they use. This has a lot of business value, and customers are willing to pay for such flexibility. From the service provider's perspective, it provides a proper foundation for SLOs that can be operationalized in a way where the system can be optimally provisioned and scaled when capacity demand increases. In a multi-tenant system, the peak loads of different customers smooth each other out and the aggregate load is fairly steady. AWS DynamoDB and Confluent Kora are examples of such multi-tenant systems where the "control plane" is essential to operating and running at scale.

> > Token bucket based solutions are widely used in the industry. I don't
> > see a reason why we should be choosing some other conceptual model as
> > the basis.
>
> Not another conceptual model, but we should be using the token bucket in a
> way that works for real world situations..

I completely agree.

> I agree with you here. That's why I mentioned the ideal state of the rate
> limiter in the pulsar OSS version. I see that you did not comment on that in
> your reply though..

I guess you are referring to this comment of yours:

> I know and almost all OSS projects around the messaging world have very
> very basic rate limiters. That is exactly why I am originally requesting
> a pluggable rate limiter to support our use case as that's not a common
> thing to have (the level of complexity) in the OSS project itself. Although, I
> also acknowledge the need of a lot of refactoring and changes in the
> base design as we have been discussing here.

It would be interesting to hear more practical details of what makes your use case so special that you need a custom rate limiter and that it couldn't be covered by a generic rate limiter configured for your use case and requirements. I guess I just didn't want to keep repeating the same things again and again; instead, I've wanted to learn more about your use cases and work together with you to solve your requirements and attempt to generalize them in a way that would let us have the implementation directly in Pulsar core. As we discussed, these two approaches aren't conflicting. We might end up having a generic rate limiter that suits most of your requirements, and for some requirements you might need that pluggable rate limiter. However, throughout this whole thread, there haven't yet been any concrete examples that make your requirements so special that they couldn't be covered directly in Pulsar core.
Perhaps that's something that you are also iterating on, and you might be learning new ways to handle your use cases and requirements while we make progress, in small steps, on improving the current rate limiter and its maintainability in preparation for further improvements.

> > Optimally the pause would be such a duration that there's enough
> > tokens to send the next message.
> > One possible way to estimate the size of the next message is to use
> > the average size or a rolling average size as the basis of the
> > calculation.
> > It might be as short as the minimum time to pause for the timer.
> > Instead of using a generic scheduler for pausing, Netty's scheduler
> > should be used.
> > It is very efficient for high volumes. If there's a bottleneck, it's
> > possible to have some minimum duration to pause.
>
> I still don't see any advantage here. What are we trying to save here?
> Honestly, any kind of guessing inside the code will lead to weird anomalies
> when running in a big system with various different usecases.

Yes, that's a good question. The thought I had was about minimizing the latency caused by rate limiting. Rate limiting will impact latency since things will have to be slowed down, so I agree that it's not really a useful goal to minimize the pause. I guess a better solution is to explicitly configure the minimum available tokens at the time when the pause is scheduled to end, or simply to pause for a configurable amount of time at a time, let's say 100ms or 1 second (there's a rough sketch of what I mean further below in this email).

> I did not want to confuse you here by bringing in more than one broker into
> the picture. In any case, resource groups are not a solution here. What you are
> suggesting is that we put a check on a namespace or tenant by putting in
> resource group level rate limiting - all that does is restrict the scope of
> contention of capacity within that tenant or namespace. Moreover, resource
> group level rate limiting is not precise. In another internal discussion
> about finalizing the requirements from our side, we found that in the example
> I provided where multiple topics are bursting at the same time and are
> contending for the capacity (globally or within a resource group), there
> will be situations where the contention is now leading to throttling of a
> topic which is not even trying to burst - since the broker level/resource
> group level rate limiter will just block any and all produce requests after
> the permits have been exhausted.. the bursting topics could have very well
> exhausted those in the first 800ms of a second while a non-bursting topic
> actually should have had the right to produce in the remaining 200ms of a
> second.

Thanks for sharing this feedback about resource groups. The problem sounds similar to what was described as the reason to create AWS DynamoDB's Global Admission Control. There seem to be some gaps in the resource groups design, so that it's not able to solve the problem that such a solution should solve (when comparing to the AWS DynamoDB case)?

> That is one of the reasons for putting a strict check on the amount,
> duration and frequency of bursting.

Please explain more about this. It would be helpful for me in trying to understand your requirements. One question about the solution of a "strict check on the amount, duration and frequency of bursting": do you already have a PoC where you have validated that the solution actually solves the problem? Could you simply write a PoC by forking apache/pulsar and changing the code directly, without making it pluggable initially?
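Coming back to the pause calculation discussed a few paragraphs above, a rough sketch of what I mean could look like the following. The names are again made up for this email; tokensNeeded could be the configured minimum available tokens or a rolling average message size, and the resume is scheduled on Netty's own event loop scheduler.

import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;

final class PauseCalculator {

    /** Milliseconds to pause so that roughly tokensNeeded tokens will have been earned. */
    static long pauseMillis(long currentTokens, long tokensNeeded, long ratePerSecond, long minPauseMillis) {
        long deficit = tokensNeeded - currentTokens;
        if (deficit <= 0) {
            return 0;
        }
        return Math.max(deficit * 1000 / ratePerSecond, minPauseMillis);
    }

    /** Pause reads on the connection and resume them on Netty's scheduler after the pause. */
    static void pauseAndResume(Channel channel, long pauseMillis) {
        channel.config().setAutoRead(false);
        channel.eventLoop().schedule(
                () -> channel.config().setAutoRead(true),
                pauseMillis, TimeUnit.MILLISECONDS);
    }
}

Alternatively, as mentioned, the pause could simply be a fixed configurable duration such as 100ms or 1 second, which avoids any guessing about the size of the next message.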
> That's not the only difference, even ignoring the bug.. the default one has
> a scheduler running at a lower than 1 second interval. It's only in this
> scheduled method that it checks if the limit has been breached and blocks the
> auto-read.. So if a producer is able to produce whatever they like before
> the first trigger of this scheduler, then there is practically no rate
> limiting. Now, I don't know if we can call this a contract.. it's more of an
> implicit one.. or rather, if there is any rate limiting contract in the
> documentation, this default one certainly breaks that.

Yes, you are right. I had forgotten about that and completely missed that code path when the default rate limiter is used in publish throttling. Thanks for providing the analysis.

I wouldn't call it a contract. It should be possible to fix such bugs. However, it's true that behavior that is considered a bug is part of a contract for someone; I think it's called Hyrum's law, which states exactly this. Obviously it's the Pulsar community and PMC that decide whether the default rate limiter can be refactored. It's pretty bad if a project gets into a state where changes cannot be made. I believe that we can find a model where 99% of the users will be happy. Can't we just require the remaining 1% to adapt? :)

> As mentioned in the comment above, resource groups shouldn't be discussed
> here.. they are out of scope of the discussion involving partition level
> rate limiting and I did not intend to bring them into the discussion.
> I would like to see a comment on my proposal about the state of the pulsar rate
> limiter. I believe that's true "meeting in the middle".

Yes, we don't need to dive deep into the details. However, since rate limiting in Pulsar won't be sufficient on its own to handle capacity management challenges, I think it's useful to at least introduce the concepts so that, with the common background information, it's easier to refer to the role of resource groups when something in the rate limiter doesn't belong there but instead belongs to the resource group part of capacity management in Pulsar. Similarly, referring to the Pulsar load balancing / load manager and defining and clarifying its role in capacity management will also help holistic improvements to Pulsar. That's the end goal that many of us have, and I believe it's also important for you when you are operating Pulsar at scale.

> Thank you for painstakingly replying to these long emails.. This probably
> is the longest thread in the pulsar ML in recent times :)

Likewise, you have been doing your part really well. Thanks! This has been a valuable discussion. It is hard to measure the value of learning and gaining understanding of a problem. The value will show up in the future when we are better equipped to solve the right problems instead of the problems that we'd like to solve. :)

> Would love to hear a plan from your point of view and what you envision.

Thanks, I hope I was able to express most of it in these long emails. I'm having a break next week, and after that I was thinking of summarizing this discussion from my viewpoint and then meeting in the Pulsar community meeting on November 23rd to discuss the summary, the conclusions and the path forward. Perhaps you could also prepare in a similar way by summarizing your viewpoints, and we could discuss this on Nov 23rd in the Pulsar community meeting together with everyone who is interested in participating.
If we have completed the preparation before the meeting, we could possibly exchange our summaries asynchronously in advance.

Girish, would this work for you?

-Lari