Hi Pulsar Community,
This is a PIP discussion on supporting bundle splits based on flow (throughput) or QPS (message rate). The issue can be found at https://github.com/apache/pulsar/issues/16782. I have copied the content here for convenience; any suggestions are welcome and appreciated.

## Motivation

Bundle split currently has three algorithms:

- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide

However, none of these algorithms divides a bundle according to flow or QPS. After a split, most of the load can remain in one of the resulting bundles, which may cause bundles to be split multiple times.

## Goal

Our goal is to split bundles according to flow or QPS, so we propose a PIP to introduce a split algorithm based on flow or QPS.

The main idea is to get the flow and QPS of each topic contained in a bundle, and then split at the position where the flow or QPS is evenly divided.

For example, take a bundle with boundaries 0x00000000 to 0x00000200 and six topics: t1, t2, t3, t4, t5, t6.

**Step 1: Get their hash position and corresponding flow or QPS:**

> t1 with hashcode 10 msgRate 100/s throughput 1 MB/s
>
> t2 with hashcode 20 msgRate 200/s throughput 2 MB/s
>
> t3 with hashcode 80 msgRate 300/s throughput 3 MB/s
>
> t4 with hashcode 90 msgRate 400/s throughput 4 MB/s
>
> t5 with hashcode 100 msgRate 500/s throughput 5 MB/s
>
> t6 with hashcode 110 msgRate 2000/s throughput 190 MB/s

**Step 2: Calculate the total flow and QPS of the bundle:**

> bundleMsgRate=3500/s
> bundleThroughput=205 MB/s

**Step 3: Calculate the flow and QPS to split:**

> splitBundleMsgRate=1750/s
> splitBundleThroughput=102.5 MB/s

**Step 4: Calculate the position to split and split:**

Walking the topics in hash order, the cumulative msgRate is 1500/s after t5 and 3500/s after t6, and the cumulative throughput is 15 MB/s after t5 and 205 MB/s after t6. Both halfway marks are first exceeded at t6, so the split falls between t5 (hashcode 100) and t6 (hashcode 110):

> splitStartPosition=100
> splitEndPosition=110
> splitPosition=(100+110)/2=105

## API Changes

1. Add a split algorithm class based on flow or QPS:

   `public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`

2. Update the default configuration:

   ```
   private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList(
           "range_equally_divide",
           "topic_count_equally_divide",
           "specified_positions_divide",
           "flow_count_equally_divide");
   ```

## Implementation

The execution steps of the FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method are as follows:

1. Get the hash position of each topic and the corresponding msgRate and msgThroughput, and sort the topics by hash position:

   ```
   // Hash positions of the topics in the bundle, sorted in ascending order
   List<Long> topicNameHashList = new ArrayList<>(topics.size());
   // Hash position -> message rate of the topic at that position
   Map<Long, Double> hashAndMsgMap = new HashMap<>();
   // Hash position -> throughput of the topic at that position
   Map<Long, Double> hashAndThroughput = new HashMap<>();
   ```

2. Traverse the topic positions from smallest to largest to find the position that roughly evenly divides the bundle's flow or QPS:

   ```
   double bundleMsgRateTmp = 0;
   double bundleThroughputTmp = 0;
   for (int i = 0; i < topicNameHashList.size(); i++) {
       long topicHashCode = topicNameHashList.get(i);
       bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
       bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
       // Stop at the first topic where either running total exceeds half of the bundle total
       if (bundleMsgRateTmp > bundleMsgRate / 2 || bundleThroughputTmp > bundleThroughput / 2) {
           // Split midway between this topic and the previous one (0 if this is the first topic)
           long splitStart = i > 0 ? topicNameHashList.get(i - 1) : 0;
           long splitEnd = topicHashCode;
           long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
           splitResults.add(splitMiddle);
           break;
       }
   }
   ```
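To make the traversal easier to check against the six-topic example from the Goal section, here is a minimal standalone sketch of the same idea. It is not the proposed Pulsar implementation: the class name `FlowSplitSketch`, the method `findSplitPosition`, and the use of a `TreeMap` with `{msgRate, throughput}` arrays as values are assumptions made only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone sketch; none of these names are Pulsar APIs.
final class FlowSplitSketch {

    /**
     * Finds a split position for topics keyed by hash position.
     * Each value is {msgRate, throughput}. The TreeMap keeps keys sorted,
     * so two topics with the same hash would overwrite each other here.
     * Returns -1 when no position exceeds half of either total
     * (for example, when the map is empty or all loads are zero).
     */
    static long findSplitPosition(TreeMap<Long, double[]> loadByHash) {
        double totalMsgRate = 0;
        double totalThroughput = 0;
        for (double[] load : loadByHash.values()) {
            totalMsgRate += load[0];
            totalThroughput += load[1];
        }

        double msgRateSoFar = 0;
        double throughputSoFar = 0;
        long previousHash = 0; // mirrors the proposal: 0 when the first topic already crosses half
        for (Map.Entry<Long, double[]> entry : loadByHash.entrySet()) {
            msgRateSoFar += entry.getValue()[0];
            throughputSoFar += entry.getValue()[1];
            if (msgRateSoFar > totalMsgRate / 2 || throughputSoFar > totalThroughput / 2) {
                // Midpoint between the previous topic and the one that crossed the halfway mark
                return previousHash + (entry.getKey() - previousHash) / 2;
            }
            previousHash = entry.getKey();
        }
        return -1;
    }

    public static void main(String[] args) {
        // The six topics from the example: hash -> {msgRate per second, throughput in MB/s}
        TreeMap<Long, double[]> topics = new TreeMap<>();
        topics.put(10L, new double[] {100, 1});
        topics.put(20L, new double[] {200, 2});
        topics.put(80L, new double[] {300, 3});
        topics.put(90L, new double[] {400, 4});
        topics.put(100L, new double[] {500, 5});
        topics.put(110L, new double[] {2000, 190});
        System.out.println(findSplitPosition(topics)); // prints 105
    }
}
```

Using a sorted map folds the "sort by position" step into the data structure. A real implementation would also need to decide what to do when the first topic alone exceeds half of the load, since starting from 0 may fall outside the bundle's lower boundary.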