+1

Penghui

On Sat, Apr 9, 2022 at 4:24 PM Haiting Jiang <jianghait...@apache.org> wrote:

> Hi Pulsar community,
>
> I created a PIP to add support for a subscription-level dispatch rate
> limiter setting.
>
> The proposal can be found at: https://github.com/apache/pulsar/issues/15094
>
> ----
>
> ## Motivation
>
> Currently, the message dispatch rate limiter of a subscription can be
> configured at 3 levels:
> - Broker level: configured with
> `dispatchThrottlingRatePerSubscriptionInMsg` and
> `dispatchThrottlingRatePerSubscriptionInByte` in broker.conf
> - Namespace level: configured with
> `org.apache.pulsar.client.admin.Namespaces#setSubscriptionDispatchRate`
> - Topic level: configured with
> `org.apache.pulsar.client.admin.TopicPolicies#setSubscriptionDispatchRate`
>
> As we all know, in the pub-sub messaging model, different subscribers of
> the same topic process messages for various purposes, and they may have
> different requirements for the message dispatch rate limit. Here are some
> use cases in my organization:
> - On the client side, subscriptions have different maximum processing
> capacities. If the dispatch rate is too high, they may overwhelm their
> downstream services.
> - We bill based on the maximum message rate of the subscription. Some
> users are budget-sensitive and willing to pay less for lower throughput.
>
>
> ## Goal
>
> Support a subscription-level dispatch rate limiter setting.
>
> ## API Changes
>
>
> 1. Add client API methods in org.apache.pulsar.client.admin.TopicPolicies.
> ```
> DispatchRate getSubscriptionDispatchRate(String topic, String sub) throws
> PulsarAdminException;
> DispatchRate getSubscriptionDispatchRate(String topic, String sub, boolean
> applied) throws PulsarAdminException;
> void setSubscriptionDispatchRate(String topic, String sub, DispatchRate
> dispatchRate) throws PulsarAdminException;
> void removeSubscriptionDispatchRate(String topic, String sub) throws
> PulsarAdminException;
>
> // And the async versions of these methods.
>
> ```
>
> 2. Add a new admin API
>
> ```
> @PUT @DELETE @GET
> @Path("/{tenant}/{namespace}/{topic}/{subName}/dispatchRate")
>
> ```
>
> ## Implementation
>
> The rate limiter itself is already implemented for each subscription. We
> only need to update the rate limiter settings if a subscription-level
> config is set.
> I propose to just add a new field in
> `org.apache.pulsar.common.policies.data.TopicPolicies` to store the data.
> ```
> private Map<String/*SubName*/, DispatchRateImpl>
> subscriptionDispatchRateMap;
> ```
> The subscription-level rate limiter setting has higher priority than the
> topic-level one. We need to calculate the applied value when we create the
> subscription or when the config at any level changes.
>
> ## Rejected Alternatives
> None yet.
>
>
> Thanks,
> Haiting
>
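
For readers following the thread, here is a minimal sketch of the priority rule described under Implementation: the subscription-level setting wins over the topic level, which in turn falls back to the namespace and broker levels. The class `DispatchRateResolver`, the nested `Rate` type, and the `resolve` method are hypothetical names used only for illustration; they are not taken from the proposal or from the Pulsar code base, and `Rate` is a simplified stand-in for `DispatchRate`. The topic, namespace, broker fallback order is assumed here from the three existing levels listed in the Motivation section.

```java
import java.util.Map;

// Hypothetical illustration only; not the broker's actual classes.
final class DispatchRateResolver {

    // Simplified stand-in for a dispatch rate: messages/s and bytes/s.
    static final class Rate {
        final int msgPerSec;
        final long bytesPerSec;

        Rate(int msgPerSec, long bytesPerSec) {
            this.msgPerSec = msgPerSec;
            this.bytesPerSec = bytesPerSec;
        }
    }

    // Returns the applied rate for one subscription: the most specific
    // level that has a value set wins. A null argument means "not set".
    static Rate resolve(String subName,
                        Map<String, Rate> subscriptionRates,
                        Rate topicRate,
                        Rate namespaceRate,
                        Rate brokerRate) {
        Rate subRate = subscriptionRates == null ? null : subscriptionRates.get(subName);
        if (subRate != null) {
            return subRate;       // subscription-level setting (the new level)
        }
        if (topicRate != null) {
            return topicRate;     // topic-level setting
        }
        if (namespaceRate != null) {
            return namespaceRate; // namespace-level setting
        }
        return brokerRate;        // broker.conf default
    }
}
```

Under this sketch, the applied value would be recomputed by calling `resolve` again whenever a subscription is created or a setting at any level changes.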