Hi Aloys,

Thanks for the great proposal.

I am wondering whether we can add a boundary parameter to the split bundle API. The boundary must lie between the start and the end of the bundle. It could look like the following:

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, long boundary)
        throws PulsarAdminException;
```

We could also provide a new API that returns the position of a topic within the bundle, which can then be used to determine the split boundary when splitting a bundle.

It looks like we currently only have a bundle assignment strategy based on the topic name hash. Maybe we can also consider taking advantage of other characteristics of a topic to choose a different bundle. Just a rough idea; this may be beyond the scope of this proposal.

Thanks,
Penghui

On Thu, Feb 17, 2022 at 11:47 PM Aloys Zhang <aloyszh...@apache.org> wrote:

> Hi Pulsar Community,
>
> This is a PIP discussion on how to support splitting partitions belonging to
> specified topics in a bundle.
>
> The issue can be found at: https://github.com/apache/pulsar/issues/13761
>
> I copy the content here for convenience; any suggestions are welcome and
> appreciated.
>
>
> ## Motivation
>
> As we all know, a namespace bundle may contain lots of partitions belonging
> to different topics.
> The throughput of these topics may vary greatly. Some topics may have a very
> high rate/throughput while other topics have a very low rate/throughput.
>
> These partitions with high rate/throughput can cause broker overload and
> bundle unloading.
> At this point, if we split the bundle manually with the `range_equally_divide` or
> `topic_count_equally_divide` split algorithm, it may take many splits
> before these high rate/throughput partitions are assigned to different new
> bundles.
>
> For convenience, we call these high throughput topics `outstanding topic`
> and their partitions `outstanding partition` in this PIP.
>
> ## Goal
>
> Our goal is to make it easier to split `outstanding partition` into new
> bundles.
>
> There are two alternative ways to achieve this. Either of them will add a
> new algorithm for bundle split. The difference is how the new bundle split
> algorithm is implemented.
>
> One algorithm is to split the bundle by `outstanding topic`, which splits
> the bundle into two new bundles at a time so that each new bundle contains an
> equal number of `outstanding partition`.
> E.g., a bundle contains lots of topic partitions, and only one `outstanding
> topic` (T) with 2 `outstanding partition` (T-partition-n, T-partition-n+1).
> This algorithm splits this bundle at the middle point of these two
> partitions' hashcodes. This algorithm has a disadvantage: it can only deal
> with one `outstanding topic`.
>
> So we propose another algorithm.
>
> The other algorithm is to split the bundle at the hashcode point of the
> `outstanding partition`, which splits the bundle into three bundles at
> a time. The middle one contains only the hashcode point of the
> `outstanding partition`, the left one covers the range below the hashcode,
> and the right one covers the range above the hashcode.
> E.g., if we have a bundle 0x00_0x10 that contains two `outstanding partition`
> (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> algorithm is going to split the bundle into five new bundles:
> 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> partition-y), 0x08_0x10.
>
> ## API Changes
>
> The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
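
As a side note on the second algorithm in the Goal section, here is a minimal sketch of how its split boundaries could be computed. It assumes bundle ranges are lower-inclusive and upper-exclusive and that each outstanding partition gets a bundle exactly one hash value wide. `HashPointSplitSketch` and `boundaries` are hypothetical names used only for illustration; they are not Pulsar APIs and not the PIP's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper (not part of Pulsar): computes the boundaries obtained by
// splitting a bundle at the hash points of its outstanding partitions.
final class HashPointSplitSketch {

    /**
     * @param lower  inclusive lower bound of the bundle range (assumption)
     * @param upper  exclusive upper bound of the bundle range (assumption)
     * @param hashes hash codes of the outstanding partitions, each in [lower, upper)
     * @return sorted, de-duplicated boundaries including lower and upper
     */
    static List<Long> boundaries(long lower, long upper, long... hashes) {
        TreeSet<Long> points = new TreeSet<>();
        points.add(lower);
        points.add(upper);
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("hash outside bundle: " + h);
            }
            points.add(h);      // start of the one-hash-wide bundle
            points.add(h + 1);  // end of the one-hash-wide bundle
        }
        // Consecutive entries of the result delimit the new bundles.
        return new ArrayList<>(points);
    }
}
```

With the example from the proposal (bundle 0x00_0x10, hashcodes 0x03 and 0x07) this yields the boundaries 0x00, 0x03, 0x04, 0x07, 0x08, 0x10, i.e. the five bundles listed there.
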
>
> The split interface changes from
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>                           String splitAlgorithmName)
>         throws PulsarAdminException;
> ```
>
> to
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>                           String splitAlgorithmName, String topic)
>         throws PulsarAdminException;
> ```
>
> ## Implementation
>
> There are changes on both the Admin CLI and the broker side.
>
> First, the Admin CLI for split bundle should support specifying the
> `outstanding topic`:
>
> ```java
> /**
>  * Split namespace bundle.
>  *
>  * @param namespace
>  * @param bundle range of bundle to split
>  * @param unloadSplitBundles
>  * @param splitAlgorithmName
>  * @param topic
>  * @throws PulsarAdminException
>  */
> void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>                           String splitAlgorithmName, String topic)
>         throws PulsarAdminException;
> ```
>
> ```java
> /**
>  * Split namespace bundle asynchronously.
>  *
>  * @param namespace
>  * @param bundle range of bundle to split
>  * @param unloadSplitBundles
>  * @param splitAlgorithmName
>  * @param topic
>  */
> CompletableFuture<Void> splitNamespaceBundleAsync(
>         String namespace, String bundle, boolean unloadSplitBundles,
>         String splitAlgorithmName, String topic);
> ```
>
> On the broker side, first encapsulate the parameters for bundle split
> into a new class `BundleSplitOption`:
>
> ```java
> public class BundleSplitOption {
>     private NamespaceService service;
>     private NamespaceBundle bundle;
>     private String topic;
> }
> ```
>
> Add a new split algorithm:
>
> ```java
> public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
>         implements NamespaceBundleSplitAlgorithm {
>     @Override
>     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
>         // body omitted in the proposal
>     }
> }
> ```
>
> Add the new algorithm to `NamespaceBundleSplitAlgorithm`:
>
> ```java
> String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
>
> List<String> AVAILABLE_ALGORITHMS =
>         Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>                 TOPIC_COUNT_EQUALLY_DIVIDE,
>                 SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>
> NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> ```
>
> Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
> [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
>
> ```java
> public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
>                                                  boolean unload,
>                                                  NamespaceBundleSplitAlgorithm splitAlgorithm,
>                                                  String topic) {
>
>     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
>     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
>
>     return unloadFuture;
> }
> ```
>
> ```java
> void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>                                    boolean unload,
>                                    AtomicInteger counter,
>                                    CompletableFuture<Void> completionFuture,
>                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
>                                    String topic) {
> ```
>
> Also, we change the REST API and broker.conf:
>
> ```java
> public void splitNamespaceBundle(
>         @Suspended final AsyncResponse asyncResponse,
>         @PathParam("property") String property,
>         @PathParam("cluster") String cluster,
>         @PathParam("namespace") String namespace,
>         @PathParam("bundle") String bundleRange,
>         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
>         @QueryParam("unload") @DefaultValue("false") boolean unload,
>         @QueryParam("topic") @DefaultValue("") String topic) {}
> ```
>
> ```shell
> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> ```
>
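
To make the boundary idea from the top of this reply a bit more concrete, here is a rough sketch of the check it implies. It is only an illustration under stated assumptions: `BoundarySplitSketch`, `positionOf`, `isValidBoundary`, and `splitAt` are hypothetical names, CRC32 is used as a stand-in for the broker's topic name hash (which may differ), and the boundary is required to lie strictly between the bundle's start and end.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper (not part of Pulsar): validates a user-supplied split
// boundary and derives the two resulting bundle ranges.
final class BoundarySplitSketch {

    // Assumption: CRC32 over the UTF-8 topic name stands in for the broker's hash.
    static long positionOf(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // unsigned 32-bit value held in a long
    }

    // The boundary must lie strictly between the start and the end of the bundle.
    static boolean isValidBoundary(long lower, long upper, long boundary) {
        return lower < boundary && boundary < upper;
    }

    // Returns the two bundle ranges produced by splitting at the boundary.
    static String[] splitAt(long lower, long upper, long boundary) {
        if (!isValidBoundary(lower, upper, boundary)) {
            throw new IllegalArgumentException("boundary outside bundle: " + boundary);
        }
        return new String[] {format(lower, boundary), format(boundary, upper)};
    }

    // Formats a range in the 0x........_0x........ style used for bundle names.
    private static String format(long lower, long upper) {
        return String.format("0x%08x_0x%08x", lower, upper);
    }
}
```

A caller could take `positionOf(topic)` for a hot topic, pick a boundary next to it, and pass that boundary to the split call; rejecting boundaries equal to the bundle's start or end avoids creating an empty bundle.
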