The implementation logic has changed, and the example has been updated to match:
## Motivation

Bundle split currently supports three algorithms:

- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide

However, none of these algorithms divides bundles according to flow or QPS, so a bundle may have to be split multiple times.

## Goal

Our goal is to split bundles according to flow or QPS, so we propose a PIP that introduces a split algorithm based on flow or QPS.

The main idea is to get the flow or QPS information of each topic contained in a bundle, then split according to the `loadBalancerNamespaceBundleMaxMsgRate` or `loadBalancerNamespaceBundleMaxBandwidthMbytes` configuration.

For example, consider a bundle with boundaries 0x00000000 to 0x00000200 and six topics: t1, t2, t3, t4, t5, t6.

loadBalancerNamespaceBundleMaxMsgRate=1100
loadBalancerNamespaceBundleMaxBandwidthMbytes=110

**Step 1: Get their hash position and corresponding flow and QPS:**

> t1 with hashcode 10 msgRate 100/s throughput 10 MB/s
>
> t2 with hashcode 20 msgRate 200/s throughput 20 MB/s
>
> t3 with hashcode 80 msgRate 300/s throughput 30 MB/s
>
> t4 with hashcode 90 msgRate 400/s throughput 40 MB/s
>
> t5 with hashcode 100 msgRate 500/s throughput 50 MB/s
>
> t6 with hashcode 110 msgRate 600/s throughput 60 MB/s

**Step 2: Calculate the total flow and QPS of the bundle:**

> bundleMsgRate = 100 + 200 + 300 + 400 + 500 + 600 = 2100
>
> bundleThroughput = 10 + 20 + 30 + 40 + 50 + 60 = 210 MB/s

**Step 3: Calculate the split position and split:**

> QPS: (100 + 200 + 300 + 400) < loadBalancerNamespaceBundleMaxMsgRate=1100 & (100 + 200 + 300 + 400 + 500) > loadBalancerNamespaceBundleMaxMsgRate=1100
>
> flow: (10 + 20 + 30 + 40) < loadBalancerNamespaceBundleMaxBandwidthMbytes=110 & (10 + 20 + 30 + 40 + 50) > loadBalancerNamespaceBundleMaxBandwidthMbytes=110
>
> Split between t4 and t5:
>
> splitStartPosition = 90
>
> splitEndPosition = 100
>
> splitPosition = (90 + 100) / 2 = 95

## API Changes

1. Add a split algorithm class based on flow or QPS:

`public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm`

2. Update the default configuration:

```
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList(
        "range_equally_divide",
        "topic_count_equally_divide",
        "specified_positions_divide",
        "flow_count_equally_divide");
```

3. Add a configuration option:

```
@FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Acceptable difference between qps and loadBalancerNamespaceBundleMaxMsgRate"
                + " or flow and loadBalancerNamespaceBundleMaxBandwidthMbytes"
)
private int flowOrQpsDifferenceThresholdPercentage = 10;
```

## Implementation

The `FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary` method executes the following steps:

1. Get the hash position of each topic and the corresponding msgRate and msgThroughput, and sort the topics by hash position:

```
// Topic hash positions, sorted in ascending order
List<Long> topicNameHashList = new ArrayList<>(topics.size());
// Hash position -> message rate of the topic at that position
Map<Long, Double> hashAndMsgMap = new HashMap<>();
// Hash position -> throughput of the topic at that position
Map<Long, Double> hashAndThroughput = new HashMap<>();
```

2. Traverse all topics in ascending order of hash position, and split the bundle wherever the accumulated load exceeds the configured `loadBalancerNamespaceBundleMaxMsgRate` or `loadBalancerNamespaceBundleMaxBandwidthMbytes`:

```
double bundleMsgRateTmp = 0;
double bundleThroughputTmp = 0;
for (int i = 0; i < topicNameHashList.size(); i++) {
    long topicHashCode = topicNameHashList.get(i);
    // Accumulate the load of the current segment
    bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
    bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
    if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
            || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
        // Split halfway between the previous topic and the current one.
        // Note: for i == 0 this reads index i + 1, so it needs at least two topics.
        long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
        long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
        long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
        splitResults.add(splitMiddle);
        // The current topic opens the next segment
        bundleMsgRateTmp = hashAndMsgMap.get(topicHashCode);
        bundleThroughputTmp = hashAndThroughput.get(topicHashCode);
    }
}
```
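
To check the traversal against the worked example in the Goal section, here is a minimal, self-contained sketch of the same idea. It is not the proposed Pulsar class: `FlowOrQpsSplitSketch`, `TopicLoad`, `splitBoundaries`, and `exampleTopics` are hypothetical names used only for illustration, throughput is expressed directly in MB/s, and `flowOrQpsDifferenceThresholdPercentage` is not modeled. It also deviates from the snippet above in one respect: if the very first topic alone exceeds a limit, the sketch waits for the next topic before placing a boundary instead of reading index `i + 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Pulsar code base.
class FlowOrQpsSplitSketch {

    // Hypothetical holder for one topic's load (message rate in msg/s, throughput in MB/s).
    static final class TopicLoad {
        final double msgRate;
        final double throughputMb;

        TopicLoad(double msgRate, double throughputMb) {
            this.msgRate = msgRate;
            this.throughputMb = throughputMb;
        }
    }

    // Walks the topics in ascending hash order (TreeMap keeps keys sorted) and
    // adds a boundary halfway between two neighbouring topics whenever the
    // accumulated load of the current segment exceeds either limit.
    static List<Long> splitBoundaries(TreeMap<Long, TopicLoad> loadByHash,
                                      double maxMsgRate, double maxBandwidthMb) {
        List<Long> boundaries = new ArrayList<>();
        double rate = 0;
        double throughput = 0;
        Long previousHash = null;
        for (Map.Entry<Long, TopicLoad> entry : loadByHash.entrySet()) {
            long hash = entry.getKey();
            TopicLoad load = entry.getValue();
            rate += load.msgRate;
            throughput += load.throughputMb;
            boolean overLimit = rate > maxMsgRate || throughput > maxBandwidthMb;
            // A boundary needs a topic on its left, so the first topic never splits.
            if (overLimit && previousHash != null) {
                boundaries.add(previousHash + (hash - previousHash) / 2);
                // The current topic opens the next segment.
                rate = load.msgRate;
                throughput = load.throughputMb;
            }
            previousHash = hash;
        }
        return boundaries;
    }

    // The six topics t1..t6 from the example: hash -> (msgRate, throughput).
    static TreeMap<Long, TopicLoad> exampleTopics() {
        TreeMap<Long, TopicLoad> topics = new TreeMap<>();
        topics.put(10L, new TopicLoad(100, 10));
        topics.put(20L, new TopicLoad(200, 20));
        topics.put(80L, new TopicLoad(300, 30));
        topics.put(90L, new TopicLoad(400, 40));
        topics.put(100L, new TopicLoad(500, 50));
        topics.put(110L, new TopicLoad(600, 60));
        return topics;
    }

    public static void main(String[] args) {
        // Limits from the example: 1100 msg/s and 110 MB/s.
        System.out.println(splitBoundaries(exampleTopics(), 1100, 110)); // prints [95]
    }
}
```

With the example limits, the running totals reach 1500 msg/s and 150 MB/s at t5, so the only boundary falls between t4 (hash 90) and t5 (hash 100), at 95. After the reset, t5 and t6 together sum to exactly 1100 msg/s and 110 MB/s, which does not exceed the limits, so no second boundary is added.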