Hi Pulsar community:
I open a pip to discuss "Shadow Topic, an alternative way to support
readonly topic ownership."
Proposal Link:

   - issue link: https://github.com/apache/pulsar/issues/16274
   - pr link: https://github.com/apache/pulsar/pull/16281

---
## Motivation
The modular load manager, implemented in `ModularLoadManagerImpl`, is a
flexible alternative to the previously implemented load manager, which
attempts to simplify how load is managed while also providing abstractions
so that complex load management strategies may be implemented.

The load management component determines the criteria for unloading bundles
and contains the following load shedding strategy: `OverloadShedder` and
`ThresholdShedder` and `UniformLoadShedder`. (default is
`ThresholdShedder`since 2.10.0)
- `OverloadShedder`: This strategy attempts to shed exactly one bundle on
brokers which are overloaded
- `ThresholdShedder`: This strategy unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold.
- `UniformLoadShedder`:This strategy tends to distribute load uniformly
across all brokers.

However, the bundle placement strategy contains only one:
`LeastLongTermMessageRate`, which selects a broker based on which one has
the least long term message rate.

The load management in our pulsar cluster use `ThresholdShedder` as load
shedding strategy, and use `LeastLongTermMessageRate` as bundle placement
strategy, which does not work well.
Some broker nodes have a high load when the traffic of some topics are
relatively large. The load shedding strategy will unload some bundles in
any broker that exceeds the average resource utilization of all brokers by
a configured threshold. And the bundles will be transferred to the next
broker node. However it causes the load of the next broker node exceed the
average resource utilization. Therefore, the load balancing will occur
again on the current broker node due to high load. Worse yet, this scenario
keeps popping up.

The load shedding strategy configuration is as follows
```
# load shedding strategy, support OverloadShedder and ThresholdShedder,
default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average
resource usage,
# the threshold shedder will be triggered to offload bundles from the
broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=0.1

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=0.1

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload
frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=0.1
```
The following screenshots are the status of the cluster:
Problem 1. Load balancing took a long time 10 hours and over 400 times, and
it has been unloading if there is a large traffic.
<img width="1247" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png
">
<img width="1245" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png
">

Problem 2. The effect of cpu balancing is poor.
<img width="1247" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png
">
<img width="1246" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png
">

The load shedding strategy `ThresholdShedder` work well, but not the bundle
placement strategyLeastLongTermMessageRate .
There are 3 possible reasons for the problems.
1. Although the cluster has many brokers with low load, there are fewer
brokers to be considered for assignment.
<img width="1168" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
">

Some brokers with lower load but more bundles can not be candidate due to
distributing bundles evenly in LoadManager by force. Most of brokers are
filtered out by the strategy, only 1 or 2 brokers can be candidate in the
total 136 brokers . It was fixed by #16059

2. The memory usage of Java programs fluctuates widely, so that the maximum
resource usage calculated is based on memory usage most of the time, which
filters out brokers with low CPU load. Below is the sample of two brokers
jvm memory usage in the cluster.
<img width="1249" alt="image" src="
https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
">
If the broker is overload, it will get highest score, which prevents it
from being a candidate.
<img width="1059" alt="image" src="
https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
">

3. The bundle placement strategy is `LeastLongTermMessageRate`, which
selects a broker based on which one has the least long term message rate
instead of load metric. The `LeastLongTermMessageRate` does not get along
with `ThresholdShedder` well. Therefore, a load-based bundle placement
strategy is necessary to cooperate with `ThresholdShedder`.

### Current implementation details
The `ThresholdShedder` strategy that unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold. As a
consequence, this strategy tends to distribute load among all brokers. It
does this by first computing the average resource usage per broker for the
whole cluster. The resource usage for each broker is calculated using the
following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The
weights for each resource are configurable. Historical observations are
included in the running average based on the broker's setting for
loadBalancerHistoryResourcePercentage. Once the average resource usage is
calculated, a broker's current/historical usage is compared to the average
broker usage. If a broker's usage is greater than the average usage per
broker plus the loadBalancerBrokerThresholdShedderPercentage, this load
shedder proposes removing enough bundles to bring the unloaded broker 5%
below the current average broker usage. Note that recently unloaded bundles
are not unloaded again.

## Goal
Develop a new load-based bundle placement strategy for better load
balancing with fewer times, and less time, which cab achieve better
teamwork with `ThresholdShedder`.

## API Changes
No user-facing API changes are required.

## Implementation
This should be a detailed description of all the changes that are
expected to be made. It should be detailed enough that any developer that is
familiar with Pulsar internals would be able to understand all the parts of
the
code changes for this proposal.

This should also serve as documentation for any person that is trying to
understand or debug the behavior of a certain feature.

The main idea of the new strategy is to unify the requirement of load
shedding strategy and bundle placement strategy, which consider the
resource usage with weight, including historical observations.

How to calculate a score for a broker ?
- use its historical load and short-term load data with weight.

How to select a broker for assignning bundle ?
- select a broker based on which one has the least resource usage with
weight.

### New configuration options
The existing cache implementation will not be removed at this point. Users
will
be able to configure the old implementation in `broker.conf`.
This option will be helpful in case of performance regressions would be
seen for
some use cases with the new strategy implementation.
```
# load assignment strategy, support LeastLongTermMessageRate and
LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
```

Below are screenshots of the effect of the new strategy with less time and
fewer load balancing times.
<img width="1593" alt="image" src="
https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png
">
<img width="1586" alt="image" src="
https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png
">

## Reject Alternatives
None yet.

## Reference
[1] https://github.com/apache/pulsar/pull/16059
[2] https://github.com/apache/pulsar/issues/16274
[3] https://github.com/apache/pulsar/pull/16281

-- 
BR,
Qiang Huang

Reply via email to