This would be roughly the same as today's approach:
 * The limit per partition is X/s
 * You are given N partitions, so the max quota is X*N

Which would be easy to convert: if the total limit allowed is Y and you
have N topics, the per-topic quota is Y/N.

The problem is that there will always be skew, and therefore it would
be very difficult for a user to fully utilize the allocated quota.
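
To make the skew problem concrete, here is a rough Python sketch. The function name and the demand numbers are made up for illustration. Each of the N partitions gets a static share of Y/N, and anything above that share is throttled even if the other partitions are idle:

```python
def static_throughput(total_quota, demands):
    """Total rate admitted when the quota is split evenly across partitions."""
    per_partition = total_quota / len(demands)
    # Each partition is capped at its own share, regardless of idle neighbours.
    return sum(min(demand, per_partition) for demand in demands)


# Quota Y = 100 over N = 4 partitions, so 25 per partition.
skewed = [70, 20, 5, 5]      # total demand is 100, but unevenly spread
uniform = [25, 25, 25, 25]   # same total demand, perfectly even

admitted_skewed = static_throughput(100, skewed)    # 25 + 20 + 5 + 5
admitted_uniform = static_throughput(100, uniform)  # the full quota
```

With the skewed demand only 55 of the 100 units are usable, even though the total demand matches the quota.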

Additionally, that wouldn't work for setting tenant-level limits.


--
Matteo Merli
<matteo.me...@gmail.com>

On Tue, Mar 2, 2021 at 5:51 PM PengHui Li <peng...@apache.org> wrote:
>
> Yes, this is the drawback of static partitioning.
>
> Thanks,
> Penghui
>
>
> On Wed, Mar 3, 2021 at 9:21 AM Kaushik Ghosh <kaush...@splunk.com> wrote:
>
> > Wouldn't such a static partitioning approach have the drawback that in a
> > pathological case, all the namespaces associated with a certain
> > namespace-bundle may be inactive (and not use their quota) while other
> > namespaces are over-active and being restricted?
> >
> > Thanks,
> > Kaushik
> >
> > On Tue, Mar 2, 2021 at 5:03 PM PengHui Li <peng...@apache.org> wrote:
> >
> >> The approach is to share the quotas between brokers through an internal
> >> topic. For example, the rate limit is 100 msgs/s and the current rate is
> >> 50 msgs/s. If we share quotas between brokers, we still need a policy to
> >> assign the remaining quota to multiple brokers.
> >>
> >> How about assigning the quotas per namespace bundle? If the publish
> >> rate limit is set to 100 msgs/s and the namespace has 10 bundles, we can
> >> assign 10 msgs/s per namespace bundle.
> >> Since a bundle is always assigned to one broker, we don't need to share
> >> the quotas.
> >>
> >> Just a rough idea, as an alternative to sharing the quotas between
> >> brokers. I know that each approach has advantages and disadvantages, and
> >> we have already done the broker publish buffer limitation by splitting
> >> the whole buffer into multiple parts by IO thread.
> >>
> >> Thanks,
> >> Penghui
> >>
> >> On Mon, Mar 1, 2021 at 1:17 PM Matteo Merli <mme...@apache.org> wrote:
> >>
> >>>
> >>> https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting
> >>>
> >>> =============
> >>>
> >>>
> >>> * **Status**: Proposal
> >>> * **Authors**: Bharani Chadalavada, Kaushik Ghosh, Ravi Vaidyanathan,
> >>> Matteo Merli
> >>> * **Pull Request**:
> >>> * **Mailing List discussion**:
> >>> * **Release**:
> >>>
> >>> ## Motivation
> >>>
> >>> Currently in Pulsar, it is possible to configure rate limiting, in
> >>> terms of messages/sec or bytes/sec, on both the producers and the
> >>> consumers of a topic. The rates are configured in the namespace
> >>> policies and the enforcement is done at the topic level, or at the
> >>> partition level in the case of a partitioned topic.
> >>>
> >>> Because the rate is enforced at the topic level, it is not possible to
> >>> control the max rate across a given namespace (a namespace can span
> >>> multiple brokers). For example, if the limit is 100 msg/s per topic, a
> >>> user can simply create more topics to keep increasing the load on the
> >>> system.
> >>>
> >>> Instead, we should have a way to define producer and consumer limits
> >>> for a namespace or a Pulsar tenant and have the Pulsar brokers
> >>> collectively enforce them.
> >>>
> >>> ## Goal
> >>>
> >>> The goal for this feature is to allow users to configure a namespace
> >>> or tenant wide limit for producers and consumers and have that
> >>> enforced irrespective of the number of topics in the namespace, with
> >>> fair sharing of the quotas.
> >>>
> >>> Another important aspect is that the quota enforcement needs to be
> >>> able to dynamically adjust when the quota is raised or reduced.
> >>>
> >>> ### Non-goals
> >>>
> >>> It is not a goal to provide a super strict limiter; rather, the
> >>> implementation would be allowed to either undercount or overcount for
> >>> short amounts of time, as long as the limiting converges close to the
> >>> configured quota, within an approximation of, say, 10%.
> >>>
> >>> It is not a goal to allow users to configure limits at multiple levels
> >>> (tenant/namespace/topic) and implement a hierarchical enforcement
> >>> mechanism.
> >>>
> >>> If the limits are configured at tenant level, it is not a goal to
> >>> evenly distribute the quotas across all namespaces. Similarly if the
> >>> limits are configured at namespace level, it is not a goal to evenly
> >>> distribute the quota across all topics in the namespace.
> >>>
> >>>
> >>> ## Implementation
> >>>
> >>> ### Configuration of quotas
> >>>
> >>> In order to implement the rate limiting per namespace or tenant, we’re
> >>> going to introduce the concept of a “ResourceGroup”. A ResourceGroup
> >>> is defined as the grouping of different rate limit quotas and it can
> >>> be associated with different resources, for example a Pulsar tenant or
> >>> a Pulsar namespace to start with.
> >>>
> >>> In addition to rate limiting (in bytes/s and msg/s), for producers and
> >>> consumers, the configuration of the ResourceGroup might also contain
> >>> additional quotas in the future, such as the storage quota, although
> >>> that is outside the scope of this current proposal.
> >>>
> >>> ### Enforcement
> >>>
> >>> In order to enforce the limit over several topics that belong to a
> >>> particular namespace, we need multiple brokers to cooperate with
> >>> each other through a feedback mechanism. With this, each broker will
> >>> be able to know, within the scope of a particular ResourceGroup, what
> >>> portion of the quota is currently being used by other brokers.
> >>>
> >>> Each broker will then make sure that the available quota is split
> >>> optimally between the brokers that are requesting it.
> >>>
> >>> Note: Pulsar currently supports topic/partition level rate limiting.
> >>> If that is configured along with the new namespace-wide rate limiting
> >>> using resource groups, then both configurations will be effective. In
> >>> effect, at the broker level both the old config and the namespace
> >>> level rate limiter will be enforced, so the more stringent of the two
> >>> will apply.
> >>>
> >>> At some point in the future it would be good to make the
> >>> topic/partition quota configuration fit within the namespace level
> >>> rate limiter and be more self-explanatory. At that point the old
> >>> configuration could be deprecated over time. That is not in the scope
> >>> of this feature, though.
> >>>
> >>>
> >>> #### Communications between brokers
> >>>
> >>> Brokers will be talking to each other using a regular Pulsar
> >>> topic. For the purposes of this feature, a non-persistent topic will
> >>> be the ideal choice, since it has minimal resource requirements and
> >>> always gives the latest data value. We can mostly treat data losses as
> >>> an “undercounting” event, which will lead to exceeding the quota
> >>> for a brief amount of time.
> >>>
> >>> Each broker will publish the current actual usage, as an absolute
> >>> number, for each of the ResourceGroups that currently have
> >>> traffic and for which the traffic has changed significantly since the
> >>> last time it was reported (e.g. ±10%). Each broker will also use these
> >>> updates to keep track of which brokers are communicating on various
> >>> ResourceGroups; hence, each broker that is active on a ResourceGroup
> >>> must report its usage at least once every N cycles (the value of N may
> >>> be configurable), even if the traffic has not changed significantly.
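
As a rough sketch of the reporting rule above (Python; the function name, the 10% default and the default of 5 silent cycles are assumptions for illustration, not part of the proposal), a broker would publish when usage moved enough or when it has been silent for N cycles:

```python
def should_report(last_reported, current, cycles_since_report,
                  threshold=0.10, max_silent_cycles=5):
    """Decide whether a broker should publish its usage for a ResourceGroup."""
    # Mandatory report once every N cycles, even if nothing changed.
    if cycles_since_report >= max_silent_cycles:
        return True
    # Avoid dividing by zero: any traffic after an idle period is significant.
    if last_reported == 0:
        return current != 0
    # Report only when usage moved by at least the threshold (e.g. 10%).
    return abs(current - last_reported) / last_reported >= threshold
```

For example, a move from 100 to 105 would stay silent, while a move from 100 to 111 would trigger an update.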
> >>>
> >>> The update will be in the form of a ProtocolBuffer message published
> >>> on the internal topic. The format of the update will be like:
> >>>
> >>> ```
> >>> {
> >>>    "broker": "broker-1.example.com",
> >>>    "usage": {
> >>>       "tenant-1/ns1": {
> >>>          "topics": 1,
> >>>          "publishedMsg": 100,
> >>>          "publishedBytes": 100000
> >>>       },
> >>>       "tenant-1/ns2": {
> >>>          "topics": 1,
> >>>          "publishedMsg": 1000,
> >>>          "publishedBytes": 500000
> >>>       },
> >>>       "tenant-2": {
> >>>          "topics": 1,
> >>>          "publishedMsg": 80000,
> >>>          "publishedBytes": 9999999
> >>>       }
> >>>    }
> >>> }
> >>> ```
> >>>
> >>> Each broker will use a Pulsar reader on the topic and will receive
> >>> every update from other brokers. These updates will get inserted into
> >>> a hash map:
> >>>
> >>> ```
> >>> Map<ResourceGroup, Map<BrokerName, Usage>>
> >>> ```
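
A minimal Python sketch of how such a map could be maintained, assuming updates arrive as dictionaries shaped like the example message above (the helper names and the second broker are hypothetical):

```python
from collections import defaultdict

# resource group -> broker name -> latest reported usage
usage_map = defaultdict(dict)


def apply_update(update):
    """Merge one broker's usage report into the local map."""
    broker = update["broker"]
    for group, usage in update["usage"].items():
        # Reports carry absolute numbers, so the newest one replaces the old.
        usage_map[group][broker] = usage


def total_published_msgs(group):
    """Sum the message rate reported by all brokers for one resource group."""
    return sum(u["publishedMsg"] for u in usage_map[group].values())


apply_update({"broker": "broker-1.example.com",
              "usage": {"tenant-1/ns1": {"publishedMsg": 100,
                                         "publishedBytes": 100000}}})
apply_update({"broker": "broker-2.example.com",
              "usage": {"tenant-1/ns1": {"publishedMsg": 40,
                                         "publishedBytes": 30000}}})
```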
> >>>
> >>> With this, each broker will be aware of the actual usage done by each
> >>> broker on the particular resource group. It will then proceed to
> >>> adjust the rate on a local in-memory rate limiter, in the same way
> >>> we’re currently doing the per-topic rate limiting.
> >>>
> >>> Example of usage distribution for a given ResourceGroup with a quota
> >>> of 100. Let’s assume that the quota-assignment of 100 to this
> >>> ResourceGroup is known to all the brokers (through configuration not
> >>> shown here).
> >>>
> >>>  * broker-1: 10
> >>>  * broker-2: 50
> >>>  * broker-3: 30
> >>>
> >>> In this case, each broker will adjust its own local limit to
> >>> utilize the remaining 10 units. One option is for every broker to add
> >>> the full remaining 10 units to its own limit:
> >>>
> >>>  * broker-1: 20
> >>>  * broker-2: 60
> >>>  * broker-3: 40
> >>>
> >>> In the short term, this will lead to exceeding the set quota, but it
> >>> will quickly converge in just a few cycles to the fair values.
> >>>
> >>> Alternatively, each broker may split up the 10 units proportionally,
> >>> based on historic usage (so they can use 1/9th, 5/9ths, and 1/3rd of
> >>> the residual 10 units):
> >>>
> >>>  * broker-1: 11.11
> >>>  * broker-2: 55.56
> >>>  * broker-3: 33.33
> >>>
> >>> The opposite would happen (each broker would reduce its usage by the
> >>> corresponding fractional amount) if the recent-usage was over the
> >>> quota assigned on the resource-group.
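
The two options above can be sketched in Python as follows. The function names are made up, and the even split used when nobody has reported traffic is my own assumption:

```python
def add_full_residual(quota, usage):
    """Option 1: every broker adds the whole unused portion to its own limit."""
    residual = quota - sum(usage.values())
    return {broker: used + residual for broker, used in usage.items()}


def split_proportionally(quota, usage):
    """Option 2: scale each broker's usage so that the limits sum to the quota."""
    total = sum(usage.values())
    if total == 0:
        # Assumption: with no traffic reported, split the quota evenly.
        return {broker: quota / len(usage) for broker in usage}
    # Equivalent to used + residual * used / total; it also shrinks the
    # limits when the reported usage is above the quota.
    return {broker: used * quota / total for broker, used in usage.items()}


usage = {"broker-1": 10, "broker-2": 50, "broker-3": 30}
optimistic = add_full_residual(100, usage)
proportional = split_proportionally(100, usage)
```

The first option temporarily allows 120 units in total, while the second keeps the sum of the local limits at exactly the quota.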
> >>>
> >>> In a similar way, brokers will try to “steal” part of the quota when
> >>> there is another broker using a bigger portion. For example, consider
> >>> the following usage report map:
> >>>
> >>>  * broker-1: 80
> >>>  * broker-2: 20
> >>>
> >>> Broker-2 has its rate limiter set to 20, and that also matches its
> >>> actual usage, which could mean that broker-2 is being unfairly
> >>> throttled. Since broker-1 is dominant in the usage map, broker-2 will
> >>> set the local limiter to a value that is higher than 20, for example
> >>> half-way to the next broker, in this case to `20 + (80 - 20)/2 = 50`.
> >>>
> >>> If broker-2 does indeed have more demand for traffic, that will
> >>> increase broker-2's usage to 30 in the next update, and it will
> >>> consequently trigger broker-1 to reduce its limit to 70. This
> >>> step-by-step adjustment will continue until it converges to the
> >>> equilibrium point.
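
A rough Python sketch of a single "steal" step for the two-broker example above. The function name is made up, and it only covers the simple case of moving half-way towards the dominant broker:

```python
def steal_step(usage, me):
    """New local limit, half-way between this broker and the dominant one."""
    mine = usage[me]
    dominant = max(usage.values())
    if mine >= dominant:
        # This broker is already the dominant one, so there is nothing to steal.
        return mine
    return mine + (dominant - mine) / 2


new_limit = steal_step({"broker-1": 80, "broker-2": 20}, "broker-2")
```

Here broker-2 raises its local limit from 20 to 50. Whether it actually uses the extra room depends on its demand, which then shows up in its next usage report.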
> >>>
> >>> Generalizing it for the N brokers case, the broker with the lowest
> >>> quota will steal part of the quota of the most dominant broker. The
> >>> broker with the second-lowest quota will try to steal part of the
> >>> quota of the second most dominant broker, and so on until all brokers
> >>> converge to the equilibrium point.
> >>>
> >>> #### Goal
> >>>
> >>> Whenever an event that influences the quota allocation
> >>> (broker/producer/consumer joins or leaves) occurs, the quota
> >>> adjustment step function needs to converge the quotas to stable
> >>> allocations in the minimum number of iterations, while also ensuring
> >>> that:
> >>>
> >>>  * The adjustment curve should be smooth instead of being jagged.
> >>>  * The quota is not under-utilized.
> >>>    - For example, if the quota is 100 and there are two brokers, with
> >>>      broker-1 allocated 70 and broker-2 allocated 30, and
> >>>      broker-1's usage is 80 while broker-2's usage is 20, we need to
> >>>      ensure the design does not lead to under-utilization.
> >>>  * Fairness of quota allocation across brokers.
> >>>    - For example, if the quota is 100 and both brokers are seeing a
> >>>      uniform load of, say, 70, it would be unfair for one broker to be
> >>>      allocated 70 and the other 30.
> >>>
> >>> #### Clearing up stale broker entries
> >>>
> >>> Brokers only send updates on the common topic if there are
> >>> significant changes, or if they have not reported for a (configurable)
> >>> number of rounds due to unchanged usage. This is to minimize the
> >>> amount of traffic on the common topic and the work needed to process
> >>> it.
> >>>
> >>> When a broker publishes an update with a quota of 0, everyone will
> >>> remove that broker from the usage map. In the same way, when brokers
> >>> detect through the ZooKeeper registration that one broker went down,
> >>> they will clear that broker from all the usage maps.
> >>>
> >>> #### Rate limiting across topics/partitions
> >>>
> >>> For each tenant/namespace that a broker is managing, the usage
> >>> reported by it is the aggregate of usages for all
> >>> topics/partitions. Therefore, the quota adjustment function will
> >>> divide the quota proportionally (taking the usages reported by other
> >>> brokers into consideration). Within the quota allocated to the
> >>> broker, it can choose to either sub-divide it evenly across its own
> >>> topics/partitions or sub-divide it proportionally to the usage of each
> >>> topic/partition.
> >>>
> >>>
> >>> ### Resource consumption considerations
> >>>
> >>> With the proposed implementation, each broker will keep a full map of
> >>> all the resource groups and their usage, broker by broker.  The amount
> >>> of memory/bandwidth consumption will depend on several factors such as
> >>> number of namespaces, brokers etc. Below is an approximate estimate
> >>> for one type of scaled scenario [where the quotas are enforced at a
> >>> namespace level].
> >>>
> >>> In this scenario, let’s consider:
> >>>  * 100000 namespaces
> >>>  * 100 brokers.
> >>>
> >>> Each namespace is spread across 5 brokers. So, each broker is managing
> >>> 5000 namespaces.
> >>>
> >>> #### Memory
> >>>
> >>> For a given namespace, each broker stores usage from 5 brokers
> >>> (including itself).
> >>>
> >>>  * Size of usage = 16 bytes (bytes+messages)
> >>>  * Size of usage for publish+consume = 32 bytes
> >>>  * For one namespace, usage of 5 brokers = 32*5 = 160 bytes
> >>>  * 5000 namespaces = 32*5*5000 = 800K bytes
> >>>
> >>> Meta-data overhead [assuming that namespace name is about 80 bytes and
> >>> broker name is 40 bytes]: `5000*80 + 5*40 ~= 400K bytes`.
> >>>
> >>> Total memory requirement ~= 1.2MB.
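
For anyone who wants to replay the memory arithmetic above, here is a small Python sketch. The constant names are mine; the values are the ones assumed in this scenario:

```python
USAGE_BYTES = 16               # bytes + messages counters
DIRECTIONS = 2                 # publish and consume
BROKERS_PER_NAMESPACE = 5
NAMESPACES_PER_BROKER = 5000
NAMESPACE_NAME_BYTES = 80
BROKER_NAME_BYTES = 40

# Usage entries: one per (namespace, broker) pair, for both directions.
usage_bytes = (USAGE_BYTES * DIRECTIONS
               * BROKERS_PER_NAMESPACE * NAMESPACES_PER_BROKER)

# Metadata: one name per namespace plus one name per peer broker.
metadata_bytes = (NAMESPACES_PER_BROKER * NAMESPACE_NAME_BYTES
                  + BROKERS_PER_NAMESPACE * BROKER_NAME_BYTES)

total_bytes = usage_bytes + metadata_bytes
```

This comes to roughly 1.2 million bytes per broker, not counting the overhead of the map data structures themselves.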
> >>>
> >>> #### Bandwidth
> >>>
> >>> Each broker sends the usage for the namespaces that it manages.
> >>>
> >>>  * Size of usage = 16 bytes
> >>>  * Size of usage for publish+consume = 32 bytes
> >>>  * For 5000 namespaces, each broker publishes periodically (say every
> >>>    minute): 32*5000 = 160K bytes.
> >>>  * Metadata overhead [assuming broker name is 40 bytes and namespace
> >>>    is 80 bytes]: 5000*80 = 400K bytes.
> >>>  * For 100 brokers: (160K + 400K) * 100 = 56MB.
> >>>
> >>> So, publish side network bandwidth is 56MB per reporting period.
> >>> Including the consumption side (across all brokers), it is 56MB*100 =
> >>> 5.6GB.
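
The same kind of back-of-the-envelope check for the bandwidth numbers, again as a Python sketch with constant names of my own choosing. The figures are per reporting period (e.g. per minute), as in the estimate above:

```python
USAGE_BYTES_BOTH_DIRECTIONS = 32   # publish + consume, 16 bytes each
NAMESPACES_PER_BROKER = 5000
NAMESPACE_NAME_BYTES = 80
BROKERS = 100

# What a single broker publishes in one reporting period.
usage_per_broker = USAGE_BYTES_BOTH_DIRECTIONS * NAMESPACES_PER_BROKER
metadata_per_broker = NAMESPACES_PER_BROKER * NAMESPACE_NAME_BYTES
published_per_broker = usage_per_broker + metadata_per_broker

# All brokers publish, and every broker reads every update.
publish_total = published_per_broker * BROKERS
consume_total = publish_total * BROKERS
```

The consume side dominates because every one of the 100 brokers reads the full 56MB stream.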
> >>>
> >>> A few optimizations that can reduce the overall bandwidth:
> >>>
> >>>  * Brokers publish usage only if the usage is significantly different
> >>>    from the previous update.
> >>>  * Use compression (trade-off is higher CPU).
> >>>  * Publish to more than one system topic (so the network load gets
> >>>    distributed across brokers).
> >>>  * Since metadata changes [ namespace/tenant addition/removal ] are
> >>>    rare, publish the metadata update [namespace name to ID mapping]
> >>>    only when there is a change. The Usage report will carry the
> >>>    namespace/tenant ID instead of the name.
> >>>
> >>>
> >>> #### Persistent storage
> >>>
> >>> The usage data doesn’t require persistent storage. So, there is no
> >>> persistent storage overhead.
> >>>
> >>>
> >>> ## Alternative approaches
> >>>
> >>> ### External database
> >>>
> >>> One approach is to use a distributed DB (such as Cassandra, Redis,
> >>> memcached, etc.) to store the quota counter and usage counters. Quota
> >>> reservation/refresh can just be increment/decrement operations on the
> >>> quota counter. The approach may seem reasonable, but has a few issues:
> >>>
> >>>  * Atomic increment/decrement operations on the distributed counter
> >>>    can incur significant latency overhead.
> >>>  * Dependency on external systems has a very high operational cost. It
> >>>    is yet another cluster that needs to be deployed, maintained,
> >>>    updated etc.
> >>>
> >>> ### Centralized implementation
> >>>
> >>> One broker is designated as leader for a given resource group and the
> >>> quota allocation is computed by that broker. The allocation is then
> >>> disseminated to other brokers. The “Exclusive producer” feature can be
> >>> used for this purpose. This approach has a few issues:
> >>>
> >>>  * More complex implementation because of leader election.
> >>>  * If the leader dies, another needs to be elected which can lead to
> >>>    unnecessary latency.
> >>>
> >>> ### Using ZooKeeper
> >>>
> >>> Another possible approach would have been for the brokers to exchange
> >>> usage information through ZooKeeper. This was not pursued because of
> >>> perceived scaling issues: with a large number of brokers and/or
> >>> namespaces/tenants to rate-limit, the size of messages and the high
> >>> volume of writes into ZooKeeper could become a problem. Since
> >>> ZooKeeper is already used in certain parts of Pulsar, it was decided
> >>> that we should not burden that subsystem with more work.
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Matteo Merli
> >>> <mme...@apache.org>
> >>>
> >>
