Hi Pulsar Community,

This is a PIP discussion on how to support splitting bundles according to flow 
(throughput) or QPS (message rate).


The issue can be found at: https://github.com/apache/pulsar/issues/16782


I have copied the content here for convenience. Any suggestions are welcome 
and appreciated.




## Motivation
Bundle splitting currently supports three algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide


However, none of these algorithms can divide bundles according to flow or QPS, 
so a bundle may need to be split multiple times before its load is evenly 
distributed.


## Goal
Our goal is to split bundles according to flow or QPS, so we propose a PIP to 
introduce a split algorithm based on flow or QPS.
The main idea is to collect the flow and QPS of each topic contained in a 
bundle, and then split the bundle at the position that divides the flow or QPS 
roughly evenly.


For example, take a bundle with boundaries 0x00000000 to 0x00000200 that 
contains six topics: t1, t2, t3, t4, t5, t6.


**Step 1: Get their hash position and corresponding flow or QPS:**


> t1 with hashcode 10 msgRate 100/s throughput 1M/s
> 
> t2 with hashcode 20 msgRate 200/s throughput 2M/s
> 
> t3 with hashcode 80 msgRate 300/s throughput 3M/s
> 
> t4 with hashcode 90 msgRate 400/s throughput 4M/s
> 
> t5 with hashcode 100 msgRate 500/s throughput 5M/s
> 
> t6 with hashcode 110 msgRate 2000/s throughput 190M/s




**Step 2: Calculate the total flow and qps of the bundle:**


> bundleMsgRate=3500
> bundleThroughput=205MB


**Step 3: Calculate the flow and qps to split:**


> splitBundleMsgRate=1750
> splitBundleThroughput=102.5MB


**Step 4: Calculate the position to split and split:**


> splitStartPosition=100
> splitEndPosition=110
> splitPosition=(100+110)/2=105
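
The four steps above can be checked with a short, self-contained Java sketch. It hard-codes the six example topics in hash order; the class name `WorkedSplitExample` and its helper methods are hypothetical names used only for illustration and are not part of Pulsar.

```java
public class WorkedSplitExample {
    // Step 1: the six example topics, already ordered by hash position.
    static final long[] HASHES = {10, 20, 80, 90, 100, 110};
    static final double[] MSG_RATES = {100, 200, 300, 400, 500, 2000};
    static final double[] THROUGHPUTS = {1, 2, 3, 4, 5, 190};

    // Step 2: sum a per-topic metric over the whole bundle.
    public static double total(double[] values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum;
    }

    // Steps 3 and 4: find the first topic at which the running msgRate or
    // throughput exceeds half of the bundle total. Returns -1 if none does.
    public static int crossingIndex() {
        double halfRate = total(MSG_RATES) / 2;
        double halfThroughput = total(THROUGHPUTS) / 2;
        double rateSoFar = 0;
        double throughputSoFar = 0;
        for (int i = 0; i < HASHES.length; i++) {
            rateSoFar += MSG_RATES[i];
            throughputSoFar += THROUGHPUTS[i];
            if (rateSoFar > halfRate || throughputSoFar > halfThroughput) {
                return i;
            }
        }
        return -1;
    }

    // Step 4: split halfway between the crossing topic and its predecessor.
    public static long splitPosition() {
        int i = crossingIndex();
        long start = i > 0 ? HASHES[i - 1] : 0;
        long end = HASHES[i];
        return start + (end - start) / 2;
    }

    public static void main(String[] args) {
        System.out.println("bundleMsgRate=" + total(MSG_RATES));
        System.out.println("bundleThroughput=" + total(THROUGHPUTS));
        System.out.println("splitPosition=" + splitPosition());
    }
}
```

In this data set t6 carries most of the load, so the threshold is only crossed at the last topic and the split lands between t5 and t6.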


## API Changes


1. Add a split algorithm class based on flow or qps:


`public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`


2. Update the default configuration:
```
private List<String> supportedNamespaceBundleSplitAlgorithms =
        Lists.newArrayList("range_equally_divide", "topic_count_equally_divide",
                "specified_positions_divide", "flow_count_equally_divide");
```


## Implementation
The execution steps of the 
`FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary` method are as 
follows:
1. Get the hash position of each topic and the corresponding msgRate and 
msgThroughput, and sort the topics by hash position:


```
List<Long> topicNameHashList = new ArrayList<>(topics.size());
Map<Long, Double> hashAndMsgMap = new HashMap<>();
Map<Long, Double> hashAndThroughput = new HashMap<>();
```


2. Traverse the topic positions in ascending order to find the position that 
divides the bundle's flow or QPS roughly evenly:


```
double bundleMsgRateTmp = 0;
double bundleThroughputTmp = 0;
for (int i = 0; i < topicNameHashList.size(); i++) {
    long topicHashCode = topicNameHashList.get(i);
    bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
    bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);

    if (bundleMsgRateTmp > bundleMsgRate / 2 || bundleThroughputTmp > bundleThroughput / 2) {
        long splitStart = i > 0 ? topicNameHashList.get(i - 1) : 0;
        long splitEnd = topicHashCode;
        long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
        splitResults.add(splitMiddle);
        break;
    }
}
```
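
To see the two steps working together outside the broker, here is a minimal standalone sketch. It is not the proposed Pulsar class: `FlowOrQpsSplitSketch` and `findSplitPosition` are hypothetical names, the per-topic statistics are passed in as plain maps keyed by hash position, and the `bundleLowerBound` parameter is an assumption of this sketch (the snippet above uses 0 when the first topic already crosses the threshold).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

public class FlowOrQpsSplitSketch {

    // Returns the split position, or empty if no topic crosses half of the
    // total (for example, an empty bundle or a bundle with no traffic).
    public static OptionalLong findSplitPosition(Map<Long, Double> msgRateByHash,
                                                 Map<Long, Double> throughputByHash,
                                                 long bundleLowerBound) {
        // Step 1: sort the topic hash positions in ascending order.
        List<Long> hashes = new ArrayList<>(msgRateByHash.keySet());
        Collections.sort(hashes);

        double totalMsgRate = 0;
        double totalThroughput = 0;
        for (long hash : hashes) {
            totalMsgRate += msgRateByHash.get(hash);
            totalThroughput += throughputByHash.getOrDefault(hash, 0.0);
        }

        // Step 2: accumulate load until either metric exceeds half the total.
        double msgRateSoFar = 0;
        double throughputSoFar = 0;
        for (int i = 0; i < hashes.size(); i++) {
            long hash = hashes.get(i);
            msgRateSoFar += msgRateByHash.get(hash);
            throughputSoFar += throughputByHash.getOrDefault(hash, 0.0);
            if (msgRateSoFar > totalMsgRate / 2 || throughputSoFar > totalThroughput / 2) {
                // Assumption: fall back to the bundle's lower bound for i == 0.
                long start = i > 0 ? hashes.get(i - 1) : bundleLowerBound;
                return OptionalLong.of(start + (hash - start) / 2);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // The six topics from the Goal section, deliberately out of order.
        long[] hashes = {110, 10, 90, 20, 100, 80};
        double[] rates = {2000, 100, 400, 200, 500, 300};
        double[] throughputs = {190, 1, 4, 2, 5, 3};
        Map<Long, Double> rateMap = new HashMap<>();
        Map<Long, Double> throughputMap = new HashMap<>();
        for (int i = 0; i < hashes.length; i++) {
            rateMap.put(hashes[i], rates[i]);
            throughputMap.put(hashes[i], throughputs[i]);
        }
        System.out.println(findSplitPosition(rateMap, throughputMap, 0L));
    }
}
```

Because the condition is an OR, the split happens at whichever metric crosses its halfway point first; when message rate and throughput are skewed differently across topics, the two halves may be balanced on only one of them.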
