Hi 叶韵, The proposal looks good to me, and I left comments in the PR, https://github.com/apache/pulsar/pull/16281/.
Regards, Heesung On Wed, Jun 29, 2022 at 5:39 AM 叶韵 <qiang.huang1...@gmail.com> wrote: > Hi Pulsar community: > I open a pip to discuss "Shadow Topic, an alternative way to support > readonly topic ownership." > Proposal Link: > > - issue link: https://github.com/apache/pulsar/issues/16274 > - pr link: https://github.com/apache/pulsar/pull/16281 > > --- > ## Motivation > The modular load manager, implemented in `ModularLoadManagerImpl`, is a > flexible alternative to the previously implemented load manager, which > attempts to simplify how load is managed while also providing abstractions > so that complex load management strategies may be implemented. > > The load management component determines the criteria for unloading bundles > and contains the following load shedding strategy: `OverloadShedder` and > `ThresholdShedder` and `UniformLoadShedder`. (default is > `ThresholdShedder`since 2.10.0) > - `OverloadShedder`: This strategy attempts to shed exactly one bundle on > brokers which are overloaded > - `ThresholdShedder`: This strategy unloads any broker that exceeds the > average resource utilization of all brokers by a configured threshold. > - `UniformLoadShedder`:This strategy tends to distribute load uniformly > across all brokers. > > However, the bundle placement strategy contains only one: > `LeastLongTermMessageRate`, which selects a broker based on which one has > the least long term message rate. > > The load management in our pulsar cluster use `ThresholdShedder` as load > shedding strategy, and use `LeastLongTermMessageRate` as bundle placement > strategy, which does not work well. > Some broker nodes have a high load when the traffic of some topics are > relatively large. The load shedding strategy will unload some bundles in > any broker that exceeds the average resource utilization of all brokers by > a configured threshold. And the bundles will be transferred to the next > broker node. However it causes the load of the next broker node exceed the > average resource utilization. Therefore, the load balancing will occur > again on the current broker node due to high load. Worse yet, this scenario > keeps popping up. > > The load shedding strategy configuration is as follows > ``` > # load shedding strategy, support OverloadShedder and ThresholdShedder, > default is OverloadShedder > > loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder > > # The broker resource usage threshold. > # When the broker resource usage is greater than the pulsar cluster average > resource usage, > # the threshold shedder will be triggered to offload bundles from the > broker. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerBrokerThresholdShedderPercentage=10 > > # When calculating new resource usage, the history usage accounts for. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerHistoryResourcePercentage=0.9 > > # The BandWithIn usage weight when calculating new resource usage. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerBandwithInResourceWeight=1.0 > > # The BandWithOut usage weight when calculating new resource usage. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerBandwithOutResourceWeight=1.0 > > # The CPU usage weight when calculating new resource usage. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerCPUResourceWeight=1.0 > > # The heap memory usage weight when calculating new resource usage. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerMemoryResourceWeight=0.1 > > # The direct memory usage weight when calculating new resource usage. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerDirectMemoryResourceWeight=0.1 > > # Bundle unload minimum throughput threshold (MB), avoiding bundle unload > frequently. > # It only takes effect in the ThresholdShedder strategy. > loadBalancerBundleUnloadMinThroughputThreshold=0.1 > ``` > The following screenshots are the status of the cluster: > Problem 1. Load balancing took a long time 10 hours and over 400 times, and > it has been unloading if there is a large traffic. > <img width="1247" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png > "> > <img width="1245" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png > "> > > Problem 2. The effect of cpu balancing is poor. > <img width="1247" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png > "> > <img width="1246" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png > "> > > The load shedding strategy `ThresholdShedder` work well, but not the bundle > placement strategyLeastLongTermMessageRate . > There are 3 possible reasons for the problems. > 1. Although the cluster has many brokers with low load, there are fewer > brokers to be considered for assignment. > <img width="1168" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png > "> > > Some brokers with lower load but more bundles can not be candidate due to > distributing bundles evenly in LoadManager by force. Most of brokers are > filtered out by the strategy, only 1 or 2 brokers can be candidate in the > total 136 brokers . It was fixed by #16059 > > 2. The memory usage of Java programs fluctuates widely, so that the maximum > resource usage calculated is based on memory usage most of the time, which > filters out brokers with low CPU load. Below is the sample of two brokers > jvm memory usage in the cluster. > <img width="1249" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png > "> > If the broker is overload, it will get highest score, which prevents it > from being a candidate. > <img width="1059" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png > "> > > 3. The bundle placement strategy is `LeastLongTermMessageRate`, which > selects a broker based on which one has the least long term message rate > instead of load metric. The `LeastLongTermMessageRate` does not get along > with `ThresholdShedder` well. Therefore, a load-based bundle placement > strategy is necessary to cooperate with `ThresholdShedder`. > > ### Current implementation details > The `ThresholdShedder` strategy that unloads any broker that exceeds the > average resource utilization of all brokers by a configured threshold. As a > consequence, this strategy tends to distribute load among all brokers. It > does this by first computing the average resource usage per broker for the > whole cluster. The resource usage for each broker is calculated using the > following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The > weights for each resource are configurable. Historical observations are > included in the running average based on the broker's setting for > loadBalancerHistoryResourcePercentage. Once the average resource usage is > calculated, a broker's current/historical usage is compared to the average > broker usage. If a broker's usage is greater than the average usage per > broker plus the loadBalancerBrokerThresholdShedderPercentage, this load > shedder proposes removing enough bundles to bring the unloaded broker 5% > below the current average broker usage. Note that recently unloaded bundles > are not unloaded again. > > ## Goal > Develop a new load-based bundle placement strategy for better load > balancing with fewer times, and less time, which cab achieve better > teamwork with `ThresholdShedder`. > > ## API Changes > No user-facing API changes are required. > > ## Implementation > This should be a detailed description of all the changes that are > expected to be made. It should be detailed enough that any developer that > is > familiar with Pulsar internals would be able to understand all the parts of > the > code changes for this proposal. > > This should also serve as documentation for any person that is trying to > understand or debug the behavior of a certain feature. > > The main idea of the new strategy is to unify the requirement of load > shedding strategy and bundle placement strategy, which consider the > resource usage with weight, including historical observations. > > How to calculate a score for a broker ? > - use its historical load and short-term load data with weight. > > How to select a broker for assignning bundle ? > - select a broker based on which one has the least resource usage with > weight. > > ### New configuration options > The existing cache implementation will not be removed at this point. Users > will > be able to configure the old implementation in `broker.conf`. > This option will be helpful in case of performance regressions would be > seen for > some use cases with the new strategy implementation. > ``` > # load assignment strategy, support LeastLongTermMessageRate and > LeastResourceUsageWithWeight, default is LeastLongTermMessageRate > > loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight > ``` > > Below are screenshots of the effect of the new strategy with less time and > fewer load balancing times. > <img width="1593" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png > "> > <img width="1586" alt="image" src=" > > https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png > "> > > ## Reject Alternatives > None yet. > > ## Reference > [1] https://github.com/apache/pulsar/pull/16059 > [2] https://github.com/apache/pulsar/issues/16274 > [3] https://github.com/apache/pulsar/pull/16281 > > -- > BR, > Qiang Huang >