Hi Penghui,
> I am considering whether we can add a boundary param to the split bundle API.
> The boundary must be between the start and the end of the bundle.
> It would look like the following:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>                           String splitAlgorithmName, long boundary)
>         throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.

Do you mean that

1. First, add a new API, maybe `getHashPosition`, to get the hash position of a topic in a bundle
2. Then use this position to split the overloaded bundle

If so, when we split a bundle that contains multiple partitions of a topic, we need to call `getHashPosition` multiple times and then take the middle position of all these positions.

> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of other characteristics of a topic to choose a different bundle.

It makes sense to me; it sounds like a re-hash. It seems like a new way to assign topics to a bundle rather than a bundle split algorithm. I think we can raise another feature request or PIP for this idea.

On Fri, Feb 18, 2022 at 08:54, PengHui Li <peng...@apache.org> wrote:

> Hi Aloys,
>
> Thanks for the great proposal.
>
> I am considering whether we can add a boundary param to the split bundle API.
> The boundary must be between the start and the end of the bundle.
> It would look like the following:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>                           String splitAlgorithmName, long boundary)
>         throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.
>
> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of other characteristics of a topic to choose a different bundle. Just a rough
> idea.
> This may be beyond the scope of this proposal.
>
> Thanks,
> Penghui
>
> On Thu, Feb 17, 2022 at 11:47 PM Aloys Zhang <aloyszh...@apache.org> wrote:
>
> > Hi Pulsar Community,
> >
> > This is a PIP discussion on how to support splitting partitions belonging to
> > specified topics in a bundle.
> >
> > The issue can be found at: https://github.com/apache/pulsar/issues/13761
> >
> > I copy the content here for convenience; any suggestions are welcome and
> > appreciated.
> >
> >
> > ## Motivation
> >
> > As we all know, a namespace bundle may contain lots of partitions belonging
> > to different topics.
> > The throughput of these topics may vary greatly. Some topics may have a very
> > high rate/throughput while other topics have a very low rate/throughput.
> >
> > These partitions with high rate/throughput can cause broker overload and
> > bundle unloading.
> > At this point, if we split the bundle manually with the `range_equally_divide` or
> > `topic_count_equally_divide` split algorithm, it may take many splits
> > before these high rate/throughput partitions are assigned to different new
> > bundles.
> >
> > For convenience, we call these high throughput topics `outstanding topic`
> > and their partitions `outstanding partition` in this PIP.
> >
> > ## Goal
> >
> > Our goal is to make it easier to split `outstanding partition`s into new
> > bundles.
> >
> > There are two alternative ways to achieve this. Either of them will add a
> > new algorithm for bundle split. The difference is how the new bundle split
> > algorithm is implemented.
> >
> > One algorithm is to split the bundle by `outstanding topic`, which will split
> > the bundle into two new bundles at a time, each containing an equal number of
> > `outstanding partition`s.
> > E.g., a bundle contains lots of topic partitions, and only one `outstanding
> > topic` (T) with 2 `outstanding partition`s (T-partition-n, T-partition-n+1).
> > This algorithm splits the bundle at the middle point of these two
> > partitions' hashcodes. This algorithm has a disadvantage: it can only deal
> > with one `outstanding topic`.
> >
> > So we propose another algorithm.
> >
> > The other algorithm is to split the bundle at the hashcode point of the
> > `outstanding partition`, which will split the bundle into three bundles at
> > a time. The middle one contains only the hashcode point of the
> > `outstanding partition`, the left one covers hashcodes less than it, and the
> > right one covers hashcodes greater than it.
> > E.g., if we have a bundle 0x00_0x10 that contains two `outstanding partition`s
> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> > algorithm is going to split the bundle into five new bundles:
> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> > partition-y), 0x08_0x10.
> >
> > ## API Changes
> >
> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
> >
> > The split interface changes from
> >
> > ```java
> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >                           String splitAlgorithmName)
> >         throws PulsarAdminException;
> > ```
> >
> > to
> >
> > ```java
> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >                           String splitAlgorithmName, String topic)
> >         throws PulsarAdminException;
> > ```
> >
> > ## Implementation
> >
> > There are changes on both the Admin CLI and the broker side.
> >
> > First, the Admin CLI for split bundle should support specifying the
> > `outstanding topic`:
> >
> > ```java
> > /**
> >  * Split namespace bundle.
> >  *
> >  * @param namespace
> >  * @param bundle range of bundle to split
> >  * @param unloadSplitBundles
> >  * @param splitAlgorithmName
> >  * @param topic
> >  * @throws PulsarAdminException
> >  */
> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >                           String splitAlgorithmName, String topic)
> >         throws PulsarAdminException;
> > ```
> >
> > ```java
> > /**
> >  * Split namespace bundle asynchronously.
> >  *
> >  * @param namespace
> >  * @param bundle range of bundle to split
> >  * @param unloadSplitBundles
> >  * @param splitAlgorithmName
> >  * @param topic
> >  */
> > CompletableFuture<Void> splitNamespaceBundleAsync(
> >         String namespace, String bundle, boolean unloadSplitBundles,
> >         String splitAlgorithmName, String topic);
> > ```
> >
> > On the broker side, first encapsulate the parameters for bundle split
> > into a new class `BundleSplitOption`:
> >
> > ```java
> > public class BundleSplitOption {
> >     private NamespaceService service;
> >     private NamespaceBundle bundle;
> >     private String topic;
> > }
> > ```
> >
> > Add a new split algorithm:
> >
> > ```java
> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> >         implements NamespaceBundleSplitAlgorithm {
> >     @Override
> >     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
> >         // ... (body elided in the proposal)
> >     }
> > }
> > ```
> >
> > Add the new algorithm to `NamespaceBundleSplitAlgorithm`:
> >
> > ```java
> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
> >
> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >
> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> > ```
> >
> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
> >
> > ```java
> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
> >         NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> >
> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
> >
> >     return unloadFuture;
> > }
> > ```
> >
> > ```java
> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >                                    boolean unload,
> >                                    AtomicInteger counter,
> >                                    CompletableFuture<Void> completionFuture,
> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
> >                                    String topic) {
> >     // ... (body elided in the proposal)
> > }
> > ```
> >
> > Also, we change the REST API and broker.conf:
> >
> > ```java
> > public void splitNamespaceBundle(
> >         @Suspended final AsyncResponse asyncResponse,
> >         @PathParam("property") String property,
> >         @PathParam("cluster") String cluster,
> >         @PathParam("namespace") String namespace,
> >         @PathParam("bundle") String bundleRange,
> >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
> >         @QueryParam("unload") @DefaultValue("false") boolean unload,
> >         @QueryParam("topic") @DefaultValue("") String topic) {}
> > ```
> >
> > ```shell
> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> > ```
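
To make the two split strategies in the quoted proposal concrete, here is a rough sketch of how the boundaries could be computed from the partitions' hash positions. This is not the code from the PR: the class `SplitBoundarySketch` and the methods `middleBoundary` and `isolatingBoundaries` are hypothetical names, and the sketch assumes a bundle covers the half-open range [lower, upper), which is only inferred from the 0x03_0x04 example above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch, not Pulsar code. Assumes bundles are half-open
// ranges [lower, upper) over the topic-name hash space.
public class SplitBoundarySketch {

    // First strategy: one boundary that leaves half of the outstanding
    // partitions on each side. For two partitions this is the middle
    // point of their hash positions.
    public static long middleBoundary(List<Long> positions) {
        if (positions.size() < 2) {
            throw new IllegalArgumentException("need at least two positions");
        }
        List<Long> sorted = new ArrayList<>(positions);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        // Round up so that left < boundary <= right whenever left < right.
        return left + (right - left + 1) / 2;
    }

    // Second strategy: isolate every outstanding partition in its own
    // one-slot bundle [h, h + 1), which adds up to two boundaries per position.
    public static List<Long> isolatingBoundaries(long lower, long upper, List<Long> positions) {
        TreeSet<Long> boundaries = new TreeSet<>(); // sorted, no duplicates
        for (long h : positions) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("position outside bundle: " + h);
            }
            if (h > lower) {
                boundaries.add(h);       // start of the isolated bundle
            }
            if (h + 1 < upper) {
                boundaries.add(h + 1);   // end of the isolated bundle
            }
        }
        return new ArrayList<>(boundaries);
    }

    public static void main(String[] args) {
        List<Long> positions = List.of(0x03L, 0x07L);
        // Middle point of 0x03 and 0x07.
        System.out.println(middleBoundary(positions));                   // prints 5
        // Bundle 0x00_0x10 with outstanding partitions at 0x03 and 0x07.
        System.out.println(isolatingBoundaries(0x00L, 0x10L, positions)); // prints [3, 4, 7, 8]
    }
}
```

With the example from the proposal, the second method returns the boundaries 0x03, 0x04, 0x07 and 0x08, which yields the five bundles listed above. With a boundary param on the split API, the first method is what a client would have to compute itself after fetching each partition's hash position.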