Hi 叶韵,
The proposal looks good to me, and I left comments in the PR,
https://github.com/apache/pulsar/pull/16281/.

Regards,
Heesung

On Wed, Jun 29, 2022 at 5:39 AM 叶韵 <qiang.huang1...@gmail.com> wrote:

> Hi Pulsar community:
> I have opened a PIP to discuss a new load-based bundle placement strategy
> that works better together with `ThresholdShedder`.
> Proposal Link:
>
>    - issue link: https://github.com/apache/pulsar/issues/16274
>    - pr link: https://github.com/apache/pulsar/pull/16281
>
> ---
> ## Motivation
> The modular load manager, implemented in `ModularLoadManagerImpl`, is a
> flexible alternative to the previously implemented load manager, which
> attempts to simplify how load is managed while also providing abstractions
> so that complex load management strategies may be implemented.
>
> The load management component determines the criteria for unloading bundles
> and contains the following load shedding strategies: `OverloadShedder`,
> `ThresholdShedder`, and `UniformLoadShedder` (the default is
> `ThresholdShedder` since 2.10.0):
> - `OverloadShedder`: This strategy attempts to shed exactly one bundle on
> brokers which are overloaded.
> - `ThresholdShedder`: This strategy unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold.
> - `UniformLoadShedder`: This strategy tends to distribute load uniformly
> across all brokers.
>
> However, there is only one bundle placement strategy:
> `LeastLongTermMessageRate`, which selects the broker with the least
> long-term message rate, as sketched below.
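>
> For illustration, the core selection idea can be sketched as follows (a
> minimal, self-contained sketch, not the actual Pulsar code; the map of
> long-term message rates per broker is assumed to come from the brokers'
> load reports):
> ```java
> import java.util.Map;
>
> // Minimal sketch of the idea behind LeastLongTermMessageRate (not the
> // actual Pulsar implementation): score each candidate broker by its
> // long-term message rate (msgRateIn + msgRateOut) and pick the lowest.
> public class LeastLongTermMessageRateSketch {
>     static String selectBroker(Map<String, Double> longTermMsgRatePerBroker) {
>         return longTermMsgRatePerBroker.entrySet().stream()
>                 .min(Map.Entry.comparingByValue())
>                 .map(Map.Entry::getKey)
>                 .orElse(null);
>     }
> }
> ```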
>
> Our Pulsar cluster uses `ThresholdShedder` as the load shedding strategy
> and `LeastLongTermMessageRate` as the bundle placement strategy, and this
> combination does not work well.
> Some broker nodes have a high load when the traffic of some topics is
> relatively large. The load shedding strategy unloads some bundles from any
> broker that exceeds the average resource utilization of all brokers by a
> configured threshold, and those bundles are transferred to another broker.
> However, this can push the load of the receiving broker above the average
> as well, so load shedding is triggered again on that broker. Worse yet,
> this scenario keeps repeating.
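>
> As an illustrative example (hypothetical numbers): with a cluster average
> usage of 60% and a threshold of 10%, a broker at 75% sheds bundles until it
> is roughly 5% below the average (about 55%). If the shed bundles carry
> around 20% worth of load and land on a broker that is already at 55%, that
> broker rises to about 75%, crosses the 70% trigger point, and sheds again.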
>
> The load shedding strategy configuration is as follows:
> ```
> # load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
> loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
>
> # The broker resource usage threshold.
> # When the broker resource usage is greater than the pulsar cluster average resource usage,
> # the threshold shedder will be triggered to offload bundles from the broker.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBrokerThresholdShedderPercentage=10
>
> # When calculating new resource usage, the history usage accounts for.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerHistoryResourcePercentage=0.9
>
> # The BandWithIn usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithInResourceWeight=1.0
>
> # The BandWithOut usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithOutResourceWeight=1.0
>
> # The CPU usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerCPUResourceWeight=1.0
>
> # The heap memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerMemoryResourceWeight=0.1
>
> # The direct memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerDirectMemoryResourceWeight=0.1
>
> # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBundleUnloadMinThroughputThreshold=0.1
> ```
> The following screenshots show the status of the cluster:
> Problem 1: Load balancing took a long time (about 10 hours, with over 400
> unload operations), and unloading keeps happening whenever traffic is large.
> <img width="1247" alt="image" src="https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png">
> <img width="1245" alt="image" src="https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png">
>
> Problem 2: The CPU balancing effect is poor.
> <img width="1247" alt="image" src="https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png">
> <img width="1246" alt="image" src="https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png">
>
> The load shedding strategy `ThresholdShedder` works well, but the bundle
> placement strategy `LeastLongTermMessageRate` does not.
> There are 3 possible reasons for these problems.
> 1. Although the cluster has many brokers with low load, only a few brokers
> are considered as assignment candidates.
> <img width="1168" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
> ">
>
> Some brokers with lower load but more bundles cannot become candidates
> because the LoadManager forces bundles to be distributed evenly across
> brokers. Most brokers are filtered out by this rule; only 1 or 2 of the 136
> brokers in total can be candidates. This was fixed by #16059.
>
> 2. The memory usage of Java programs fluctuates widely, so the calculated
> maximum resource usage is dominated by memory usage most of the time, which
> filters out brokers with low CPU load. Below is a sample of the JVM memory
> usage of two brokers in the cluster.
> <img width="1249" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
> ">
> If a broker is overloaded, it gets the highest score, which prevents it
> from being a candidate.
> <img width="1059" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
> ">
>
> 3. The bundle placement strategy is `LeastLongTermMessageRate`, which
> selects the broker with the least long-term message rate rather than using
> load metrics. `LeastLongTermMessageRate` does not work well together with
> `ThresholdShedder`. Therefore, a load-based bundle placement strategy is
> needed to cooperate with `ThresholdShedder`.
>
> ### Current implementation details
> The `ThresholdShedder` strategy unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold. As a
> consequence, this strategy tends to distribute load among all brokers. It
> does this by first computing the average resource usage per broker for the
> whole cluster. The resource usage for each broker is calculated with
> `LocalBrokerData#getMaxResourceUsageWithWeight`. The
> weights for each resource are configurable. Historical observations are
> included in the running average based on the broker's setting for
> `loadBalancerHistoryResourcePercentage`. Once the average resource usage is
> calculated, a broker's current/historical usage is compared to the average
> broker usage. If a broker's usage is greater than the average usage per
> broker plus `loadBalancerBrokerThresholdShedderPercentage`, this load
> shedder proposes removing enough bundles to bring the unloaded broker 5%
> below the current average broker usage. Note that recently unloaded bundles
> are not unloaded again.
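>
> A minimal sketch of that scoring logic (simplified and self-contained; not
> the actual Pulsar code, with the weights, history percentage, and threshold
> standing in for the configuration values listed above):
> ```java
> // Simplified sketch of the ThresholdShedder scoring described above.
> public class ThresholdShedderSketch {
>
>     // Weighted maximum of the individual resource usages (all in percent),
>     // blended with the previous score via the history percentage
>     // (loadBalancerHistoryResourcePercentage).
>     static double brokerScore(double cpu, double memory, double directMemory,
>                               double bandwidthIn, double bandwidthOut,
>                               double cpuWeight, double memoryWeight,
>                               double directMemoryWeight,
>                               double bandwidthInWeight, double bandwidthOutWeight,
>                               double previousScore, double historyPercentage) {
>         double maxUsageWithWeight = Math.max(
>                 Math.max(cpu * cpuWeight, memory * memoryWeight),
>                 Math.max(directMemory * directMemoryWeight,
>                         Math.max(bandwidthIn * bandwidthInWeight,
>                                  bandwidthOut * bandwidthOutWeight)));
>         return historyPercentage * previousScore
>                 + (1 - historyPercentage) * maxUsageWithWeight;
>     }
>
>     // A broker sheds bundles when its score exceeds the cluster average
>     // by more than the configured threshold percentage.
>     static boolean shouldShed(double brokerScore, double clusterAvgScore,
>                               double thresholdPercentage) {
>         return brokerScore > clusterAvgScore + thresholdPercentage;
>     }
> }
> ```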
>
> ## Goal
> Develop a new load-based bundle placement strategy that achieves better
> load balancing in less time and with fewer unload operations, and that
> works well together with `ThresholdShedder`.
>
> ## API Changes
> No user-facing API changes are required.
>
> ## Implementation
>
> The main idea of the new strategy is to unify the criteria of the load
> shedding strategy and the bundle placement strategy: both consider weighted
> resource usage, including historical observations.
>
> How is a score calculated for a broker?
> - Use its weighted historical load and short-term load data.
>
> How is a broker selected for bundle assignment?
> - Select the broker with the least weighted resource usage, as sketched
> below.
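>
> A minimal sketch of the proposed idea (simplified and self-contained; this
> is not the actual `LeastResourceUsageWithWeight` code, and the per-broker
> scores are assumed to be computed with the same weighted, history-blended
> scoring as above):
> ```java
> import java.util.Map;
>
> // Sketch of the proposed load-based placement: keep a weighted,
> // history-blended score per broker and assign new bundles to the
> // broker with the lowest score.
> public class LeastResourceUsageWithWeightSketch {
>
>     // Blend the current weighted max resource usage with the broker's
>     // historical score (same idea as the ThresholdShedder scoring).
>     static double updateScore(double previousScore, double currentWeightedMaxUsage,
>                               double historyPercentage) {
>         return historyPercentage * previousScore
>                 + (1 - historyPercentage) * currentWeightedMaxUsage;
>     }
>
>     // Pick the broker with the lowest score as the assignment target.
>     static String selectBroker(Map<String, Double> scorePerBroker) {
>         return scorePerBroker.entrySet().stream()
>                 .min(Map.Entry.comparingByValue())
>                 .map(Map.Entry::getKey)
>                 .orElse(null);
>     }
> }
> ```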
>
> ### New configuration options
> The existing `LeastLongTermMessageRate` implementation will not be removed
> at this point. Users will be able to configure the old implementation in
> `broker.conf`.
> This option will be helpful in case performance regressions are seen for
> some use cases with the new strategy implementation.
> ```
> # load assignment strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
> loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
> ```
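>
> For example, to keep the current behavior,
> `loadBalancerLoadAssignmentStrategy` can be left at (or set back to)
> `org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate`.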
>
> Below are screenshots showing the effect of the new strategy: less time
> spent and fewer load balancing operations.
> <img width="1593" alt="image" src="https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png">
> <img width="1586" alt="image" src="https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png">
>
> ## Rejected Alternatives
> None yet.
>
> ## Reference
> [1] https://github.com/apache/pulsar/pull/16059
> [2] https://github.com/apache/pulsar/issues/16274
> [3] https://github.com/apache/pulsar/pull/16281
>
> --
> BR,
> Qiang Huang
>
