Hi, penghui > The new API does not necessarily have to query by topic one by one, we have listed all the "topic -> position" of a bundle?
I see. After we got all the positions of the topics we want to split in a bundle, it's quite easy for us to decide how to it. Haiting Jiang <jianghait...@apache.org> 于2022年2月20日周日 12:05写道: > > Do you have an example for affinity? I don't fully understand how this is > > used > > in practice. > > IMO, this affinity serves the purpose of isolating an abnormal topic to > some spare > brokers. These brokers host these kind of topics only. Here are some > cases : > > 1. A topic may have unexpected short spike traffic flows periodically and > causing broker overloads and negative impact on other topics. > Until we have more proper solutions, we can always isolate these topics > first, > and make the service recover time as small as possible. > > 2. Some users may encounter some bugs in brokers, and we can isolate the > topic to > exclusive brokers, and use more radical approach to locate the bug, like > enable debug > level logs or even add some temporary code patch. > > 3. User may already have configured failure domain and anti-affinity > namespace, but with > business logic code changes, some topic may need to migrate from one > namespace > to another. This will take some time for user to change the client side > config. > In the meanwhile, we can isolate the topic first. > > Thanks, > Haiting > > On 2022/02/18 15:26:09 PengHui Li wrote: > > Hi Haiting, > > > > > I think this approach have more potential with abnormal topic > isolation. > > If we can introduce > > some kind of bundle isolation strategy, (like broker-bundle affinity and > > anti-affinity mechanism), we can easily isolate some unexpected traffic > to > > some empty brokers. > > IMO, this would improve the stability of broker cluster. > > > > if I understand correctly, it looks like if we have a partitioned topic > > with 10 > > partitions under a namespace with 16 bundles, if applies the > anti-affinity > > policy, > > partition-0 map to bundle 0, partition-1 map to bundle 1, and so on. > > Of course, it is not necessary for every partitioned topic to start from > > bundle 0, > > we can use the partition-0 hash to determine the start bundle index. > > > > Do you have an example for affinity? I don't fully understand how this is > > used > > in practice. > > > > Best, > > Penghui > > > > On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org> wrote: > > > > > Hi Aloys, > > > > > > > Do you mean that > > > 1. First, add a new API, maybe `getHashPositioin`, to get the hash > > > position in a bundle > > > 2. Then use this position to split the overloaded bundle > > > If so, when we split a bundle with multi partitions of a topic, we > need to > > > call the `getHashPositioin` multi times to get the middle position of > all > > > these positions. > > > > > > Yes, this want I mean. In this way, users can control to assign 1 > topic or > > > 3 topics to one bundle. This is more like increasing the transparency > of > > > the topic in the bundle, you can all the positions of the topics, so > how > > > planning for bundle splitting becomes more flexible. > > > > > > The new API does not necessarily have to query by topic one by one, > > > we have listed all the "topic -> position" of a bundle? > > > > > > Thanks, > > > Penghui > > > > > > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghait...@apache.org > > > > > wrote: > > > > > >> Hi Aloys, > > >> +1 for this great PIP. > > >> > > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b > > >> ${bundle_range}` > > >> > will add a new parameter "--topic" or "-t" for `outstanding topic` > > >> name. > > >> > > >> Do we have limitation on this "topic" parameter. Can this be a > > >> partitioned topic? > > >> If so, will this new algorithm split the bundle into more than 2 > bundles? > > >> like each bundle for > > >> one partition. > > >> > > >> > This algorithm has a disadvantage, it can only deal > > >> > with one `outstanding topic`. > > >> > > >> For this disadvantage, I think it can be solved by extends the "topic" > > >> parameter from one topic to a topic list. > > >> > > >> > The other algorithm is to split the bundle at the hashcode point of > the > > >> > `outstanding partition` which will split the bundle into three > bundles > > >> once > > >> > a time. The middle one contains the only point the hashcode of the > > >> > `outstanding partition, the left one is less than the hashcode, the > > >> right > > >> > one is more than the hashcode. > > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding > partition` > > >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this > > >> > algorithm is going to split bundle the bundle into five new > bundles, > > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for > > >> > partition-y), 0x08_0x10. > > >> > > >> I think this approach have more potential with abnormal topic > isolation. > > >> If we can introduce > > >> some kind of bundle isolation strategy, (like broker-bundle affinity > and > > >> anti-affinity mechanism), we can easily isolate some unexpected > traffic to > > >> some empty brokers. > > >> IMO, this would improve the stability of broker cluster. > > >> > > >> Thanks, > > >> Haiting > > >> > > >> On 2022/02/17 15:47:15 Aloys Zhang wrote: > > >> > Hi Pulsar Community, > > >> > > > >> > This is a PIP discussion on how to support split partitions > belonging to > > >> > specified topics in a bundle. > > >> > > > >> > The issue can be found: > https://github.com/apache/pulsar/issues/13761 > > >> > > > >> > I copy the content here for convenience, any suggestions are > welcome and > > >> > appreciated. > > >> > > > >> > > > >> > ## Motivation > > >> > > > >> > As we all know, a namespace bundle may contain lots of partitions > > >> belonging > > >> > to different topics. > > >> > The throughput of these topics may vary greatly. Some topics may > with > > >> very > > >> > high rate/throughput while other topics have a very low > rate/throughput. > > >> > > > >> > These partitions with high rate/throughput can cause broker > overload and > > >> > bundle unloading. > > >> > At this point, if we split bundle manually with > `range_equally_divide` > > >> or > > >> > `topic_count_equally_divide` split algorithm, there may need many > times > > >> > split before these high rate/through partitions assigned to > different > > >> new > > >> > bundles. > > >> > > > >> > For convenience, we call these high throughput topics `outstanding > > >> topic` > > >> > and their partitions `outstanding partition` in this PIP. > > >> > > > >> > ## Goal > > >> > > > >> > Our goal is to make it easier to split `outstanding partition` into > new > > >> > bundles. > > >> > > > >> > There are two alternative ways to achieve this. Either of them will > add > > >> a > > >> > new algorithm for bundle split. The difference is how the new bundle > > >> split > > >> > algorithm is implemented. > > >> > > > >> > One algorithm is to split bundle by `outstanding topic` which will > split > > >> > the bundle into two new bundles and each new bundle contains an > equally > > >> > `outstanding partition` once a time. > > >> > E.g, a bundle contains lots of topic partitions, and only one > > >> `outstanding > > >> > topic`(T) with 2 `outstanding partition` (T-partition-n, > > >> Tpartition-n+1). > > >> > This algorithm split this bundle at the middle point of these two > > >> > partition's hashcode. This algorithm has a disadvantage, it can > only > > >> deal > > >> > with one `outstanding topic`. > > >> > > > >> > So we raised up another algorithm. > > >> > > > >> > The other algorithm is to split the bundle at the hashcode point of > the > > >> > `outstanding partition` which will split the bundle into three > bundles > > >> once > > >> > a time. The middle one contains the only point the hashcode of the > > >> > `outstanding partition, the left one is less than the hashcode, the > > >> right > > >> > one is more than the hashcode. > > >> > E.g. if we have a bundle 0x00_0x10 contains two `outstanding > partition` > > >> > (partition-x and partition-y) whose hashcode is 0x03 and 0x07, this > > >> > algorithm is going to split bundle the bundle into five new > bundles, > > >> > 0x00_0x03, 0x03_0x04( for parition-x), 0x04_0x07, 0x07_0x08( for > > >> > partition-y), 0x08_0x10. > > >> > > > >> > ## API Changes > > >> > > > >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b > > >> ${bundle_range}` > > >> > will add a new parameter "--topic" or "-t" for `outstanding topic` > > >> name. > > >> > > > >> > The split interface changed from > > >> > > > >> > ```JAVA > > >> > void splitNamespaceBundle(String namespace, String bundle, boolean > > >> > unloadSplitBundles, String splitAlgorithmName)throws > > >> PulsarAdminException; > > >> > ``` > > >> > > > >> > to > > >> > > > >> > ```java > > >> > void splitNamespaceBundle(String namespace, String bundle, boolean > > >> > unloadSplitBundles, > > >> > String splitAlgorithmName, String > topic) > > >> > throws PulsarAdminException; > > >> > ``` > > >> > > > >> > ## Implementation > > >> > > > >> > There are changes both from the Admin CLI and the broker side. > > >> > > > >> > First, Admin CLI for split bundle should support to specify the > > >> > `outstanding topic`, > > >> > > > >> > ```java > > >> > /** > > >> > * Split namespace bundle. > > >> > * > > >> > * @param namespace > > >> > * @param bundle range of bundle to split > > >> > * @param unloadSplitBundles > > >> > * @param splitAlgorithmName > > >> > * @param topic > > >> > * @throws PulsarAdminException > > >> > */ > > >> > void splitNamespaceBundle(String namespace, String bundle, > boolean > > >> > unloadSplitBundles, > > >> > String splitAlgorithmName, String > topic) > > >> > throws PulsarAdminException; > > >> > > > >> > ``` > > >> > > > >> > ```java > > >> > /** > > >> > * Split namespace bundle asynchronously. > > >> > * > > >> > * @param namespace > > >> > * @param bundle range of bundle to split > > >> > * @param unloadSplitBundles > > >> > * @param splitAlgorithmName > > >> > */ > > >> > CompletableFuture<Void> splitNamespaceBundleAsync( > > >> > String namespace, String bundle, boolean > unloadSplitBundles, > > >> > String splitAlgorithmName, String topic); > > >> > ``` > > >> > > > >> > And for the broker side, first encapsulates the parameters for > bundle > > >> split > > >> > into a new class `BundleSplitOption` > > >> > > > >> > ```java > > >> > public class BundleSplitOption { > > >> > private NamespaceService service; > > >> > private NamespaceBundle bundle; > > >> > private String topic; > > >> > } > > >> > ``` > > >> > > > >> > add a new split algorithm > > >> > > > >> > ```java > > >> > ublic class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm > > >> implements > > >> > NamespaceBundleSplitAlgorithm { > > >> > @Override > > >> > public CompletableFuture<List<Long>> > > >> getSplitBoundary(BundleSplitOption > > >> > bundleSplitOption) { > > >> > > > >> > }); > > >> > } > > >> > } > > >> > ``` > > >> > > > >> > add the new algorithm to `NamespaceBundleSplitAlgorithm` > > >> > > > >> > ```JAVA > > >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = > > >> > "specified_topic_count_equally_divide"; > > >> > > > >> > List<String> AVAILABLE_ALGORITHMS = > > >> > Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME, > > >> > TOPIC_COUNT_EQUALLY_DIVIDE, > > >> > SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE); > > >> > > > >> > NamespaceBundleSplitAlgorithm > > >> SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = > > >> > new > SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm(); > > >> > ``` > > >> > > > >> > modify the `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` > for > > >> > [[NamespaceService.java]( > > >> > > > >> > https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1)](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1 > > >> ) > > >> > > > >> > > > >> > ```java > > >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle > bundle, > > >> > boolean unload, > > >> > > > >> > NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) { > > >> > > > >> > final CompletableFuture<Void> unloadFuture = new > > >> > CompletableFuture<>(); > > >> > final AtomicInteger counter = new > > >> > AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); > > >> > splitAndOwnBundleOnceAndRetry(bundle, unload, counter, > > >> > unloadFuture, splitAlgorithm, topic); > > >> > > > >> > return unloadFuture; > > >> > } > > >> > ``` > > >> > > > >> > ```java > > >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, > > >> > boolean unload, > > >> > AtomicInteger counter, > > >> > CompletableFuture<Void> > > >> > completionFuture, > > >> > NamespaceBundleSplitAlgorithm > > >> > splitAlgorithm, > > >> > String topic) { > > >> > ``` > > >> > > > >> > Also, we change the REST api and broker.conf > > >> > > > >> > ```java > > >> > public void splitNamespaceBundle( > > >> > @Suspended final AsyncResponse asyncResponse, > > >> > @PathParam("property") String property, > > >> > @PathParam("cluster") String cluster, > > >> > @PathParam("namespace") String namespace, > > >> > @PathParam("bundle") String bundleRange, > > >> > @QueryParam("authoritative") @DefaultValue("false") > boolean > > >> > authoritative, > > >> > @QueryParam("unload") @DefaultValue("false") boolean > unload, > > >> > @QueryParam("topic") @DefaultValue("") String topic) {} > > >> > ``` > > >> > > > >> > ```shell > > >> > > > >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide > > >> > ``` > > >> > > > >> > > > > > >