The implementation logic has changed, and the corresponding example has been 
updated as follows:


## Motivation
Bundle split currently supports three algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide


However, none of these algorithms can divide bundles according to flow or QPS, 
so a bundle may have to be split multiple times.


## Goal
Our goal is to split bundles according to flow or QPS, so this PIP proposes a 
split algorithm based on flow or QPS.
The main idea is to collect the flow and QPS of each topic contained in a 
bundle, and then split the bundle according to the 
loadBalancerNamespaceBundleMaxMsgRate or 
loadBalancerNamespaceBundleMaxBandwidthMbytes configuration.


For example, consider a bundle with boundaries 0x00000000 to 0x00000200 that 
contains six topics (t1, t2, t3, t4, t5, t6), with the following configuration:
- loadBalancerNamespaceBundleMaxMsgRate=1100
- loadBalancerNamespaceBundleMaxBandwidthMbytes=110


**Step 1: Get their hash position and corresponding flow and QPS:**


> t1 with hashcode 10 msgRate 100/s throughput 10M/s
>
> t2 with hashcode 20 msgRate 200/s throughput 20M/s
>
> t3 with hashcode 80 msgRate 300/s throughput 30M/s
>
> t4 with hashcode 90 msgRate 400/s throughput 40M/s
>
> t5 with hashcode 100 msgRate 500/s throughput 50M/s
>
> t6 with hashcode 110 msgRate 600/s throughput 60M/s




**Step 2: Calculate the total flow and qps of the bundle:**


> bundleMsgRate = 100 + 200 + 300 + 400 + 500 + 600 = 2100/s
>
> bundleThroughput = 10 + 20 + 30 + 40 + 50 + 60 = 210M/s


**Step 3: Calculate the position to split and split:**


> QPS: (100 + 200 + 300 + 400) < loadBalancerNamespaceBundleMaxMsgRate=1100 & (100 + 200 + 300 + 400 + 500) > loadBalancerNamespaceBundleMaxMsgRate=1100
>
> flow: (10 + 20 + 30 + 40) < loadBalancerNamespaceBundleMaxBandwidthMbytes=110 & (10 + 20 + 30 + 40 + 50) > loadBalancerNamespaceBundleMaxBandwidthMbytes=110
>
> Split between t4 and t5:
>
> splitStartPosition = 90
>
> splitEndPosition = 100
>
> splitPosition = (90 + 100) / 2 = 95
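The three steps above can be checked with a minimal sketch. The class and method names (`WorkedSplitExample`, `firstSplitPosition`) are hypothetical and not part of the proposal; the sketch only reproduces the arithmetic of the example for a single limit, and it assumes the hash positions are already sorted.

```java
import java.util.List;

final class WorkedSplitExample {
    /**
     * Walks the topics in hash order, accumulating load, and returns the midpoint
     * between the topic that pushes the total over the limit and its predecessor.
     * Returns -1 if the limit is never exceeded at an index greater than 0.
     */
    static long firstSplitPosition(List<Long> sortedHashes, List<Double> loads, double limit) {
        double cumulative = 0;
        for (int i = 0; i < sortedHashes.size(); i++) {
            cumulative += loads.get(i);
            if (cumulative > limit && i > 0) {
                long start = sortedHashes.get(i - 1); // splitStartPosition
                long end = sortedHashes.get(i);       // splitEndPosition
                return start + (end - start) / 2;     // splitPosition
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Long> hashes = List.of(10L, 20L, 80L, 90L, 100L, 110L);
        List<Double> msgRates = List.of(100.0, 200.0, 300.0, 400.0, 500.0, 600.0);
        // The total first exceeds 1100 at t5 (1500), so the split falls between 90 and 100.
        System.out.println(firstSplitPosition(hashes, msgRates, 1100)); // prints 95
    }
}
```

Running the same function on the throughput values (10 to 60) with a limit of 110 gives the same position, which matches the example: both limits are crossed at t5.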




## API Changes


1. Add a split algorithm class based on flow or qps:


`public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`


2. Update the default configuration:
```
private List<String> supportedNamespaceBundleSplitAlgorithms =
        Lists.newArrayList("range_equally_divide", "topic_count_equally_divide",
                "specified_positions_divide", "flow_count_equally_divide");
```
3. Add a configuration option:
```
@FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Acceptable difference between qps and loadBalancerNamespaceBundleMaxMsgRate "
                + "or flow and loadBalancerNamespaceBundleMaxBandwidthMbytes"
)
private int flowOrQpsDifferenceThresholdPercentage = 10;
```




## Implementation
The FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method executes 
the following steps:
1. Get the hash position of each topic together with its msgRate and 
msgThroughput, and sort the topics by hash position:


```
List<Long> topicNameHashList = new ArrayList<>(topics.size());
Map<Long, Double> hashAndMsgMap = new HashMap<>();
Map<Long, Double> hashAndThroughput = new HashMap<>();
```
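As a rough illustration of step 1, the sketch below fills these three structures and sorts the hash list. The `TopicLoadIndex` class and its `add` and `sort` methods are hypothetical, and summing the load of topics that collide on the same hash is an assumption of this sketch; the proposal does not say how collisions are handled or where the per-topic stats come from.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collects per-topic load keyed by hash position.
final class TopicLoadIndex {
    final List<Long> topicNameHashList = new ArrayList<>();
    final Map<Long, Double> hashAndMsgMap = new HashMap<>();
    final Map<Long, Double> hashAndThroughput = new HashMap<>();

    /** Records one topic. Assumption: topics sharing a hash have their load summed. */
    void add(long hash, double msgRate, double throughput) {
        if (!hashAndMsgMap.containsKey(hash)) {
            topicNameHashList.add(hash); // keep each hash position only once
        }
        hashAndMsgMap.merge(hash, msgRate, Double::sum);
        hashAndThroughput.merge(hash, throughput, Double::sum);
    }

    /** Sorts hash positions in ascending order so topics can be traversed from small to large. */
    void sort() {
        Collections.sort(topicNameHashList);
    }
}
```

Keeping the sorted list separate from the maps means the traversal order is explicit, while the load lookup for each position stays a constant-time map access.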


2. Traverse the topics in ascending order of hash position, and split the 
bundle whenever the accumulated load exceeds the configured 
loadBalancerNamespaceBundleMaxMsgRate or 
loadBalancerNamespaceBundleMaxBandwidthMbytes:


```
double bundleMsgRateTmp = 0;
double bundleThroughputTmp = 0;
for (int i = 0; i < topicNameHashList.size(); i++) {
    long topicHashCode = topicNameHashList.get(i);
    bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
    bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);

    // Split once the accumulated load exceeds either configured limit.
    if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
            || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
        // Split halfway between the previous topic and the current one
        // (or between the first two topics when i == 0).
        long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
        long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
        long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
        splitResults.add(splitMiddle);
        // Restart the running totals from the current topic's load.
        bundleMsgRateTmp = hashAndMsgMap.get(topicHashCode);
        bundleThroughputTmp = hashAndThroughput.get(topicHashCode);
    }
}
```
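To make the loop easier to try out, here is a self-contained sketch that wraps it in a static method. The class name `FlowOrQpsSplitSketch`, the method name `splitPositions`, and passing the limits as parameters are assumptions for illustration, not the proposed API. The only behavioural addition is an early return for bundles with fewer than two topics, since the `i + 1` lookup would otherwise fail for a single topic that exceeds a limit by itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FlowOrQpsSplitSketch {
    /** Returns the split positions for topics given in ascending hash order. */
    static List<Long> splitPositions(List<Long> sortedHashes,
                                     Map<Long, Double> msgRates,
                                     Map<Long, Double> throughputs,
                                     double maxMsgRate,
                                     double maxThroughput) {
        List<Long> splitResults = new ArrayList<>();
        if (sortedHashes.size() < 2) {
            return splitResults; // assumption: nothing to split between
        }
        double rateTmp = 0;
        double throughputTmp = 0;
        for (int i = 0; i < sortedHashes.size(); i++) {
            long hash = sortedHashes.get(i);
            rateTmp += msgRates.get(hash);
            throughputTmp += throughputs.get(hash);
            if (rateTmp > maxMsgRate || throughputTmp > maxThroughput) {
                long start = i > 0 ? sortedHashes.get(i - 1) : hash;
                long end = i > 0 ? hash : sortedHashes.get(i + 1);
                splitResults.add(start + (end - start) / 2);
                // Restart the running totals from the current topic's load.
                rateTmp = msgRates.get(hash);
                throughputTmp = throughputs.get(hash);
            }
        }
        return splitResults;
    }

    public static void main(String[] args) {
        List<Long> hashes = List.of(10L, 20L, 80L, 90L, 100L, 110L);
        Map<Long, Double> rates = Map.of(10L, 100.0, 20L, 200.0, 80L, 300.0,
                90L, 400.0, 100L, 500.0, 110L, 600.0);
        Map<Long, Double> flows = Map.of(10L, 10.0, 20L, 20.0, 80L, 30.0,
                90L, 40.0, 100L, 50.0, 110L, 60.0);
        System.out.println(splitPositions(hashes, rates, flows, 1100, 110)); // prints [95]
    }
}
```

With the limits from the example the method yields a single split at 95: after the split, t5 and t6 together reach exactly 1100 msg/s and 110 M/s, which does not exceed either limit. Lowering the message-rate limit to 500 produces several split positions in one pass (50, 85, 95, 105), which is the case the proposal is meant to handle without repeated split rounds.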
