> 2. calculate the position to split this bundle (also needs a new API)
Should this be "positions"? We are going to split one bundle into
multiple bundles, so in most cases the number of bundles will be the
number of positions + 1, right?
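To make the counting concrete, here is a minimal Java sketch that cuts a half-open hash range at a set of positions. The class name `SplitPositions` and the method `split` are hypothetical and not a proposed Pulsar API. Duplicate positions and positions that are not strictly inside the range are ignored, so k distinct interior positions always give k + 1 bundles.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

public class SplitPositions {

    /**
     * Splits the hash range [lower, upper) at the given positions and returns
     * the sub-ranges as {lower, upper} pairs. Positions outside the open
     * interval (lower, upper) are ignored and duplicates collapse, so k
     * distinct interior positions always yield k + 1 ranges.
     */
    static List<long[]> split(long lower, long upper, Collection<Long> positions) {
        TreeSet<Long> cuts = new TreeSet<>();
        for (long p : positions) {
            if (p > lower && p < upper) {
                cuts.add(p);
            }
        }
        List<long[]> ranges = new ArrayList<>();
        long start = lower;
        for (long cut : cuts) {
            ranges.add(new long[] {start, cut});
            start = cut;
        }
        ranges.add(new long[] {start, upper});
        return ranges;
    }

    public static void main(String[] args) {
        // Three positions inside 0x00_0x10 -> four bundles.
        for (long[] r : split(0x00, 0x10, List.of(0x04L, 0x08L, 0x0cL))) {
            System.out.printf("0x%02x_0x%02x%n", r[0], r[1]);
        }
    }
}
```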

> And the (anti-)affinity approach needs more discussion, or maybe we can introduce
> a new PIP for it.
+1, this is not in the scope of this PIP.


Thanks,
Haiting

On 2022/02/21 08:57:01 Aloys Zhang wrote:
> Hi Penghui and Haiting,
> 
> I'm trying to figure out how the (anti-)affinity works.
> 
> > If I understand correctly, if we have a partitioned topic with 10
> > partitions under a namespace with 16 bundles and apply the anti-affinity
> > policy, partition-0 maps to bundle 0, partition-1 maps to bundle 1, and so on.
> > Of course, not every partitioned topic has to start from bundle 0;
> > we can use the partition-0 hash to determine the start bundle index.
> 
> As Penghui described, I think this is a mechanism that controls how a topic
> is mapped to a bundle.
> 
> > IMO, this affinity serves the purpose of isolating an abnormal topic to
> > some spare brokers. These brokers host only this kind of topic. Here are
> > some cases:
> 
> And as Haiting mentioned here, it's more like an isolation policy that
> decides which brokers can own which topics.
> So I am a little confused about how the (anti-)affinity works now.
> 
> Back to this PIP, which aims to solve the problem of a small number of
> topics in a bundle having a load that exceeds the average.
> We can:
> 1. get the positions of the topics we are interested in (needs a new API)
> 2. calculate the position to split this bundle (also needs a new API)
> I think this is enough to solve the problem.
> 
> And the (anti-)affinity approach needs more discussion, or maybe we can introduce
> a new PIP for it.
> What do you think, @penghui @haiting?
> 
> 
> Thanks,
> Aloys
> 
> On Mon, Feb 21, 2022 at 14:39, Aloys Zhang <aloyszh...@apache.org> wrote:
> 
> > Hi Penghui,
> >
> > > The new API does not necessarily have to query topics one by one;
> > > could we list all the "topic -> position" pairs of a bundle?
> >
> > I see. Once we have all the positions of the topics we want to split in a
> > bundle, it's quite easy for us to decide how to split it.
> >
> > On Sun, Feb 20, 2022 at 12:05, Haiting Jiang <jianghait...@apache.org> wrote:
> >
> >> > Do you have an example for affinity? I don't fully understand how this
> >> > is used in practice.
> >>
> >> IMO, this affinity serves the purpose of isolating an abnormal topic to
> >> some spare brokers. These brokers host only this kind of topic. Here are
> >> some cases:
> >>
> >> 1. A topic may periodically have unexpected short traffic spikes, causing
> >> broker overload and a negative impact on other topics.
> >> Until we have a more proper solution, we can always isolate these topics
> >> first and keep the service recovery time as short as possible.
> >>
> >> 2. Some users may encounter bugs in brokers. We can isolate the topic to
> >> exclusive brokers and use more radical approaches to locate the bug, like
> >> enabling debug-level logs or even applying a temporary code patch.
> >>
> >> 3. Users may already have configured failure domains and anti-affinity
> >> namespaces, but as business logic changes, some topics may need to migrate
> >> from one namespace to another. It takes some time for users to change the
> >> client-side config. In the meantime, we can isolate the topic first.
> >>
> >> Thanks,
> >> Haiting
> >>
> >> On 2022/02/18 15:26:09 PengHui Li wrote:
> >> > Hi Haiting,
> >> >
> >> > > I think this approach has more potential for abnormal topic isolation.
> >> > > If we can introduce some kind of bundle isolation strategy (like a
> >> > > broker-bundle affinity and anti-affinity mechanism), we can easily
> >> > > isolate unexpected traffic to some empty brokers.
> >> > > IMO, this would improve the stability of the broker cluster.
> >> >
> >> > If I understand correctly, if we have a partitioned topic with 10
> >> > partitions under a namespace with 16 bundles and apply the anti-affinity
> >> > policy, partition-0 maps to bundle 0, partition-1 maps to bundle 1, and so on.
> >> > Of course, not every partitioned topic has to start from bundle 0;
> >> > we can use the partition-0 hash to determine the start bundle index.
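A minimal Java sketch of the placement described above, assuming equally sized bundles over a 32-bit hash space and using `java.util.zip.CRC32` as a stand-in topic hash. The hash the broker actually uses may differ, and `AntiAffinityMapping`, `bundleIndexOf`, and `assign` are hypothetical names for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

public class AntiAffinityMapping {

    /** Size of the hash space, assuming 32-bit positions in [0, 2^32). */
    static final long HASH_SPACE = 1L << 32;

    /** Stand-in hash for a topic name; the broker's real hash may differ. */
    static long hash(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Index of the equally sized bundle whose range contains the hash. */
    static int bundleIndexOf(long hash, int numBundles) {
        long width = HASH_SPACE / numBundles;
        // Clamp: when numBundles does not divide 2^32, the top hashes would overflow.
        return (int) Math.min(hash / width, numBundles - 1);
    }

    /**
     * Hypothetical anti-affinity placement: partition-0 lands in the bundle that
     * contains its own hash, and partition-i goes i bundles further on, wrapping
     * around. Distinct partitions get distinct bundles while partitions <= numBundles.
     */
    static int[] assign(String topic, int partitions, int numBundles) {
        int start = bundleIndexOf(hash(topic + "-partition-0"), numBundles);
        int[] bundles = new int[partitions];
        for (int i = 0; i < partitions; i++) {
            bundles[i] = (start + i) % numBundles;
        }
        return bundles;
    }

    public static void main(String[] args) {
        int[] bundles = assign("persistent://tenant/ns/my-topic", 10, 16);
        System.out.println(Arrays.toString(bundles));
    }
}
```

With 10 partitions and 16 bundles every partition lands in a different bundle; once the partition count exceeds the bundle count, the modulo wraps and bundles start to be shared.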
> >> >
> >> > Do you have an example for affinity? I don't fully understand how this
> >> > is used in practice.
> >> >
> >> > Best,
> >> > Penghui
> >> >
> >> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org> wrote:
> >> >
> >> > > Hi Aloys,
> >> > >
> >> > > > Do you mean that
> >> > > > 1. First, add a new API, maybe `getHashPosition`, to get the hash
> >> > > > position in a bundle
> >> > > > 2. Then use this position to split the overloaded bundle
> >> > > > If so, when we split a bundle with multiple partitions of a topic, we
> >> > > > need to call `getHashPosition` multiple times to get the middle
> >> > > > position of all these positions.
> >> > >
> >> > > Yes, this is what I mean. This way, users can control whether to assign
> >> > > 1 topic or 3 topics to one bundle. It's more about increasing the
> >> > > transparency of the topics in a bundle: you can see all the positions of
> >> > > the topics, so planning bundle splits becomes more flexible.
> >> > >
> >> > > The new API does not necessarily have to query topics one by one;
> >> > > could we list all the "topic -> position" pairs of a bundle?
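One way such a listing could work, sketched in Java under stated assumptions: compute each topic's hash position, keep those inside the bundle range (treated here as half-open `[lower, upper)`), and order them by position so cut points are easy to pick. CRC32 is only a stand-in hash, and `BundleTopicPositions` and `positionsInBundle` are hypothetical names, not an existing Pulsar admin API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

public class BundleTopicPositions {

    /** Stand-in hash position for a topic name; the broker's real hash may differ. */
    static long position(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /**
     * Returns "topic -> position" for every topic whose position falls in the
     * bundle range [lower, upper), ordered by ascending position.
     */
    static Map<String, Long> positionsInBundle(List<String> topics, long lower, long upper) {
        Map<String, Long> result = new LinkedHashMap<>();
        topics.stream()
                .filter(t -> position(t) >= lower && position(t) < upper)
                .sorted(Comparator.comparingLong(BundleTopicPositions::position))
                .forEach(t -> result.put(t, position(t)));
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://tenant/ns/t-partition-0",
                "persistent://tenant/ns/t-partition-1",
                "persistent://tenant/ns/other");
        // Full 32-bit range, so every topic is listed.
        positionsInBundle(topics, 0L, 1L << 32)
                .forEach((t, p) -> System.out.printf("%s -> 0x%08x%n", t, p));
    }
}
```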
> >> > >
> >> > > Thanks,
> >> > > Penghui
> >> > >
> >> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghait...@apache.org> wrote:
> >> > >
> >> > >> Hi Aloys,
> >> > >> +1 for this great PIP.
> >> > >>
> >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> >> > >> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
> >> > >>
> >> > >> Are there any limitations on this "topic" parameter? Can it be a
> >> > >> partitioned topic?
> >> > >> If so, will this new algorithm split the bundle into more than 2 bundles,
> >> > >> like one bundle for each partition?
> >> > >>
> >> > >> > This algorithm has a disadvantage: it can only deal
> >> > >> > with one `outstanding topic`.
> >> > >>
> >> > >> I think this disadvantage can be solved by extending the "topic"
> >> > >> parameter from one topic to a topic list.
> >> > >>
> >> > >> > The other algorithm splits the bundle at the hashcode point of each
> >> > >> > `outstanding partition`, splitting the bundle into three bundles at a
> >> > >> > time. The middle one contains only the hashcode point of the
> >> > >> > `outstanding partition`, the left one covers hashcodes less than that
> >> > >> > point, and the right one covers hashcodes greater than it.
> >> > >> > E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
> >> > >> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> >> > >> > algorithm splits the bundle into five new bundles:
> >> > >> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> >> > >> > partition-y), and 0x08_0x10.
> >> > >>
> >> > >> I think this approach has more potential for abnormal topic isolation.
> >> > >> If we can introduce some kind of bundle isolation strategy (like a
> >> > >> broker-bundle affinity and anti-affinity mechanism), we can easily
> >> > >> isolate unexpected traffic to some empty brokers.
> >> > >> IMO, this would improve the stability of the broker cluster.
> >> > >>
> >> > >> Thanks,
> >> > >> Haiting
> >> > >>
> >> > >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
> >> > >> > Hi Pulsar Community,
> >> > >> >
> >> > >> > This is a PIP discussion on how to support splitting partitions belonging
> >> > >> > to specified topics in a bundle.
> >> > >> >
> >> > >> > The issue can be found at: https://github.com/apache/pulsar/issues/13761
> >> > >> >
> >> > >> > I have copied the content here for convenience; any suggestions are
> >> > >> > welcome and appreciated.
> >> > >> >
> >> > >> >
> >> > >> > ## Motivation
> >> > >> >
> >> > >> > As we all know, a namespace bundle may contain lots of partitions
> >> > >> > belonging to different topics.
> >> > >> > The throughput of these topics may vary greatly. Some topics may have a
> >> > >> > very high rate/throughput while other topics have a very low
> >> > >> > rate/throughput.
> >> > >> >
> >> > >> > These high-rate/throughput partitions can cause broker overload and
> >> > >> > bundle unloading.
> >> > >> > At this point, if we split the bundle manually with the
> >> > >> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
> >> > >> > it may take many splits before these high-rate/throughput partitions
> >> > >> > are assigned to different new bundles.
> >> > >> >
> >> > >> > For convenience, in this PIP we call these high-throughput topics
> >> > >> > `outstanding topic` and their partitions `outstanding partition`.
> >> > >> >
> >> > >> > ## Goal
> >> > >> >
> >> > >> > Our goal is to make it easier to split `outstanding partition`s into new
> >> > >> > bundles.
> >> > >> >
> >> > >> > There are two alternative ways to achieve this. Both add a new
> >> > >> > algorithm for bundle splitting; they differ in how the new bundle split
> >> > >> > algorithm is implemented.
> >> > >> >
> >> > >> > One algorithm splits the bundle by `outstanding topic`: it splits the
> >> > >> > bundle into two new bundles at a time, each containing an equal number
> >> > >> > of `outstanding partition`s.
> >> > >> > E.g., a bundle contains lots of topic partitions, and only one
> >> > >> > `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n,
> >> > >> > T-partition-n+1). This algorithm splits the bundle at the middle point
> >> > >> > of these two partitions' hashcodes. This algorithm has a disadvantage:
> >> > >> > it can only deal with one `outstanding topic`.
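A minimal Java sketch of this first algorithm, generalized to any number of partitions of the `outstanding topic` and assuming half-open bundles `[lower, split)` and `[split, upper)`. `MiddlePointSplit` and `splitPosition` are hypothetical names, and the hashes in `main` are made-up values standing in for T-partition-n and T-partition-n+1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MiddlePointSplit {

    /**
     * Returns a split position between the two middle hashes of the outstanding
     * topic's partitions, so each new bundle gets half of them (the lower bundle
     * gets one fewer when the count is odd).
     */
    static long splitPosition(List<Long> partitionHashes) {
        if (partitionHashes.size() < 2) {
            throw new IllegalArgumentException("need at least two partition hashes");
        }
        List<Long> sorted = new ArrayList<>(partitionHashes);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        long left = sorted.get(mid - 1);
        long right = sorted.get(mid);
        if (left == right) {
            throw new IllegalArgumentException("middle partitions share a hash; no cut separates them");
        }
        // Round up so `left` stays in the lower bundle even when right == left + 1.
        return left + (right - left + 1) / 2;
    }

    public static void main(String[] args) {
        // Hypothetical hashes for T-partition-n and T-partition-n+1.
        long split = splitPosition(List.of(0x07L, 0x03L));
        System.out.printf("split at 0x%02x%n", split);
    }
}
```

Rounding the midpoint up keeps the lower-middle partition in the lower bundle even when the two middle hashes are adjacent; if they are equal, no single cut can separate them.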
> >> > >> >
> >> > >> > So we came up with another algorithm.
> >> > >> >
> >> > >> > The other algorithm splits the bundle at the hashcode point of each
> >> > >> > `outstanding partition`, splitting the bundle into three bundles at a
> >> > >> > time. The middle one contains only the hashcode point of the
> >> > >> > `outstanding partition`, the left one covers hashcodes less than that
> >> > >> > point, and the right one covers hashcodes greater than it.
> >> > >> > E.g., if a bundle 0x00_0x10 contains two `outstanding partition`s
> >> > >> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
> >> > >> > algorithm splits the bundle into five new bundles:
> >> > >> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
> >> > >> > partition-y), and 0x08_0x10.
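The boundaries for this second algorithm can be sketched in Java as below, assuming half-open bundle ranges; `IsolatePartitionsSplit`, `cutPositions`, and `bundles` are hypothetical names. For each `outstanding partition` hash h, it cuts at h and h + 1 and skips cuts that coincide with the bundle bounds, so `main` reproduces the five bundles from the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class IsolatePartitionsSplit {

    /**
     * Cut positions that give every outstanding hash h its own one-unit bundle
     * [h, h + 1). Cuts equal to the bundle bounds are dropped, and duplicates
     * (e.g. from adjacent hashes) collapse into one cut.
     */
    static List<Long> cutPositions(long lower, long upper, List<Long> hashes) {
        TreeSet<Long> cuts = new TreeSet<>();
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("hash outside bundle: " + h);
            }
            if (h > lower) {
                cuts.add(h);
            }
            if (h + 1 < upper) {
                cuts.add(h + 1);
            }
        }
        return new ArrayList<>(cuts);
    }

    /** Renders the bundles produced by the cuts, e.g. "0x00_0x03". */
    static List<String> bundles(long lower, long upper, List<Long> cuts) {
        List<String> out = new ArrayList<>();
        long start = lower;
        for (long cut : cuts) {
            out.add(String.format("0x%02x_0x%02x", start, cut));
            start = cut;
        }
        out.add(String.format("0x%02x_0x%02x", start, upper));
        return out;
    }

    public static void main(String[] args) {
        List<Long> cuts = cutPositions(0x00, 0x10, List.of(0x03L, 0x07L));
        System.out.println(bundles(0x00, 0x10, cuts));
    }
}
```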
> >> > >> >
> >> > >> > ## API Changes
> >> > >> >
> >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
> >> > >> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
> >> > >> >
> >> > >> > The split interface changes from
> >> > >> >
> >> > >> > ```java
> >> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> > >> >                           String splitAlgorithmName) throws PulsarAdminException;
> >> > >> > ```
> >> > >> >
> >> > >> > to
> >> > >> >
> >> > >> > ```java
> >> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> > >> >                           String splitAlgorithmName, String topic)
> >> > >> >         throws PulsarAdminException;
> >> > >> > ```
> >> > >> >
> >> > >> > ## Implementation
> >> > >> >
> >> > >> > There are changes on both the Admin CLI side and the broker side.
> >> > >> >
> >> > >> > First, the Admin CLI for bundle splitting should support specifying the
> >> > >> > `outstanding topic`:
> >> > >> >
> >> > >> > ```java
> >> > >> > /**
> >> > >> >  * Split namespace bundle.
> >> > >> >  *
> >> > >> >  * @param namespace namespace name
> >> > >> >  * @param bundle range of bundle to split
> >> > >> >  * @param unloadSplitBundles whether to unload the newly created bundles
> >> > >> >  * @param splitAlgorithmName name of the split algorithm
> >> > >> >  * @param topic name of the outstanding topic
> >> > >> >  * @throws PulsarAdminException
> >> > >> >  */
> >> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> > >> >                           String splitAlgorithmName, String topic)
> >> > >> >         throws PulsarAdminException;
> >> > >> > ```
> >> > >> >
> >> > >> > ```java
> >> > >> > /**
> >> > >> >  * Split namespace bundle asynchronously.
> >> > >> >  *
> >> > >> >  * @param namespace namespace name
> >> > >> >  * @param bundle range of bundle to split
> >> > >> >  * @param unloadSplitBundles whether to unload the newly created bundles
> >> > >> >  * @param splitAlgorithmName name of the split algorithm
> >> > >> >  * @param topic name of the outstanding topic
> >> > >> >  */
> >> > >> > CompletableFuture<Void> splitNamespaceBundleAsync(
> >> > >> >         String namespace, String bundle, boolean unloadSplitBundles,
> >> > >> >         String splitAlgorithmName, String topic);
> >> > >> > ```
> >> > >> >
> >> > >> > On the broker side, we first encapsulate the bundle split parameters in
> >> > >> > a new class, `BundleSplitOption`:
> >> > >> >
> >> > >> > ```java
> >> > >> > public class BundleSplitOption {
> >> > >> >     private NamespaceService service;
> >> > >> >     private NamespaceBundle bundle;
> >> > >> >     private String topic;
> >> > >> > }
> >> > >> > ```
> >> > >> >
> >> > >> > Add a new split algorithm:
> >> > >> >
> >> > >> > ```java
> >> > >> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
> >> > >> >         implements NamespaceBundleSplitAlgorithm {
> >> > >> >     @Override
> >> > >> >     public CompletableFuture<List<Long>> getSplitBoundary(
> >> > >> >             BundleSplitOption bundleSplitOption) {
> >> > >> >         // ... compute the split boundary from the hash positions of the
> >> > >> >         // specified topic's partitions (body not shown in this proposal)
> >> > >> >     }
> >> > >> > }
> >> > >> > ```
> >> > >> >
> >> > >> > Add the new algorithm to `NamespaceBundleSplitAlgorithm`:
> >> > >> >
> >> > >> > ```java
> >> > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
> >> > >> >
> >> > >> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >> > >> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >> > >> >
> >> > >> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> >> > >> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> >> > >> > ```
> >> > >> >
> >> > >> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in
> >> > >> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
> >> > >> >
> >> > >> >
> >> > >> > ```java
> >> > >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
> >> > >> >                                                  NamespaceBundleSplitAlgorithm splitAlgorithm,
> >> > >> >                                                  String topic) {
> >> > >> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
> >> > >> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >> > >> >     // Pass the outstanding topic down to the retrying split.
> >> > >> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
> >> > >> >
> >> > >> >     return unloadFuture;
> >> > >> > }
> >> > >> > ```
> >> > >> >
> >> > >> > ```java
> >> > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >> > >> >                                    boolean unload,
> >> > >> >                                    AtomicInteger counter,
> >> > >> >                                    CompletableFuture<Void> completionFuture,
> >> > >> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
> >> > >> >                                    String topic) {
> >> > >> >     // ...
> >> > >> > }
> >> > >> > ```
> >> > >> >
> >> > >> > We also change the REST API and `broker.conf`:
> >> > >> >
> >> > >> > ```java
> >> > >> > public void splitNamespaceBundle(
> >> > >> >         @Suspended final AsyncResponse asyncResponse,
> >> > >> >         @PathParam("property") String property,
> >> > >> >         @PathParam("cluster") String cluster,
> >> > >> >         @PathParam("namespace") String namespace,
> >> > >> >         @PathParam("bundle") String bundleRange,
> >> > >> >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
> >> > >> >         @QueryParam("unload") @DefaultValue("false") boolean unload,
> >> > >> >         @QueryParam("topic") @DefaultValue("") String topic) {}
> >> > >> > ```
> >> > >> >
> >> > >> > ```shell
> >> > >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> >> > >> > ```
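The PIP doesn't spell out how the broker checks a split request, so the following Java sketch rests on assumptions: the requested algorithm must appear in `supportedNamespaceBundleSplitAlgorithms`, and the new algorithm rejects the REST default of an empty `topic`. `SplitAlgorithmCheck`, `parseSupported`, and `validate` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitAlgorithmCheck {

    /** Parses a comma-separated broker.conf value into a set of algorithm names. */
    static Set<String> parseSupported(String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    /**
     * Hypothetical request check: any algorithm must be listed in the broker
     * config, and the topic-based algorithm also needs a non-empty topic.
     */
    static void validate(Set<String> supported, String algorithm, String topic) {
        if (!supported.contains(algorithm)) {
            throw new IllegalArgumentException("Unsupported split algorithm: " + algorithm);
        }
        if ("specified_topic_count_equally_divide".equals(algorithm)
                && (topic == null || topic.isEmpty())) {
            throw new IllegalArgumentException("A topic is required for " + algorithm);
        }
    }

    public static void main(String[] args) {
        Set<String> supported = parseSupported(
                "range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide");
        validate(supported, "specified_topic_count_equally_divide", "persistent://tenant/ns/my-topic");
        System.out.println("request accepted");
    }
}
```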
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >
> 
