Hi Pulsar Community,

This is a PIP discussion on how to support splitting partitions that belong
to specified topics in a bundle.

The issue can be found at: https://github.com/apache/pulsar/issues/13761

I have copied the content here for convenience. Any suggestions are welcome
and appreciated.


## Motivation

A namespace bundle may contain many partitions belonging to different
topics.
The throughput of these topics may vary greatly: some topics may have a very
high rate/throughput while others have a very low one.

Partitions with a high rate/throughput can cause broker overload and
bundle unloading.
At this point, if we split the bundle manually with the
`range_equally_divide` or `topic_count_equally_divide` split algorithm, many
splits may be needed before these high rate/throughput partitions are
assigned to different new bundles.

For convenience, we call these high throughput topics `outstanding topic`
and their partitions `outstanding partition` in this PIP.

## Goal

Our goal is to make it easier to split `outstanding partition` into new
bundles.

There are two alternative ways to achieve this. Each adds a new bundle
split algorithm; the difference is how that algorithm is implemented.

The first algorithm splits the bundle by `outstanding topic`: each split
divides the bundle into two new bundles that contain an equal number of
`outstanding partition`s.
E.g., a bundle contains many topic partitions but only one `outstanding
topic` (T) with 2 `outstanding partition`s (T-partition-n, T-partition-n+1).
This algorithm splits the bundle at the midpoint of the two partitions'
hashcodes. Its disadvantage is that it can only deal with one
`outstanding topic`.
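
The midpoint rule can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: the class and method names are hypothetical, the hashcodes are assumed to be already computed with the bundle hash function, and a bundle range is assumed to include its lower bound and exclude its upper bound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Pulsar: illustrates the midpoint rule only.
final class MidpointSplitSketch {

    /**
     * Returns a single boundary b such that half of the outstanding partitions
     * hash below b and the rest hash at or above b.
     * Returns -1 when the partitions cannot be separated.
     */
    static long splitBoundary(List<Long> outstandingPartitionHashes) {
        List<Long> sorted = new ArrayList<>(outstandingPartitionHashes);
        Collections.sort(sorted);
        if (sorted.size() < 2) {
            return -1; // nothing to separate
        }
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        if (left == right) {
            return -1; // identical hashcodes always land in the same bundle
        }
        // Rounded-up midpoint, so that left < boundary <= right.
        return left + (right - left + 1) / 2;
    }

    public static void main(String[] args) {
        // Two outstanding partitions with hashcodes 0x03 and 0x07.
        System.out.printf("0x%02x%n", splitBoundary(Arrays.asList(0x03L, 0x07L))); // prints 0x05
    }
}
```

With an odd number of partitions the sketch puts the extra partition in the upper bundle; that tie-breaking choice is arbitrary.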

So we propose another algorithm.

The other algorithm splits the bundle at the hashcode point of the
`outstanding partition`, producing three bundles per split. The middle one
contains only the hashcode of the `outstanding partition`, the left one
covers the hashcodes below it, and the right one covers the hashcodes
above it.
E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
(partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
algorithm splits the bundle into five new bundles: 0x00_0x03,
0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for partition-y),
0x08_0x10.
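
The boundaries in this example can be derived mechanically: each hashcode `h` inside the bundle contributes the boundaries `h` and `h + 1`. The sketch below illustrates that rule only; the class and method names are hypothetical and the hashcodes are assumed to be given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper, not part of Pulsar: isolates each hashcode in its own bundle.
final class PointSplitSketch {

    /**
     * Returns the sorted split boundaries strictly inside [lower, upper) that
     * place every given hashcode h into its own bundle [h, h + 1).
     */
    static List<Long> splitBoundaries(long lower, long upper, List<Long> hashes) {
        TreeSet<Long> boundaries = new TreeSet<>(); // sorted, no duplicates
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                continue; // hashcode does not belong to this bundle
            }
            if (h > lower) {
                boundaries.add(h); // start of the single-hash bundle
            }
            if (h + 1 < upper) {
                boundaries.add(h + 1); // end of the single-hash bundle
            }
        }
        return new ArrayList<>(boundaries);
    }

    public static void main(String[] args) {
        // Bundle 0x00_0x10 with outstanding partitions at 0x03 and 0x07.
        System.out.println(splitBoundaries(0x00, 0x10, Arrays.asList(0x03L, 0x07L))); // [3, 4, 7, 8]
    }
}
```

The four boundaries 0x03, 0x04, 0x07 and 0x08 cut 0x00_0x10 into the five bundles listed above.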

## API Changes

The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
will add a new parameter `--topic` or `-t` for the `outstanding topic` name.

The split interface changes from

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName) throws PulsarAdminException;
```

to

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

## Implementation

There are changes both from the Admin CLI and the broker side.

First, the Admin CLI for bundle split should support specifying the
`outstanding topic`:

```java
/**
 * Split namespace bundle.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle to split
 * @param unloadSplitBundles whether to unload the new bundles after the split
 * @param splitAlgorithmName name of the split algorithm to use
 * @param topic name of the outstanding topic
 * @throws PulsarAdminException
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

```java
/**
 * Split namespace bundle asynchronously.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle to split
 * @param unloadSplitBundles whether to unload the new bundles after the split
 * @param splitAlgorithmName name of the split algorithm to use
 * @param topic name of the outstanding topic
 */
CompletableFuture<Void> splitNamespaceBundleAsync(
        String namespace, String bundle, boolean unloadSplitBundles,
        String splitAlgorithmName, String topic);
```

On the broker side, first encapsulate the parameters for bundle split
into a new class `BundleSplitOption`:

```java
public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private String topic;
}
```

Add a new split algorithm:

```java
public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
        implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        // ...
    }
}
```
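
The body of `getSplitBoundary` is not spelled out here. One plausible first step, sketched below under stated assumptions, is to pick out the partitions of the specified topic from the topics in the bundle and compute their hashcodes. The class and method names are hypothetical, the hash function is passed in as a stand-in for the real bundle hash, and partitions are assumed to be named `<topic>-partition-<index>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Pulsar: selects the partitions of one topic.
final class OutstandingPartitionFilterSketch {

    // Assumed naming convention for partitions of a partitioned topic.
    static final String PARTITION_SUFFIX = "-partition-";

    /**
     * Returns the sorted hashcodes of all partitions of `topic` found in
     * `topicsInBundle`. `hash` is a stand-in for the bundle hash function.
     */
    static List<Long> hashesOfTopic(List<String> topicsInBundle, String topic,
                                    ToLongFunction<String> hash) {
        String prefix = topic + PARTITION_SUFFIX;
        return topicsInBundle.stream()
                // Keep only names of the form <topic>-partition-<digits>.
                .filter(name -> name.startsWith(prefix)
                        && name.substring(prefix.length()).matches("\\d+"))
                .mapToLong(hash)
                .boxed()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "persistent://t/n/a-partition-0",
                "persistent://t/n/a-partition-1",
                "persistent://t/n/ab-partition-0", // different topic
                "persistent://t/n/b");             // non-partitioned topic
        // String.hashCode() is used here purely as a placeholder hash.
        System.out.println(hashesOfTopic(topics, "persistent://t/n/a", String::hashCode).size());
    }
}
```

The resulting hashcodes could then be fed to whichever boundary rule the algorithm uses.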

Add the new algorithm to `NamespaceBundleSplitAlgorithm`:

```java
String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";

List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
        TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);

NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
        new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
```

Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
[NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):


```java
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                 String topic) {

    final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
    final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
    splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);

    return unloadFuture;
}
```

```java
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                   boolean unload,
                                   AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   String topic) {
```

We also change the REST API and `broker.conf`:

```java
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("property") String property,
        @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("topic") @DefaultValue("") String topic) {}
```

```shell
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
```
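
To illustrate how such a comma-separated setting could gate the algorithm name passed with a split request, here is a minimal sketch. It is not the broker's actual validation code; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Pulsar: checks a requested algorithm name
// against a comma-separated list such as supportedNamespaceBundleSplitAlgorithms.
final class SplitAlgorithmConfigSketch {

    /** Splits the config value on commas, trimming blanks and dropping empty entries. */
    static List<String> parseSupported(String configValue) {
        return Arrays.stream(configValue.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    /** Returns true when the requested algorithm name appears in the config value. */
    static boolean isSupported(String configValue, String requested) {
        return parseSupported(configValue).contains(requested);
    }

    public static void main(String[] args) {
        String conf = "range_equally_divide,topic_count_equally_divide,"
                + "specified_topic_count_equally_divide";
        System.out.println(isSupported(conf, "specified_topic_count_equally_divide")); // true
        System.out.println(isSupported(conf, "unknown_algorithm"));                    // false
    }
}
```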
