Hi Aloys,
+1 for this great PIP.

> The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> will add a new parameter "--topic" or "-t" for  `outstanding topic` name.

Is there any limitation on this "topic" parameter? Can it be a partitioned
topic?
If so, will the new algorithm split the bundle into more than two bundles, for
example one bundle per partition?

> This algorithm has a disadvantage, it can only deal
> with one `outstanding topic`.

For this disadvantage, I think it can be solved by extending the "topic"
parameter from a single topic to a list of topics.
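
A rough sketch of what I have in mind, assuming the broker can already look up
the hash position of every partition of the listed topics. The class and
method names are made up for illustration and are not existing Pulsar code,
and I am assuming a bundle owns the half-open hash range [lower, upper):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Pulsar: picks one split point so that the
// outstanding partitions are divided as evenly as possible between two bundles.
final class OutstandingMidpointSplit {

    // Assumes a bundle owns hashes in [lower, upper), so a hash equal to the
    // returned split point lands in the right-hand bundle.
    static long splitPoint(List<Long> outstandingHashes) {
        if (outstandingHashes.size() < 2) {
            throw new IllegalArgumentException("need at least two outstanding partitions");
        }
        List<Long> sorted = new ArrayList<>(outstandingHashes);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        if (left == right) {
            throw new IllegalStateException("the two middle partitions share a hash");
        }
        // Round up so that 'left' always stays strictly below the split point.
        return left + (right - left + 1) / 2;
    }
}
```

The split point depends only on the merged set of hashes, so partitions from
several topics can be passed in one list. For two partitions with hashcodes
0x03 and 0x07 it returns 0x05.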

> The other algorithm is to split the bundle at the hashcode point of the
> `outstanding partition`, which will split the bundle into three bundles at
> a time. The middle one contains only the hashcode point of the
> `outstanding partition`, the left one covers hashes less than the hashcode,
> and the right one covers hashes greater than the hashcode.
> E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
> (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> algorithm is going to split the bundle into five new bundles:
> 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> partition-y), 0x08_0x10.

I think this approach has more potential for isolating abnormal topics. If we
can introduce some kind of bundle isolation strategy (such as a broker-bundle
affinity and anti-affinity mechanism), we can easily isolate unexpected
traffic onto otherwise empty brokers.
IMO, this would improve the stability of the broker cluster.
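
To check my understanding of the boundary calculation, here is a minimal
sketch that reproduces the example from the PIP. The class and method names
are hypothetical, and I am assuming each bundle owns the half-open range
[lower, upper):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of Pulsar: computes the boundaries needed to
// give every outstanding partition a bundle that covers only its own hash.
final class HashPointSplit {

    // Returns the sorted boundaries of the new bundles for [lower, upper).
    static List<Long> boundaries(long lower, long upper, List<Long> outstandingHashes) {
        // A TreeSet keeps the boundaries sorted and drops duplicates, e.g.
        // when a hash sits at the bundle edge or two hashes are adjacent.
        TreeSet<Long> points = new TreeSet<>();
        points.add(lower);
        points.add(upper);
        for (long hash : outstandingHashes) {
            if (hash < lower || hash >= upper) {
                throw new IllegalArgumentException("hash outside bundle: " + hash);
            }
            points.add(hash);      // start of the single-hash bundle
            points.add(hash + 1);  // exclusive end of the single-hash bundle
        }
        return new ArrayList<>(points);
    }
}
```

For the bundle 0x00_0x10 with hashcodes 0x03 and 0x07 this yields the
boundaries 0x00, 0x03, 0x04, 0x07, 0x08, 0x10, i.e. the five bundles listed in
the PIP.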

Thanks,
Haiting

On 2022/02/17 15:47:15 Aloys Zhang wrote:
> Hi Pulsar Community,
> 
> This is a PIP discussion on how to support splitting partitions that belong
> to specified topics in a bundle.
> 
> The issue can be found at https://github.com/apache/pulsar/issues/13761
> 
> I copy the content here for convenience. Any suggestions are welcome and
> appreciated.
> 
> 
> ## Motivation
> 
> As we all know, a namespace bundle may contain many partitions belonging
> to different topics.
> The throughput of these topics may vary greatly. Some topics may have a very
> high rate/throughput while other topics have a very low rate/throughput.
> 
> Partitions with a high rate/throughput can cause broker overload and
> bundle unloading.
> At this point, if we split the bundle manually with the
> `range_equally_divide` or `topic_count_equally_divide` split algorithm, it
> may take many splits before these high rate/throughput partitions are
> assigned to different new bundles.
> 
> For convenience, we call these high throughput topics `outstanding topic`
> and their partitions `outstanding partition` in this PIP.
> 
> ## Goal
> 
> Our goal is to make it easier to split `outstanding partition` into new
> bundles.
> 
> There are two alternative ways to achieve this. Either of them will add a
> new algorithm for bundle split. The difference is how the new bundle split
> algorithm is implemented.
> 
> One algorithm is to split the bundle by `outstanding topic`, which splits
> the bundle into two new bundles at a time, each containing an equal number
> of `outstanding partition`s.
> E.g., a bundle contains many topic partitions and only one `outstanding
> topic` (T) with 2 `outstanding partition`s (T-partition-n, T-partition-n+1).
> This algorithm splits the bundle at the midpoint of these two
> partitions' hashcodes. This algorithm has a disadvantage, it can only deal
> with one `outstanding topic`.
> 
> So we propose another algorithm.
> 
> The other algorithm is to split the bundle at the hashcode point of the
> `outstanding partition`, which will split the bundle into three bundles at
> a time. The middle one contains only the hashcode point of the
> `outstanding partition`, the left one covers hashes less than the hashcode,
> and the right one covers hashes greater than the hashcode.
> E.g. if a bundle 0x00_0x10 contains two `outstanding partition`s
> (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> algorithm is going to split the bundle into five new bundles:
> 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> partition-y), 0x08_0x10.
> 
> ## API Changes
> 
> The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> will add a new parameter "--topic" or "-t" for  `outstanding topic` name.
> 
> The split interface changed from
> 
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException;
> ```
> 
> to
> 
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, String topic)
> throws PulsarAdminException;
> ```
> 
> ## Implementation
> 
> There are changes on both the Admin CLI and the broker side.
> 
> First, the Admin CLI for split bundle should support specifying the
> `outstanding topic`:
> 
> ```java
> /**
>      * Split namespace bundle.
>      *
>      * @param namespace
>      * @param bundle range of bundle to split
>      * @param unloadSplitBundles
>      * @param splitAlgorithmName
>      * @param topic
>      * @throws PulsarAdminException
>      */
>     void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, String topic)
> throws PulsarAdminException;
> 
> ```
> 
> ```java
> /**
>      * Split namespace bundle asynchronously.
>      *
>      * @param namespace
>      * @param bundle range of bundle to split
>      * @param unloadSplitBundles
>      * @param splitAlgorithmName
>      * @param topic
>      */
>     CompletableFuture<Void> splitNamespaceBundleAsync(
>             String namespace, String bundle, boolean unloadSplitBundles,
> String splitAlgorithmName, String topic);
> ```
> 
> And for the broker side, first encapsulate the parameters for bundle split
> into a new class `BundleSplitOption`:
> 
> ```java
> public class BundleSplitOption {
>     private NamespaceService service;
>     private NamespaceBundle bundle;
>     private String topic;
> }
> ```
> 
> add a new split algorithm
> 
> ```java
> public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements
> NamespaceBundleSplitAlgorithm {
>     @Override
>     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption
> bundleSplitOption) {
>         // (implementation omitted)
>     }
> }
> ```
> 
> add the new algorithm to `NamespaceBundleSplitAlgorithm`
> 
> ```java
> String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE =
> "specified_topic_count_equally_divide";
> 
> List<String> AVAILABLE_ALGORITHMS =
> Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>             TOPIC_COUNT_EQUALLY_DIVIDE,
> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> 
>  NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>             new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> ```
> 
> modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` for
> [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)
> 
> 
> ```java
> public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle,
> boolean unload,
> 
>  NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> 
>         final CompletableFuture<Void> unloadFuture = new
> CompletableFuture<>();
>         final AtomicInteger counter = new
> AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>         splitAndOwnBundleOnceAndRetry(bundle, unload, counter,
> unloadFuture, splitAlgorithm, topic);
> 
>         return unloadFuture;
>     }
> ```
> 
> ```java
> void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>                                        boolean unload,
>                                        AtomicInteger counter,
>                                        CompletableFuture<Void>
> completionFuture,
>                                        NamespaceBundleSplitAlgorithm
> splitAlgorithm,
>                                        String topic) {
> ```
> 
> Also, we change the REST API and broker.conf
> 
> ```java
> public void splitNamespaceBundle(
>             @Suspended final AsyncResponse asyncResponse,
>             @PathParam("property") String property,
>             @PathParam("cluster") String cluster,
>             @PathParam("namespace") String namespace,
>             @PathParam("bundle") String bundleRange,
>             @QueryParam("authoritative") @DefaultValue("false") boolean
> authoritative,
>             @QueryParam("unload") @DefaultValue("false") boolean unload,
>             @QueryParam("topic") @DefaultValue("") String topic) {}
> ```
> 
> ```shell
> supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> ```
> 
