This would be roughly the same as today's approach:
 * The limit per partition is X/s
 * You are given N partitions, so the max quota is X*N
Which would be easy to convert into: the total limit allowed is Y and you have N topics, therefore the per-topic quota is Y/N.

The problem is that there will always be a skew, and therefore it would be very difficult for a user to fully utilize the allocated quota. Additionally, that wouldn't work for setting tenant-level limits.

--
Matteo Merli
<matteo.me...@gmail.com>

On Tue, Mar 2, 2021 at 5:51 PM PengHui Li <peng...@apache.org> wrote:
>
> Yes, this is the drawback if we do static partitioning.
>
> Thanks,
> Penghui
>
> On Wed, Mar 3, 2021 at 9:21 AM Kaushik Ghosh <kaush...@splunk.com> wrote:
> >
> > Wouldn't such a static partitioning approach have the drawback that, in a pathological case, all the namespaces associated with a certain namespace-bundle may be inactive (and not use their quota) while other namespaces are over-active and being restricted?
> >
> > Thanks,
> > Kaushik
> >
> > On Tue, Mar 2, 2021 at 5:03 PM PengHui Li <peng...@apache.org> wrote:
> >
> >> The approach is to share the quotas between brokers through an internal topic; for example, the rate limit might be 100 msgs/s while the current rate is 50 msgs/s. If we share quotas between brokers, we still need a policy to assign the remaining quota across multiple brokers.
> >>
> >> How about assigning the quotas per namespace bundle? If we set the publish rate limit to 100 msgs/s and the namespace has 10 bundles, we can assign 10 msgs/s per namespace bundle. Since a bundle is always assigned to one broker, we don't need to share the quotas.
> >>
> >> This is just a rough idea, as an alternative to sharing the quotas between brokers. I know that each approach has advantages and disadvantages, and we have already done the broker publish buffer limitation by splitting the whole buffer into multiple parts per I/O thread.
> >>
> >> Thanks,
> >> Penghui
> >>
> >> On Mon, Mar 1, 2021 at 1:17 PM Matteo Merli <mme...@apache.org> wrote:
> >>>
> >>> https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting
> >>>
> >>> =============
> >>>
> >>> * **Status**: Proposal
> >>> * **Authors**: Bharani Chadalavada, Kaushik Ghosh, Ravi Vaidyanathan, Matteo Merli
> >>> * **Pull Request**:
> >>> * **Mailing List discussion**:
> >>> * **Release**:
> >>>
> >>> ## Motivation
> >>>
> >>> Currently in Pulsar, it is possible to configure rate limiting, in terms of messages/sec or bytes/sec, both on the producers and the consumers for a topic. The rates are configured in the namespace policies and the enforcement is done at the topic level, or at the partition level in the case of a partitioned topic.
> >>>
> >>> The fact that the rate is enforced at topic level doesn't allow controlling the max rate across a given namespace (a namespace can span multiple brokers). For example, if the limit is 100 msg/s per topic, a user can simply create more topics to keep increasing the load on the system.
> >>>
> >>> Instead, we should have a way to better define producer and consumer limits for a namespace or a Pulsar tenant and have the Pulsar brokers collectively enforce them.
> >>>
> >>> ## Goal
> >>>
> >>> The goal for this feature is to allow users to configure a namespace- or tenant-wide limit for producers and consumers and have that limit enforced irrespective of the number of topics in the namespace, with fair sharing of the quotas.
> >>>
> >>> Another important aspect is that the quota enforcement needs to be able to dynamically adjust when the quota is raised or reduced.
> >>>
> >>> ### Non-goals
> >>>
> >>> It is not a goal to provide a super strict limiter; rather, the implementation is allowed to either undercount or overcount for short amounts of time, as long as the limiting converges close to the configured quota, with an approximation of, say, 10%.
> >>>
> >>> It is not a goal to allow users to configure limits at multiple levels (tenant/namespace/topic) and implement a hierarchical enforcement mechanism.
> >>>
> >>> If the limits are configured at tenant level, it is not a goal to evenly distribute the quotas across all namespaces. Similarly, if the limits are configured at namespace level, it is not a goal to evenly distribute the quota across all topics in the namespace.
> >>>
> >>> ## Implementation
> >>>
> >>> ### Configuration of quotas
> >>>
> >>> In order to implement the rate limiting per namespace or tenant, we're going to introduce the concept of a "ResourceGroup". A ResourceGroup is defined as a grouping of different rate limit quotas, and it can be associated with different resources, for example a Pulsar tenant or a Pulsar namespace to start with.
> >>>
> >>> In addition to rate limiting (in bytes/s and msg/s) for producers and consumers, the configuration of the ResourceGroup might also contain additional quotas in the future, such as a storage quota, although that is outside the scope of this proposal.
> >>>
> >>> ### Enforcement
> >>>
> >>> In order to enforce the limit over several topics that belong to a particular namespace, we need multiple brokers to cooperate with each other through a feedback mechanism. With this, each broker will be able to know, within the scope of a particular ResourceGroup, how much of the quota is currently being used by other brokers.
> >>>
> >>> Each broker will then make sure that the available quota is split optimally between the brokers that are requesting it.
> >>>
> >>> Note: Pulsar currently supports topic/partition level rate limiting. If that is configured along with the new namespace-wide rate limiting using resource groups, then both configurations will be effective: at the broker level the old config will be enforced and the namespace-level rate limiter will also be enforced, so the more stringent of the two will take effect.
> >>>
> >>> At some point in the future it would be good to make the topic/partition quota configuration fit within the namespace-level rate limiter and be more self-explanatory. At that point the old configuration could be deprecated over time; that is not in the scope of this feature though.
> >>>
> >>> #### Communications between brokers
> >>>
> >>> Brokers will talk to each other using a regular Pulsar topic. For the purposes of this feature, a non-persistent topic is the ideal choice, since it has minimal resource requirements and always delivers the most recent values. We can mostly ignore data losses, treating them as an "undercounting" event that may lead to exceeding the quota for a brief amount of time.
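
As a minimal illustration of how a broker could attach to such a non-persistent topic with the standard Pulsar Java client (this sketch is not part of the proposal; the topic name and service URL below are assumptions):

```java
import org.apache.pulsar.client.api.*;

public class ResourceUsageChannel {
    // Hypothetical topic name; the PIP does not specify the actual name.
    private static final String USAGE_TOPIC =
            "non-persistent://pulsar/system/resource-group-usage";

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumption: local broker
                .build();

        // Producer side: each broker publishes its own usage reports.
        Producer<byte[]> producer = client.newProducer()
                .topic(USAGE_TOPIC)
                .create();

        // Reader side: each broker tails the topic from the latest message,
        // since only recent usage values matter and losses are tolerated.
        Reader<byte[]> reader = client.newReader()
                .topic(USAGE_TOPIC)
                .startMessageId(MessageId.latest)
                .create();

        producer.send("serialized-usage-report".getBytes());

        Message<byte[]> msg = reader.readNext();
        System.out.println("Received usage report of " + msg.getData().length + " bytes");

        client.close();
    }
}
```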
> >>>
> >>> Each broker will publish the current actual usage, as an absolute number, for each of the ResourceGroups that currently have traffic and for which the traffic has changed significantly since the last time it was reported (e.g., ±10%). Each broker will also use these updates to keep track of which brokers are communicating on various ResourceGroups; hence, each broker that is active on a ResourceGroup will mandatorily report its usage once every N cycles (the value of N may be configurable), even if the traffic has not changed significantly.
> >>>
> >>> The update will be in the form of a ProtocolBuffer message published on the internal topic. The format of the update will be like:
> >>>
> >>> ```
> >>> {
> >>>   broker : "broker-1.example.com",
> >>>   usage : {
> >>>     "tenant-1/ns1" : {
> >>>       topics: 1,
> >>>       publishedMsg : 100,
> >>>       publishedBytes : 100000,
> >>>     },
> >>>     "tenant-1/ns2" : {
> >>>       topics: 1,
> >>>       publishedMsg : 1000,
> >>>       publishedBytes : 500000,
> >>>     },
> >>>     "tenant-2" : {
> >>>       topics: 1,
> >>>       publishedMsg : 80000,
> >>>       publishedBytes : 9999999,
> >>>     },
> >>>   }
> >>> }
> >>> ```
> >>>
> >>> Each broker will use a Pulsar reader on the topic and will receive every update from other brokers. These updates will get inserted into a hash map:
> >>>
> >>> ```
> >>> Map<ResourceGroup, Map<BrokerName, Usage>>
> >>> ```
> >>>
> >>> With this, each broker will be aware of the actual usage by every broker on a particular resource group. It will then proceed to adjust the rate on a local in-memory rate limiter, in the same way we're currently doing the per-topic rate limiting.
> >>>
> >>> Example of usage distribution for a given ResourceGroup with a quota of 100. Let's assume that the quota assignment of 100 to this ResourceGroup is known to all the brokers (through configuration not shown here).
> >>>
> >>> * broker-1: 10
> >>> * broker-2: 50
> >>> * broker-3: 30
> >>>
> >>> In this case, each broker will adjust its own local limit to utilize the remaining 10 units. They might each simply add the full remaining 10 units:
> >>>
> >>> * broker-1: 20
> >>> * broker-2: 60
> >>> * broker-3: 40
> >>>
> >>> In the short term, this will lead to exceeding the set quota, but it will quickly converge in just a few cycles to the fair values.
> >>>
> >>> Alternatively, each broker may split up the 10 units proportionally, based on historic usage (so they would use 1/9th, 5/9ths, and 1/3rd of the residual 10 units):
> >>>
> >>> * broker-1: 11.11
> >>> * broker-2: 55.56
> >>> * broker-3: 33.33
> >>>
> >>> The opposite would happen (each broker would reduce its usage by the corresponding fractional amount) if the recent usage was over the quota assigned on the resource group.
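
To make the proportional adjustment above concrete, here is a minimal sketch (the class and method names are mine, not from the proposal) of how a broker could recompute its local limit from the usage map; it reproduces the 11.11 / 55.56 / 33.33 split from the example:

```java
import java.util.Map;

public class QuotaAdjuster {

    /**
     * Returns the new local limit for {@code selfBroker}, distributing the
     * unused (or excess) quota proportionally to each broker's reported usage.
     * Example from the text: quota=100, usage {b1=10, b2=50, b3=30} ->
     * residual=10, new limits {11.11, 55.56, 33.33}.
     */
    static double proportionalLimit(Map<String, Double> usageByBroker,
                                    double resourceGroupQuota,
                                    String selfBroker) {
        double totalUsage = usageByBroker.values().stream()
                .mapToDouble(Double::doubleValue).sum();
        double selfUsage = usageByBroker.getOrDefault(selfBroker, 0.0);

        if (totalUsage <= 0) {
            // No reported traffic yet: fall back to an even split (assumption).
            return resourceGroupQuota / Math.max(1, usageByBroker.size());
        }

        // Positive residual grows the limits; negative residual shrinks them.
        double residual = resourceGroupQuota - totalUsage;
        return selfUsage + residual * (selfUsage / totalUsage);
    }

    public static void main(String[] args) {
        Map<String, Double> usage = Map.of(
                "broker-1", 10.0, "broker-2", 50.0, "broker-3", 30.0);
        System.out.printf("broker-2 new limit: %.2f%n",
                proportionalLimit(usage, 100.0, "broker-2")); // ~55.56
    }
}
```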
> >>>
> >>> In a similar way, brokers will try to "steal" part of the quota when another broker is using a bigger portion. For example, consider the following usage report map:
> >>>
> >>> * broker-1: 80
> >>> * broker-2: 20
> >>>
> >>> Broker-2 has its rate limiter set to 20, and that also reflects its actual usage, which could simply mean that broker-2 is being unfairly throttled. Since broker-1 is dominant in the usage map, broker-2 will set its local limiter to a value that is higher than 20, for example half-way to the next broker, in this case to `20 + (80 - 20)/2 = 50`.
> >>>
> >>> If broker-2 indeed has more demand for traffic, that will increase broker-2's usage to 30 in the next update, which consequently triggers broker-1 to reduce its limit to 70. This step-by-step process will continue until it converges to the equilibrium point.
> >>>
> >>> Generalizing to the N-broker case, the broker with the lowest quota will steal part of the quota of the most dominant broker, the broker with the second-lowest quota will try to steal part of the quota of the second most dominant broker, and so on, until all brokers converge to the equilibrium point.
> >>>
> >>> #### Goal
> >>>
> >>> Whenever an event that influences the quota allocation (a broker/producer/consumer joins or leaves) occurs, the quota adjustment step function needs to converge the quotas to stable allocations in a minimum number of iterations, while also ensuring that:
> >>>
> >>> * The adjustment curve is smooth instead of jagged.
> >>> * The quota is not under-utilized.
> >>>   - For example, if the quota is 100 with two brokers, broker-1 allocated 70 and broker-2 allocated 30, and broker-1 has demand for 80 while broker-2 only uses 20, we need to ensure the design does not lead to under-utilization.
> >>> * Quota allocation is fair across brokers.
> >>>   - For example, if the quota is 100 and both brokers are seeing a uniform load of, say, 70, it would be unfair for one broker to be allocated 70 and the other 30.
> >>>
> >>> #### Clearing up stale broker entries
> >>>
> >>> Brokers only send updates on the common topic if there are significant changes, or if they have not reported for a (configurable) number of rounds because their usage has not changed. This is to minimize the amount of traffic in the common topic and the work needed to process it.
> >>>
> >>> When a broker publishes an update with a usage of 0, everyone will remove that broker from the usage map. In the same way, when brokers detect, through the ZooKeeper registration, that a broker went down, they will clear that broker from all the usage maps.
> >>>
> >>> #### Rate limiting across topics/partitions
> >>>
> >>> For each tenant/namespace that a broker is managing, the usage it reports is the aggregate of the usages of all its topics/partitions. Therefore, the quota adjustment function will divide the quota proportionally (taking the usages reported by other brokers into consideration). Within the quota allocated to the broker, it can choose either to sub-divide it evenly across its own topics/partitions or to sub-divide it proportionally to the usage of each topic/partition.
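
As an illustrative sketch of that proportional sub-division (the class and helper names are hypothetical, and the even-split fallback for topics with no recent usage is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicQuotaSplitter {

    /**
     * Splits the broker's allocated quota for a ResourceGroup across its local
     * topics, proportionally to each topic's recent usage. If no usage has
     * been recorded yet, every topic gets an even share of the quota.
     */
    static Map<String, Double> split(Map<String, Double> usageByTopic,
                                     double brokerQuota) {
        Map<String, Double> limits = new HashMap<>();
        double totalUsage = usageByTopic.values().stream()
                .mapToDouble(Double::doubleValue).sum();

        for (Map.Entry<String, Double> e : usageByTopic.entrySet()) {
            double share = totalUsage > 0
                    ? brokerQuota * (e.getValue() / totalUsage)  // proportional
                    : brokerQuota / usageByTopic.size();         // even split
            limits.put(e.getKey(), share);
        }
        return limits;
    }

    public static void main(String[] args) {
        Map<String, Double> usage = Map.of("t1", 30.0, "t2", 10.0);
        // Broker was allocated 60 units of the namespace quota:
        System.out.println(split(usage, 60.0)); // t1 -> 45.0, t2 -> 15.0 (map order may vary)
    }
}
```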
> >>>
> >>> ### Resource consumption considerations
> >>>
> >>> With the proposed implementation, each broker will keep a full map of all the resource groups and their usage, broker by broker. The amount of memory/bandwidth consumption will depend on several factors such as the number of namespaces, brokers, etc. Below is an approximate estimate for one type of scaled scenario [where the quotas are enforced at a namespace level].
> >>>
> >>> In this scenario, let's consider:
> >>> * 100000 namespaces
> >>> * 100 brokers
> >>>
> >>> Each namespace is spread across 5 brokers, so each broker is managing 5000 namespaces.
> >>>
> >>> #### Memory
> >>>
> >>> For a given namespace, each broker stores usage from 5 brokers (including itself).
> >>>
> >>> * Size of usage = 16 bytes (bytes + messages)
> >>> * Size of usage for publish + consume = 32 bytes
> >>> * For one namespace, usage of 5 brokers = 32*5
> >>> * 5000 namespaces = 32*5*5000 = 800K
> >>>
> >>> Metadata overhead [assuming that a namespace name is about 80 bytes and a broker name is 40 bytes]: `5000*80 + 5*40 = 400K bytes`.
> >>>
> >>> Total memory requirement ~= 1.2MB.
> >>>
> >>> #### Bandwidth
> >>>
> >>> Each broker sends the usage for the namespaces that it manages.
> >>>
> >>> * Size of usage = 16 bytes
> >>> * Size of usage for publish + consume = 32 bytes
> >>> * For 5000 namespaces, each broker publishes periodically (say every minute): 32*5000 = 160K bytes.
> >>> * Metadata overhead [assuming a broker name is 40 bytes and a namespace name is 80 bytes]: 5000*80 = 400K.
> >>> * For 100 brokers: (160K + 400K) * 100 = 56MB.
> >>>
> >>> So, the publish-side network bandwidth is 56MB per reporting interval (one minute in this example).
> >>> Including the consumption side (each of the 100 brokers reads all updates), it is 56MB*100 = 5.6GB.
> >>>
> >>> A few optimizations can reduce the overall bandwidth:
> >>>
> >>> * Brokers publish usage only if the usage is significantly different from the previous update.
> >>> * Use compression (the trade-off is higher CPU).
> >>> * Publish to more than one system topic (so the network load gets distributed across brokers).
> >>> * Since metadata changes [namespace/tenant addition/removal] are rare, publish the metadata update [namespace name to ID mapping] only when there is a change. The usage report will carry the namespace/tenant ID instead of the name.
> >>>
> >>> #### Persistent storage
> >>>
> >>> The usage data doesn't require persistent storage. So, there is no persistent storage overhead.
> >>>
> >>> ## Alternative approaches
> >>>
> >>> ### External database
> >>>
> >>> One approach is to use a distributed DB (such as Cassandra, Redis, Memcached, etc.) to store the quota counter and usage counters. Quota reservation/refresh can just be increment/decrement operations on the quota counter. The approach may seem reasonable, but has a few issues:
> >>>
> >>> * Atomic increment/decrement operations on the distributed counter can incur significant latency overhead.
> >>> * Dependency on external systems has a very high operational cost. It is yet another cluster that needs to be deployed, maintained, updated, etc.
> >>>
> >>> ### Centralized implementation
> >>>
> >>> One broker is designated as leader for a given resource group and the quota allocation is computed by that broker. The allocation is then disseminated to the other brokers. The "Exclusive producer" feature can be used for this purpose. This approach has a few issues:
> >>>
> >>> * More complex implementation because of leader election.
> >>> * If the leader dies, another needs to be elected, which can lead to unnecessary latency.
> >>>
> >>> ### Using ZooKeeper
> >>>
> >>> Another possible approach would have been for the brokers to exchange usage information through ZooKeeper. This was not pursued because of perceived scaling issues: with a large number of brokers and/or namespaces/tenants to rate-limit, the size of messages and the high volume of writes into ZooKeeper could become a problem. Since ZooKeeper is already used in certain parts of Pulsar, it was decided that we should not burden that subsystem with more work.
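
As a rough illustration of the reporting policy the proposal relies on (publish an update only on a significant change, e.g. ±10%, with a forced report every N cycles), here is a minimal sketch; the threshold value, field names, and class name are assumptions for the example:

```java
public class UsageReportPolicy {

    private final double significantChangeRatio; // e.g. 0.10 for +/-10%
    private final int maxSuppressedCycles;       // "N" in the proposal, configurable

    private double lastReportedUsage;
    private int cyclesSinceLastReport;

    UsageReportPolicy(double significantChangeRatio, int maxSuppressedCycles) {
        this.significantChangeRatio = significantChangeRatio;
        this.maxSuppressedCycles = maxSuppressedCycles;
    }

    /** Called once per reporting cycle; returns true if an update should be published. */
    boolean shouldReport(double currentUsage) {
        cyclesSinceLastReport++;
        boolean changedSignificantly =
                Math.abs(currentUsage - lastReportedUsage)
                        > significantChangeRatio * Math.max(lastReportedUsage, 1.0);
        boolean forced = cyclesSinceLastReport >= maxSuppressedCycles;

        if (changedSignificantly || forced) {
            lastReportedUsage = currentUsage;
            cyclesSinceLastReport = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        UsageReportPolicy policy = new UsageReportPolicy(0.10, 5);
        System.out.println(policy.shouldReport(100)); // true: first report
        System.out.println(policy.shouldReport(105)); // false: within +/-10%
        System.out.println(policy.shouldReport(130)); // true: significant change
    }
}
```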
> >>>
> >>> --
> >>> Matteo Merli
> >>> <mme...@apache.org>