Hi all,
Since there have been no further suggestions for more than 72 hours, I have
updated the PIP https://github.com/apache/pulsar/issues/13761 according to the
discussion.
I will send a vote thread soon.
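
For reference, the split-by-positions idea from the discussion can be sketched
as below. This is only an illustrative sketch, not code from the PIP: the class
`BundleSplitSketch` and its methods are hypothetical names, and bundle ranges
are assumed to be half-open, [lower, upper). It shows why N split positions
inside a bundle produce N + 1 new bundles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Pulsar.
class BundleSplitSketch {

    /** Formats a range in the 0x00_0x10 style used in this thread. */
    static String format(long lower, long upper) {
        return String.format("0x%02x_0x%02x", lower, upper);
    }

    /**
     * Cuts the bundle [lower, upper) at every position strictly inside it.
     * Positions are sorted and de-duplicated first; N distinct in-range
     * positions yield N + 1 bundles.
     */
    static List<String> splitAt(long lower, long upper, List<Long> positions) {
        TreeSet<Long> cuts = new TreeSet<>();
        for (long p : positions) {
            if (p > lower && p < upper) {
                cuts.add(p);
            }
        }
        List<String> bundles = new ArrayList<>();
        long start = lower;
        for (long cut : cuts) {
            bundles.add(format(start, cut));
            start = cut;
        }
        bundles.add(format(start, upper));
        return bundles;
    }

    public static void main(String[] args) {
        // Four positions inside 0x00_0x10 give five bundles:
        // [0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10]
        System.out.println(splitAt(0x00, 0x10, List.of(0x03L, 0x04L, 0x07L, 0x08L)));
    }
}
```

With positions 0x03, 0x04, 0x07 and 0x08 this reproduces the five-bundle
example from the original proposal.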

On Tue, Feb 22, 2022 at 10:38, Aloys Zhang <aloyszh...@apache.org> wrote:

> >Should this be "positions"? We are going to split one bundle into
> multi-bundles,
> in most cases, bundle number will be position number + 1, right?
>
> Sure, it should be “positions” or “positionList”
>
> On Mon, Feb 21, 2022 at 20:50, PengHui Li <peng...@apache.org> wrote:
>
>> > And the (anti-)affinity way needs more discussion or maybe we can
>> introduce
>> a new PIP for it.
>>
>> +1
>>
>> Thanks,
>> Penghui
>>
>> On Mon, Feb 21, 2022 at 6:47 PM Haiting Jiang <jianghait...@apache.org>
>> wrote:
>>
>> > > 2. calculate the position to split this bundle(also need a new API)
>> > Should this be "positions"? We are going to split one bundle into
>> > multi-bundles,
>> > in most cases, bundle number will be position number + 1, right?
>> >
>> > > And the (anti-)affinity way needs more discussion or maybe we can
>> > introduce
>> > > a new PIP for it.
>> > +1, this is not in the scope of this PIP.
>> >
>> >
>> > Thanks,
>> > Haiting
>> >
>> > On 2022/02/21 08:57:01 Aloys Zhang wrote:
>> > > Hi penghui and haiting,
>> > >
>> > > I have been trying to figure out how the (anti-)affinity works.
>> > >
>> > > > if I understand correctly, it looks like if we have a partitioned
>> topic
>> > > with 10
>> > > > partitions under a namespace with 16 bundles, if applies the
>> > > anti-affinity policy,
>> > > > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on.
>> > > > Of course, it is not necessary for every partitioned topic to start
>> > from
>> > > bundle 0,
>> > > > we can use the partition-0 hash to determine the start bundle index.
>> > >
>> > > As Penghui described, I think this is a mechanism for assigning topics to
>> > > bundles: it controls how a topic is mapped to a bundle.
>> > >
>> > > > IMO, this affinity serves the purpose of isolating an abnormal
>> topic to
>> > > some spare
>> > > > brokers.  These brokers host these kind of topics only. Here are
>> some
>> > > cases :
>> > >
>> > > And as Haiting mentioned here, it is more like an isolation policy that
>> > > decides which broker can own which topics.
>> > > So I am still a little confused about how the (anti-)affinity works.
>> > >
>> > > Back to this PIP, which aims to solve the problem that a small number of
>> > > topics in a bundle have a load that exceeds the average.
>> > > So, we can
>> > > 1. get the positions of the topics we are interested in (needs a new API)
>> > > 2. calculate the position to split this bundle (also needs a new API)
>> > > I think this is enough to solve the problem.
>> > >
>> > > And the (anti-)affinity way needs more discussion or maybe we can
>> > introduce
>> > > a new PIP for it.
>> > > What do you think? @penghui @haiting
>> > >
>> > >
>> > > Thanks,
>> > > Aloys
>> > >
>> > > On Mon, Feb 21, 2022 at 14:39, Aloys Zhang <aloyszh...@apache.org> wrote:
>> > >
>> > > > Hi, penghui
>> > > >
>> > > > > The new API does not necessarily have to query topic by topic;
>> > > > > could we list all the "topic -> position" mappings of a bundle?
>> > > >
>> > > > I see. Once we have all the positions of the topics we want to split in a
>> > > > bundle, it is quite easy for us to decide how to split it.
>> > > >
>> > > > On Sun, Feb 20, 2022 at 12:05, Haiting Jiang <jianghait...@apache.org> wrote:
>> > > >
>> > > >> > Do you have an example for affinity? I don't fully understand how
>> > this
>> > > >> is
>> > > >> > used
>> > > >> > in practice.
>> > > >>
>> > > >> IMO, this affinity serves the purpose of isolating an abnormal topic to
>> > > >> some spare brokers. These brokers host only this kind of topic. Here are
>> > > >> some cases:
>> > > >>
>> > > >> 1. A topic may periodically have unexpected short traffic spikes that
>> > > >> overload the broker and negatively impact other topics.
>> > > >> Until we have a more proper solution, we can always isolate these topics
>> > > >> first and keep the service recovery time as short as possible.
>> > > >>
>> > > >> 2. Some users may encounter bugs in brokers. We can isolate the topic to
>> > > >> exclusive brokers and use a more radical approach to locate the bug, like
>> > > >> enabling debug-level logs or even adding a temporary code patch.
>> > > >>
>> > > >> 3. Users may already have configured failure domains and anti-affinity
>> > > >> namespaces, but with business logic code changes, some topics may need to
>> > > >> migrate from one namespace to another. It will take some time for users
>> > > >> to change the client-side config.
>> > > >> In the meantime, we can isolate the topic first.
>> > > >>
>> > > >> Thanks,
>> > > >> Haiting
>> > > >>
>> > > >> On 2022/02/18 15:26:09 PengHui Li wrote:
>> > > >> > Hi Haiting,
>> > > >> >
>> > > >> > > I think this approach have more potential with abnormal topic
>> > > >> isolation.
>> > > >> > If we can introduce
>> > > >> > some kind of bundle isolation strategy, (like broker-bundle
>> > affinity and
>> > > >> > anti-affinity mechanism), we can easily isolate some unexpected
>> > traffic
>> > > >> to
>> > > >> > some empty brokers.
>> > > >> > IMO, this would improve the stability of broker cluster.
>> > > >> >
>> > > >> > If I understand correctly, if we have a partitioned topic with 10
>> > > >> > partitions under a namespace with 16 bundles and apply the anti-affinity
>> > > >> > policy, partition-0 maps to bundle 0, partition-1 maps to bundle 1, and
>> > > >> > so on.
>> > > >> > Of course, it is not necessary for every partitioned topic to start from
>> > > >> > bundle 0; we can use the partition-0 hash to determine the start bundle
>> > > >> > index.
>> > > >> >
>> > > >> > Do you have an example for affinity? I don't fully understand how this is
>> > > >> > used in practice.
>> > > >> >
>> > > >> > Best,
>> > > >> > Penghui
>> > > >> >
>> > > >> > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org>
>> > wrote:
>> > > >> >
>> > > >> > > Hi Aloys,
>> > > >> > >
>> > > >> > > >  Do you mean that
>> > > >> > > 1. First, add a new API, maybe `getHashPositioin`,  to get the
>> > hash
>> > > >> > > position in a bundle
>> > > >> > > 2. Then use this position to split the overloaded bundle
>> > > >> > > If so, when we split a bundle with multi partitions of a
>> topic, we
>> > > >> need to
>> > > >> > > call the `getHashPositioin` multi times to get the middle
>> > position of
>> > > >> all
>> > > >> > > these positions.
>> > > >> > >
>> > > >> > > Yes, this is what I mean. In this way, users can control whether to
>> > > >> > > assign 1 topic or 3 topics to one bundle. This is more like increasing
>> > > >> > > the transparency of the topics in the bundle: you can see all the
>> > > >> > > positions of the topics, so planning for bundle splitting becomes more
>> > > >> > > flexible.
>> > > >> > >
>> > > >> > > The new API does not necessarily have to query topic by topic;
>> > > >> > > could we list all the "topic -> position" mappings of a bundle?
>> > > >> > >
>> > > >> > > Thanks,
>> > > >> > > Penghui
>> > > >> > >
>> > > >> > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <
>> > > >> jianghait...@apache.org>
>> > > >> > > wrote:
>> > > >> > >
>> > > >> > >> Hi Aloys,
>> > > >> > >> +1 for this great PIP.
>> > > >> > >>
>> > > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b
>> > > >> > >> ${bundle_range}`
>> > > >> > >> > will add a new parameter "--topic" or "-t" for  `outstanding
>> > topic`
>> > > >> > >> name.
>> > > >> > >>
>> > > >> > >> Do we have any limitation on this "topic" parameter? Can it be a
>> > > >> > >> partitioned topic?
>> > > >> > >> If so, will this new algorithm split the bundle into more than 2 bundles,
>> > > >> > >> e.g. one bundle for each partition?
>> > > >> > >>
>> > > >> > >> > This algorithm has a disadvantage, it can only deal
>> > > >> > >> > with one `outstanding topic`.
>> > > >> > >>
>> > > >> > >> I think this disadvantage can be solved by extending the "topic"
>> > > >> > >> parameter from one topic to a topic list.
>> > > >> > >>
>> > > >> > >> > The other algorithm is to split the bundle at the hashcode
>> > point
>> > > >> of the
>> > > >> > >> > `outstanding partition` which will split the bundle into
>> three
>> > > >> bundles
>> > > >> > >> once
>> > > >> > >> > a time. The middle one contains the only point the hashcode
>> of
>> > the
>> > > >> > >> > `outstanding partition, the left one is less than the
>> > hashcode, the
>> > > >> > >> right
>> > > >> > >> > one is more than the hashcode.
>> > > >> > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding
>> > > >> partition`
>> > > >> > >> > (partition-x and partition-y) whose hashcode is 0x03 and
>> 0x07,
>> > this
>> > > >> > >> > algorithm  is going to split bundle the bundle into five new
>> > > >> bundles,
>> > > >> > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08(
>> > for
>> > > >> > >> > partition-y), 0x08_0x10.
>> > > >> > >>
>> > > >> > >> I think this approach has more potential for abnormal topic isolation.
>> > > >> > >> If we can introduce some kind of bundle isolation strategy (like a
>> > > >> > >> broker-bundle affinity and anti-affinity mechanism), we can easily
>> > > >> > >> isolate some unexpected traffic to some empty brokers.
>> > > >> > >> IMO, this would improve the stability of the broker cluster.
>> > > >> > >>
>> > > >> > >> Thanks,
>> > > >> > >> Haiting
>> > > >> > >>
>> > > >> > >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
>> > > >> > >> > Hi Pulsar Community,
>> > > >> > >> >
>> > > >> > >> > This is a PIP discussion on how to support splitting partitions belonging
>> > > >> > >> > to specified topics in a bundle.
>> > > >> > >> >
>> > > >> > >> > The issue can be found at: https://github.com/apache/pulsar/issues/13761
>> > > >> > >> >
>> > > >> > >> > I have copied the content here for convenience; any suggestions are
>> > > >> > >> > welcome and appreciated.
>> > > >> > >> >
>> > > >> > >> >
>> > > >> > >> > ## Motivation
>> > > >> > >> >
>> > > >> > >> > As we all know, a namespace bundle may contain lots of partitions
>> > > >> > >> > belonging to different topics.
>> > > >> > >> > The throughput of these topics may vary greatly. Some topics may have a
>> > > >> > >> > very high rate/throughput while other topics have a very low
>> > > >> > >> > rate/throughput.
>> > > >> > >> >
>> > > >> > >> > These partitions with high rate/throughput can cause broker overload and
>> > > >> > >> > bundle unloading.
>> > > >> > >> > At this point, if we split the bundle manually with the
>> > > >> > >> > `range_equally_divide` or `topic_count_equally_divide` split algorithm,
>> > > >> > >> > many splits may be needed before these high rate/throughput partitions
>> > > >> > >> > are assigned to different new bundles.
>> > > >> > >> >
>> > > >> > >> > For convenience, we call these high-throughput topics `outstanding topic`
>> > > >> > >> > and their partitions `outstanding partition` in this PIP.
>> > > >> > >> >
>> > > >> > >> > ## Goal
>> > > >> > >> >
>> > > >> > >> > Our goal is to make it easier to split `outstanding partition` into new
>> > > >> > >> > bundles.
>> > > >> > >> >
>> > > >> > >> > There are two alternative ways to achieve this. Either of them will add a
>> > > >> > >> > new algorithm for bundle split. The difference is how the new bundle
>> > > >> > >> > split algorithm is implemented.
>> > > >> > >> >
>> > > >> > >> > One algorithm is to split the bundle by `outstanding topic`, which splits
>> > > >> > >> > the bundle into two new bundles at a time, each containing an equal
>> > > >> > >> > number of `outstanding partition`.
>> > > >> > >> > E.g., a bundle contains lots of topic partitions and only one
>> > > >> > >> > `outstanding topic` (T) with 2 `outstanding partition` (T-partition-n,
>> > > >> > >> > T-partition-n+1).
>> > > >> > >> > This algorithm splits the bundle at the middle point of these two
>> > > >> > >> > partitions' hashcodes. This algorithm has a disadvantage: it can only
>> > > >> > >> > deal with one `outstanding topic`.
>> > > >> > >> >
>> > > >> > >> > So we propose another algorithm.
>> > > >> > >> >
>> > > >> > >> > The other algorithm is to split the bundle at the hashcode point of the
>> > > >> > >> > `outstanding partition`, which splits the bundle into three bundles at a
>> > > >> > >> > time. The middle one contains only the hashcode point of the
>> > > >> > >> > `outstanding partition`, the left one covers hashcodes below it, and the
>> > > >> > >> > right one covers hashcodes above it.
>> > > >> > >> > E.g., if a bundle 0x00_0x10 contains two `outstanding partition`
>> > > >> > >> > (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this
>> > > >> > >> > algorithm splits the bundle into five new bundles:
>> > > >> > >> > 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for
>> > > >> > >> > partition-y), 0x08_0x10.
>> > > >> > >> >
>> > > >> > >> > ## API Changes
>> > > >> > >> >
>> > > >> > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}`
>> > > >> > >> > will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
>> > > >> > >> >
>> > > >> > >> > The split interface changes from
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> > > >> > >> >                           String splitAlgorithmName) throws PulsarAdminException;
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > to
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> > > >> > >> >                           String splitAlgorithmName, String topic)
>> > > >> > >> >         throws PulsarAdminException;
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > ## Implementation
>> > > >> > >> >
>> > > >> > >> > There are changes on both the Admin CLI and the broker side.
>> > > >> > >> >
>> > > >> > >> > First, the Admin CLI for split bundle should support specifying the
>> > > >> > >> > `outstanding topic`:
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> >     /**
>> > > >> > >> >      * Split namespace bundle.
>> > > >> > >> >      *
>> > > >> > >> >      * @param namespace
>> > > >> > >> >      * @param bundle range of bundle to split
>> > > >> > >> >      * @param unloadSplitBundles
>> > > >> > >> >      * @param splitAlgorithmName
>> > > >> > >> >      * @param topic
>> > > >> > >> >      * @throws PulsarAdminException
>> > > >> > >> >      */
>> > > >> > >> >     void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
>> > > >> > >> >                               String splitAlgorithmName, String topic)
>> > > >> > >> >             throws PulsarAdminException;
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> >     /**
>> > > >> > >> >      * Split namespace bundle asynchronously.
>> > > >> > >> >      *
>> > > >> > >> >      * @param namespace
>> > > >> > >> >      * @param bundle range of bundle to split
>> > > >> > >> >      * @param unloadSplitBundles
>> > > >> > >> >      * @param splitAlgorithmName
>> > > >> > >> >      * @param topic
>> > > >> > >> >      */
>> > > >> > >> >     CompletableFuture<Void> splitNamespaceBundleAsync(
>> > > >> > >> >             String namespace, String bundle, boolean unloadSplitBundles,
>> > > >> > >> >             String splitAlgorithmName, String topic);
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > On the broker side, first encapsulate the parameters for bundle split
>> > > >> > >> > into a new class `BundleSplitOption`:
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > public class BundleSplitOption {
>> > > >> > >> >     private NamespaceService service;
>> > > >> > >> >     private NamespaceBundle bundle;
>> > > >> > >> >     private String topic;
>> > > >> > >> > }
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > add a new split algorithm
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm
>> > > >> > >> >         implements NamespaceBundleSplitAlgorithm {
>> > > >> > >> >     @Override
>> > > >> > >> >     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
>> > > >> > >> >         // ...
>> > > >> > >> >     }
>> > > >> > >> > }
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > add the new algorithm to `NamespaceBundleSplitAlgorithm`
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
>> > > >> > >> >
>> > > >> > >> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
>> > > >> > >> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
>> > > >> > >> >
>> > > >> > >> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
>> > > >> > >> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` for
>> > > >> > >> > [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)
>> > > >> > >> >
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
>> > > >> > >> >                                                  NamespaceBundleSplitAlgorithm splitAlgorithm,
>> > > >> > >> >                                                  String topic) {
>> > > >> > >> >
>> > > >> > >> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
>> > > >> > >> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
>> > > >> > >> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
>> > > >> > >> >
>> > > >> > >> >     return unloadFuture;
>> > > >> > >> > }
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
>> > > >> > >> >                                    boolean unload,
>> > > >> > >> >                                    AtomicInteger counter,
>> > > >> > >> >                                    CompletableFuture<Void> completionFuture,
>> > > >> > >> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
>> > > >> > >> >                                    String topic) {
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > Also, we change the REST API and broker.conf:
>> > > >> > >> >
>> > > >> > >> > ```java
>> > > >> > >> > public void splitNamespaceBundle(
>> > > >> > >> >             @Suspended final AsyncResponse asyncResponse,
>> > > >> > >> >             @PathParam("property") String property,
>> > > >> > >> >             @PathParam("cluster") String cluster,
>> > > >> > >> >             @PathParam("namespace") String namespace,
>> > > >> > >> >             @PathParam("bundle") String bundleRange,
>> > > >> > >> >             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
>> > > >> > >> >             @QueryParam("unload") @DefaultValue("false") boolean unload,
>> > > >> > >> >             @QueryParam("topic") @DefaultValue("") String topic) {}
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >> > ```shell
>> > > >> > >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
>> > > >> > >> > ```
>> > > >> > >> >
>> > > >> > >>
>> > > >> > >
>> > > >> >
>> > > >>
>> > > >
>> > >
>> >
>>
>
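
As a rough illustration of the two algorithms described in the original
proposal quoted above, the sketch below computes split positions from the
hashcodes of the outstanding partitions. The class and method names are
hypothetical (they are not Pulsar APIs), the hashcodes are taken as given
rather than computed from topic names, and bundle ranges are assumed to be
half-open, [lower, upper).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Pulsar.
class OutstandingPartitionPositions {

    /**
     * First algorithm: one split position between two partition hashcodes.
     * The division rounds up so that, with half-open ranges, the smaller
     * hashcode stays below the position and the larger one is at or above it.
     * Two equal hashcodes cannot be separated this way.
     */
    static long midpoint(long hashA, long hashB) {
        long lo = Math.min(hashA, hashB);
        long hi = Math.max(hashA, hashB);
        return lo + (hi - lo + 1) / 2;
    }

    /**
     * Second algorithm: isolate each hashcode h in its own bundle [h, h + 1)
     * by cutting at h and at h + 1. Cuts that coincide with the bundle
     * boundaries are skipped, and duplicates are removed.
     */
    static List<Long> isolatingPositions(long lower, long upper, List<Long> hashes) {
        TreeSet<Long> positions = new TreeSet<>();
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                continue; // hashcode does not belong to this bundle
            }
            if (h > lower) {
                positions.add(h);
            }
            if (h + 1 < upper) {
                positions.add(h + 1);
            }
        }
        return new ArrayList<>(positions);
    }

    public static void main(String[] args) {
        // Midpoint of 0x03 and 0x07: prints 5
        System.out.println(midpoint(0x03, 0x07));
        // Isolating 0x03 and 0x07 inside 0x00_0x10: prints [3, 4, 7, 8]
        System.out.println(isolatingPositions(0x00, 0x10, List.of(0x03L, 0x07L)));
    }
}
```

The four positions 0x03, 0x04, 0x07 and 0x08 correspond to the five bundles
0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08 and 0x08_0x10 from the proposal.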
