Hi Pulsar community: I open a pip to discuss "Shadow Topic, an alternative way to support readonly topic ownership." Proposal Link:
- issue link: https://github.com/apache/pulsar/issues/16274 - pr link: https://github.com/apache/pulsar/pull/16281 --- ## Motivation The modular load manager, implemented in `ModularLoadManagerImpl`, is a flexible alternative to the previously implemented load manager, which attempts to simplify how load is managed while also providing abstractions so that complex load management strategies may be implemented. The load management component determines the criteria for unloading bundles and contains the following load shedding strategy: `OverloadShedder` and `ThresholdShedder` and `UniformLoadShedder`. (default is `ThresholdShedder`since 2.10.0) - `OverloadShedder`: This strategy attempts to shed exactly one bundle on brokers which are overloaded - `ThresholdShedder`: This strategy unloads any broker that exceeds the average resource utilization of all brokers by a configured threshold. - `UniformLoadShedder`:This strategy tends to distribute load uniformly across all brokers. However, the bundle placement strategy contains only one: `LeastLongTermMessageRate`, which selects a broker based on which one has the least long term message rate. The load management in our pulsar cluster use `ThresholdShedder` as load shedding strategy, and use `LeastLongTermMessageRate` as bundle placement strategy, which does not work well. Some broker nodes have a high load when the traffic of some topics are relatively large. The load shedding strategy will unload some bundles in any broker that exceeds the average resource utilization of all brokers by a configured threshold. And the bundles will be transferred to the next broker node. However it causes the load of the next broker node exceed the average resource utilization. Therefore, the load balancing will occur again on the current broker node due to high load. Worse yet, this scenario keeps popping up. The load shedding strategy configuration is as follows ``` # load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder # The broker resource usage threshold. # When the broker resource usage is greater than the pulsar cluster average resource usage, # the threshold shedder will be triggered to offload bundles from the broker. # It only takes effect in the ThresholdShedder strategy. loadBalancerBrokerThresholdShedderPercentage=10 # When calculating new resource usage, the history usage accounts for. # It only takes effect in the ThresholdShedder strategy. loadBalancerHistoryResourcePercentage=0.9 # The BandWithIn usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. loadBalancerBandwithInResourceWeight=1.0 # The BandWithOut usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. loadBalancerBandwithOutResourceWeight=1.0 # The CPU usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. loadBalancerCPUResourceWeight=1.0 # The heap memory usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. loadBalancerMemoryResourceWeight=0.1 # The direct memory usage weight when calculating new resource usage. # It only takes effect in the ThresholdShedder strategy. loadBalancerDirectMemoryResourceWeight=0.1 # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently. # It only takes effect in the ThresholdShedder strategy. loadBalancerBundleUnloadMinThroughputThreshold=0.1 ``` The following screenshots are the status of the cluster: Problem 1. Load balancing took a long time 10 hours and over 400 times, and it has been unloading if there is a large traffic. <img width="1247" alt="image" src=" https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png "> <img width="1245" alt="image" src=" https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png "> Problem 2. The effect of cpu balancing is poor. <img width="1247" alt="image" src=" https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png "> <img width="1246" alt="image" src=" https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png "> The load shedding strategy `ThresholdShedder` work well, but not the bundle placement strategyLeastLongTermMessageRate . There are 3 possible reasons for the problems. 1. Although the cluster has many brokers with low load, there are fewer brokers to be considered for assignment. <img width="1168" alt="image" src=" https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png "> Some brokers with lower load but more bundles can not be candidate due to distributing bundles evenly in LoadManager by force. Most of brokers are filtered out by the strategy, only 1 or 2 brokers can be candidate in the total 136 brokers . It was fixed by #16059 2. The memory usage of Java programs fluctuates widely, so that the maximum resource usage calculated is based on memory usage most of the time, which filters out brokers with low CPU load. Below is the sample of two brokers jvm memory usage in the cluster. <img width="1249" alt="image" src=" https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png "> If the broker is overload, it will get highest score, which prevents it from being a candidate. <img width="1059" alt="image" src=" https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png "> 3. The bundle placement strategy is `LeastLongTermMessageRate`, which selects a broker based on which one has the least long term message rate instead of load metric. The `LeastLongTermMessageRate` does not get along with `ThresholdShedder` well. Therefore, a load-based bundle placement strategy is necessary to cooperate with `ThresholdShedder`. ### Current implementation details The `ThresholdShedder` strategy that unloads any broker that exceeds the average resource utilization of all brokers by a configured threshold. As a consequence, this strategy tends to distribute load among all brokers. It does this by first computing the average resource usage per broker for the whole cluster. The resource usage for each broker is calculated using the following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The weights for each resource are configurable. Historical observations are included in the running average based on the broker's setting for loadBalancerHistoryResourcePercentage. Once the average resource usage is calculated, a broker's current/historical usage is compared to the average broker usage. If a broker's usage is greater than the average usage per broker plus the loadBalancerBrokerThresholdShedderPercentage, this load shedder proposes removing enough bundles to bring the unloaded broker 5% below the current average broker usage. Note that recently unloaded bundles are not unloaded again. ## Goal Develop a new load-based bundle placement strategy for better load balancing with fewer times, and less time, which cab achieve better teamwork with `ThresholdShedder`. ## API Changes No user-facing API changes are required. ## Implementation This should be a detailed description of all the changes that are expected to be made. It should be detailed enough that any developer that is familiar with Pulsar internals would be able to understand all the parts of the code changes for this proposal. This should also serve as documentation for any person that is trying to understand or debug the behavior of a certain feature. The main idea of the new strategy is to unify the requirement of load shedding strategy and bundle placement strategy, which consider the resource usage with weight, including historical observations. How to calculate a score for a broker ? - use its historical load and short-term load data with weight. How to select a broker for assignning bundle ? - select a broker based on which one has the least resource usage with weight. ### New configuration options The existing cache implementation will not be removed at this point. Users will be able to configure the old implementation in `broker.conf`. This option will be helpful in case of performance regressions would be seen for some use cases with the new strategy implementation. ``` # load assignment strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight, default is LeastLongTermMessageRate loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight ``` Below are screenshots of the effect of the new strategy with less time and fewer load balancing times. <img width="1593" alt="image" src=" https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png "> <img width="1586" alt="image" src=" https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png "> ## Reject Alternatives None yet. ## Reference [1] https://github.com/apache/pulsar/pull/16059 [2] https://github.com/apache/pulsar/issues/16274 [3] https://github.com/apache/pulsar/pull/16281 -- BR, Qiang Huang