Hi Penghui,

> I am considering if we can add a boundary param for split bundle API,
> The boundary must be between the start and the end of the bundle.
> looks like the followings:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, long boundary)
> throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.
>

Do you mean that:
1. First, add a new API, maybe `getHashPosition`, to get the hash
position of a topic in a bundle.
2. Then use this position to split the overloaded bundle.
If so, when we split a bundle that contains multiple partitions of a topic,
we need to call `getHashPosition` multiple times and then take the middle of
all these positions.
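
To make the "middle position" step concrete, here is a rough sketch of how a
caller could pick the boundary once it has collected the hash positions. The
class and method names are hypothetical and not part of the Pulsar API. It
assumes the positions are already known as `long` values and that a boundary
`b` splits the bundle into `[lower, b)` and `[b, upper)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SplitBoundarySketch {

    /**
     * Hypothetical helper (not a Pulsar API): picks a boundary that puts
     * half of the given hash positions on each side of the split.
     * Assumes half-open ranges: [lower, boundary) and [boundary, upper).
     */
    static long middleBoundary(long lower, long upper, List<Long> positions) {
        if (positions.size() < 2) {
            throw new IllegalArgumentException("need at least two positions");
        }
        List<Long> sorted = new ArrayList<>(positions);
        Collections.sort(sorted);

        // The two positions around the median; the boundary goes between them.
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        if (left == right) {
            throw new IllegalArgumentException("positions collide, cannot separate them");
        }

        // Rounds up so that left < boundary <= right always holds.
        long boundary = left + (right - left + 1) / 2;

        // The boundary must be strictly between the start and the end of the bundle.
        if (boundary <= lower || boundary >= upper) {
            throw new IllegalArgumentException("boundary is outside the bundle range");
        }
        return boundary;
    }

    public static void main(String[] args) {
        // Bundle 0x00_0x10 with two partitions hashed to 0x03 and 0x07.
        long boundary = middleBoundary(0x00L, 0x10L, List.of(0x03L, 0x07L));
        System.out.println("boundary = 0x" + Long.toHexString(boundary)); // boundary = 0x5
    }
}
```

With more than two partitions this picks the gap around the median, so each
new bundle gets roughly half of them.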

> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of other characteristics of a topic to choose a different bundle.
>
It makes sense to me; it sounds like a re-hash. It seems like a new way to
assign topics to a bundle rather than a bundle split algorithm. I think we
can raise another feature or PIP for this idea.



PengHui Li <peng...@apache.org> wrote on Fri, Feb 18, 2022 at 08:54:

> Hi Aloys,
>
> Thanks for the great proposal.
>
> I am considering if we can add a boundary param for split bundle API,
> The boundary must be between the start and the end of the bundle.
> looks like the followings:
>
> ```java
> void splitNamespaceBundle(String namespace, String bundle, boolean
> unloadSplitBundles,
>                               String splitAlgorithmName, long boundary)
> throws PulsarAdminException;
> ```
>
> And, provide a new API to get the position in the bundle for a topic which
> can be used to determine the split boundary of the bundle when splitting
> a bundle.
>
> Looks like currently we only have a bundle assign allocation strategy
> based on the topic name hash, maybe we can also consider taking advantage
> of
> other characteristics of a topic to choose a different bundle. Just a rough
> idea.
> This may be beyond the scope of this proposal.
>
> Thanks,
> Penghui
>
> On Thu, Feb 17, 2022 at 11:47 PM Aloys Zhang <aloyszh...@apache.org>
> wrote:
>
> > Hi Pulsar Community,
> >
> > This is a PIP discussion on how to support splitting partitions that
> > belong to specified topics in a bundle.
> >
> > The issue can be found at: https://github.com/apache/pulsar/issues/13761
> >
> > I copy the content here for convenience, any suggestions are welcome and
> > appreciated.
> >
> >
> > ## Motivation
> >
> > As we all know, a namespace bundle may contain many partitions belonging
> > to different topics.
> > The throughput of these topics may vary greatly. Some topics may have a
> > very high rate/throughput while others have a very low rate/throughput.
> >
> > Partitions with high rate/throughput can cause broker overload and
> > bundle unloading.
> > At this point, if we split the bundle manually with the
> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
> > it may take many splits before these high rate/throughput partitions are
> > assigned to different new bundles.
> >
> > For convenience, we call these high throughput topics `outstanding topic`
> > and their partitions `outstanding partition` in this PIP.
> >
> > ## Goal
> >
> > Our goal is to make it easier to split `outstanding partition` into new
> > bundles.
> >
> > There are two alternative ways to achieve this. Either of them will add a
> > new algorithm for bundle split. The difference is how the new bundle split
> > algorithm is implemented.
> >
> > One algorithm is to split the bundle by `outstanding topic`: each split
> > divides the bundle into two new bundles, and each new bundle contains an
> > equal number of `outstanding partition`s.
> > E.g., a bundle contains lots of topic partitions and only one
> > `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n,
> > T-partition-n+1).
> > This algorithm splits the bundle at the midpoint of these two partitions'
> > hashcodes. The disadvantage of this algorithm is that it can only deal
> > with one `outstanding topic`.
> >
> > So we propose another algorithm.
> >
> > The other algorithm is to split the bundle at the hashcode point of the
> > `outstanding partition`, which splits the bundle into three bundles at a
> > time. The middle one contains only the hashcode point of the
> > `outstanding partition`, the left one covers the range below the hashcode,
> > and the right one covers the range above the hashcode.
> > E.g., if we have a bundle 0x00_0x10 that contains two
> > `outstanding partition`s (partition-x and partition-y) whose hashcodes are
> > 0x03 and 0x07, this algorithm will split the bundle into five new bundles:
> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> > partition-y), 0x08_0x10.
> >
> > ## API Changes
> >
> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> > will add a new parameter `--topic` or `-t` for the `outstanding topic` name.
> >
> > The split interface changes from
> >
> > ```java
> > void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException;
> > ```
> >
> > to
> >
> > ```java
> > void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles,
> >                               String splitAlgorithmName, String topic)
> > throws PulsarAdminException;
> > ```
> >
> > ## Implementation
> >
> > There are changes on both the Admin CLI and the broker side.
> >
> > First, the Admin CLI for split bundle should support specifying the
> > `outstanding topic`:
> >
> > ```java
> > /**
> >      * Split namespace bundle.
> >      *
> >      * @param namespace
> >      * @param bundle range of bundle to split
> >      * @param unloadSplitBundles
> >      * @param splitAlgorithmName
> >      * @param topic
> >      * @throws PulsarAdminException
> >      */
> >     void splitNamespaceBundle(String namespace, String bundle, boolean
> > unloadSplitBundles,
> >                               String splitAlgorithmName, String topic)
> > throws PulsarAdminException;
> >
> > ```
> >
> > ```java
> > /**
> >      * Split namespace bundle asynchronously.
> >      *
> >      * @param namespace
> >      * @param bundle range of bundle to split
> >      * @param unloadSplitBundles
> >      * @param splitAlgorithmName
> >      * @param topic
> >      */
> >     CompletableFuture<Void> splitNamespaceBundleAsync(
> >             String namespace, String bundle, boolean unloadSplitBundles,
> > String splitAlgorithmName, String topic);
> > ```
> >
> > On the broker side, first encapsulate the parameters for bundle split
> > into a new class `BundleSplitOption`:
> >
> > ```java
> > public class BundleSplitOption {
> >     private NamespaceService service;
> >     private NamespaceBundle bundle;
> >     private String topic;
> > }
> > ```
> >
> > add a new split algorithm
> >
> > ```java
> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> >         implements NamespaceBundleSplitAlgorithm {
> >     @Override
> >     public CompletableFuture<List<Long>> getSplitBoundary(
> >             BundleSplitOption bundleSplitOption) {
> >         // ... (method body omitted in the original proposal)
> >     }
> > }
> > ```
> >
> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
> >
> > ```JAVA
> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
> >
> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >
> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> > ```
> >
> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
> >
> >
> > ```java
> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
> >         NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
> >
> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
> >
> >     return unloadFuture;
> > }
> > ```
> >
> > ```java
> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >                                    boolean unload,
> >                                    AtomicInteger counter,
> >                                    CompletableFuture<Void> completionFuture,
> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
> >                                    String topic) {
> > ```
> >
> > Also, we change the REST API and `broker.conf`:
> >
> > ```java
> > public void splitNamespaceBundle(
> >             @Suspended final AsyncResponse asyncResponse,
> >             @PathParam("property") String property,
> >             @PathParam("cluster") String cluster,
> >             @PathParam("namespace") String namespace,
> >             @PathParam("bundle") String bundleRange,
> >             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
> >             @QueryParam("topic") @DefaultValue("") String topic) {}
> > ```
> >
> > ```shell
> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> > ```
> >
>
