Hi Pulsar Community,

This is a PIP discussion on how to support splitting partitions that belong to specified topics within a bundle.
The issue can be found at https://github.com/apache/pulsar/issues/13761. I have copied the content here for convenience; any suggestions are welcome and appreciated.

## Motivation

As we all know, a namespace bundle may contain many partitions belonging to different topics. The throughput of these topics may vary greatly: some topics have a very high rate/throughput while others have a very low rate/throughput. The partitions with high rate/throughput can cause broker overload and bundle unloading. At that point, if we split the bundle manually with the `range_equally_divide` or `topic_count_equally_divide` algorithm, it may take many splits before these high-throughput partitions are assigned to different new bundles.

For convenience, we call these high-throughput topics `outstanding topic` and their partitions `outstanding partition` in this PIP.

## Goal

Our goal is to make it easier to split `outstanding partition` into new bundles. There are two alternative ways to achieve this. Both add a new bundle split algorithm; they differ in how that algorithm is implemented.

The first algorithm splits the bundle by `outstanding topic`: each split produces two new bundles, and each new bundle contains an equal number of `outstanding partition`. E.g., a bundle contains many topic partitions but only one `outstanding topic` (T) with 2 `outstanding partition` (T-partition-n, T-partition-n+1). This algorithm splits the bundle at the midpoint of the two partitions' hashcodes. Its disadvantage is that it can only deal with one `outstanding topic`, so we propose another algorithm.

The second algorithm splits the bundle at the hashcode of the `outstanding partition`, so each split produces three bundles. The middle bundle contains only the hashcode of the `outstanding partition`, the left bundle covers the hashcodes below it, and the right bundle covers the hashcodes above it. E.g., if we have a bundle 0x00_0x10 that contains two `outstanding partition` (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this algorithm splits the bundle into five new bundles: 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for partition-y), and 0x08_0x10.

## API Changes

The Admin CLI command `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}` will gain a new parameter `--topic` (or `-t`) for the `outstanding topic` name.

The split interface changes from

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName) throws PulsarAdminException;
```

to

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

## Implementation

Changes are needed in both the Admin CLI and the broker.

First, the Admin CLI for bundle split should support specifying the `outstanding topic`:

```java
/**
 * Split namespace bundle.
 *
 * @param namespace
 * @param bundle range of bundle to split
 * @param unloadSplitBundles
 * @param splitAlgorithmName
 * @param topic
 * @throws PulsarAdminException
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

```java
/**
 * Split namespace bundle asynchronously.
 *
 * @param namespace
 * @param bundle range of bundle to split
 * @param unloadSplitBundles
 * @param splitAlgorithmName
 * @param topic
 */
CompletableFuture<Void> splitNamespaceBundleAsync(
        String namespace, String bundle, boolean unloadSplitBundles,
        String splitAlgorithmName, String topic);
```

On the broker side, we first encapsulate the parameters for bundle split in a new class, `BundleSplitOption`:

```java
public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private String topic;
}
```

Then we add a new split algorithm:

```java
public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        // Body omitted in this proposal.
    }
}
```

We register the new algorithm in `NamespaceBundleSplitAlgorithm`:

```java
String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";

List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
        TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);

NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
        new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
```

We modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):

```java
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                 String topic) {
    final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
    final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
    splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
    return unloadFuture;
}
```

```java
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   String topic) {
    // ...
}
```

We also change the REST API and `broker.conf`:

```java
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("property") String property,
        @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("topic") @DefaultValue("") String topic) {}
```

```shell
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
```
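
To make the two algorithms from the Goal section more concrete, here is a minimal sketch of how the split boundaries could be computed. It is only an illustration, not the implementation in the PR: the class `OutstandingPartitionSplitSketch` and its methods are hypothetical names, partition hashcodes are passed in directly instead of being computed from topic names, and bundle ranges are assumed to be half-open (`[lower, upper)`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch for discussion; not part of the Pulsar code base. */
final class OutstandingPartitionSplitSketch {

    /**
     * Second algorithm: isolate every outstanding hashcode h in its own bundle [h, h + 1).
     * Returns all bundle boundaries from lower to upper in ascending order.
     * Assumes half-open ranges [lower, upper).
     */
    static List<Long> isolateBoundaries(long lower, long upper, List<Long> outstandingHashes) {
        TreeSet<Long> boundaries = new TreeSet<>();
        boundaries.add(lower);
        boundaries.add(upper);
        for (long hash : outstandingHashes) {
            if (hash < lower || hash >= upper) {
                throw new IllegalArgumentException("hash outside bundle range: " + hash);
            }
            boundaries.add(hash);                      // start of the isolated bundle
            boundaries.add(Math.min(hash + 1, upper)); // end of the isolated bundle
        }
        return new ArrayList<>(boundaries);
    }

    /**
     * First algorithm, two-partition case: one boundary between the two hashcodes.
     * The midpoint is rounded up so that, with half-open ranges, the smaller
     * hashcode falls in the left bundle and the larger one in the right bundle.
     */
    static long midpointBoundary(long hashA, long hashB) {
        if (hashA == hashB) {
            throw new IllegalArgumentException("identical hashcodes cannot be separated");
        }
        long lo = Math.min(hashA, hashB);
        long hi = Math.max(hashA, hashB);
        return lo + (hi - lo + 1) / 2;
    }

    /** Formats consecutive boundaries as ranges such as 0x00_0x03. */
    static List<String> toRanges(List<Long> boundaries) {
        List<String> ranges = new ArrayList<>();
        for (int i = 0; i + 1 < boundaries.size(); i++) {
            ranges.add(String.format("0x%02x_0x%02x", boundaries.get(i), boundaries.get(i + 1)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The example from the Goal section: bundle 0x00_0x10, hashcodes 0x03 and 0x07.
        List<Long> boundaries = isolateBoundaries(0x00L, 0x10L, List.of(0x03L, 0x07L));
        System.out.println(toRanges(boundaries));
        // prints [0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10]
    }
}
```

Collecting the boundaries in a sorted set keeps the sketch short: duplicate boundaries (for example when an outstanding hashcode equals the lower bound of the bundle) collapse automatically, so no empty bundle is produced.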