> Do you have an example for affinity? I don't fully understand how this is used in practice.

IMO, this affinity serves to isolate an abnormal topic on some spare brokers that host only this kind of topic. Here are some cases:

1. A topic may periodically have unexpected, short traffic spikes that overload the broker and hurt other topics. Until we have better solutions, we can always isolate these topics first and keep the service recovery time as short as possible.
2. Some users may encounter bugs in brokers. We can isolate the topic on exclusive brokers and use more radical approaches to locate the bug, like enabling debug-level logs or even applying a temporary code patch.
3. A user may already have configured failure domains and anti-affinity namespaces, but after business logic changes, some topics may need to migrate from one namespace to another. Changing the client-side config takes the user some time; in the meantime, we can isolate the topic first.

Thanks,
Haiting

On 2022/02/18 15:26:09 PengHui Li wrote:
> Hi Haiting,
>
> > I think this approach has more potential for abnormal topic isolation. If we can introduce some kind of bundle isolation strategy (like a broker-bundle affinity and anti-affinity mechanism), we can easily isolate unexpected traffic to some empty brokers. IMO, this would improve the stability of the broker cluster.
>
> If I understand correctly, with a partitioned topic of 10 partitions under a namespace with 16 bundles, applying the anti-affinity policy would map partition-0 to bundle 0, partition-1 to bundle 1, and so on. Of course, not every partitioned topic has to start from bundle 0; we can use the hash of partition-0 to determine the start bundle index.
>
> Do you have an example for affinity? I don't fully understand how this is used in practice.
>
> Best,
> Penghui
>
> On Fri, Feb 18, 2022 at 11:16 PM PengHui Li <peng...@apache.org> wrote:
>
> > Hi Aloys,
> >
> > > Do you mean that
> > > 1. First, add a new API, maybe `getHashPosition`, to get the hash position in a bundle
> > > 2. Then use this position to split the overloaded bundle
> > >
> > > If so, when we split a bundle with multiple partitions of a topic, we need to call `getHashPosition` multiple times to get the middle position of all these positions.
> >
> > Yes, this is what I mean. In this way, users can control whether to assign 1 topic or 3 topics to one bundle. This is more like increasing the transparency of the topics in a bundle: you can see all the positions of the topics, so planning bundle splits becomes more flexible.
> >
> > The new API does not necessarily have to query topics one by one; could it list all the "topic -> position" pairs of a bundle?
> >
> > Thanks,
> > Penghui
> >
> > On Fri, Feb 18, 2022 at 4:51 PM Haiting Jiang <jianghait...@apache.org> wrote:
> >
> >> Hi Aloys,
> >> +1 for this great PIP.
> >>
> >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}` will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
> >>
> >> Is there any limitation on this "topic" parameter? Can it be a partitioned topic? If so, will this new algorithm split the bundle into more than 2 bundles, like one bundle for each partition?
> >>
> >> > This algorithm has a disadvantage: it can only deal with one `outstanding topic`.
> >>
> >> I think this disadvantage can be solved by extending the "topic" parameter from one topic to a topic list.
> >>
> >> > The other algorithm splits the bundle at the hashcode point of each `outstanding partition`, which splits the bundle into three bundles at a time. The middle one contains only the hashcode point of the `outstanding partition`, the left one covers the range below the hashcode, and the right one covers the range above it. E.g.
> >> > if we have a bundle 0x00_0x10 that contains two `outstanding partition`s (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this algorithm splits the bundle into five new bundles: 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for partition-y), and 0x08_0x10.
> >>
> >> I think this approach has more potential for abnormal topic isolation. If we can introduce some kind of bundle isolation strategy (like a broker-bundle affinity and anti-affinity mechanism), we can easily isolate unexpected traffic to some empty brokers. IMO, this would improve the stability of the broker cluster.
> >>
> >> Thanks,
> >> Haiting
> >>
> >> On 2022/02/17 15:47:15 Aloys Zhang wrote:
> >> > Hi Pulsar Community,
> >> >
> >> > This is a PIP discussion on how to support splitting the partitions that belong to specified topics in a bundle.
> >> >
> >> > The issue can be found here: https://github.com/apache/pulsar/issues/13761
> >> >
> >> > I have copied the content here for convenience; any suggestions are welcome and appreciated.
> >> >
> >> >
> >> > ## Motivation
> >> >
> >> > As we all know, a namespace bundle may contain lots of partitions belonging to different topics. The throughput of these topics may vary greatly: some topics may have a very high rate/throughput while others have a very low rate/throughput.
> >> >
> >> > These high-rate/throughput partitions can cause broker overload and bundle unloading. At this point, if we split the bundle manually with the `range_equally_divide` or `topic_count_equally_divide` split algorithm, it may take many splits before these high-rate/throughput partitions are assigned to different new bundles.
> >> >
> >> > For convenience, we call these high-throughput topics `outstanding topic` and their partitions `outstanding partition` in this PIP.
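The Motivation's point about repeated `range_equally_divide` splits can be made concrete with a small sketch. It assumes that `range_equally_divide` always cuts a bundle at the midpoint of its hash range and models a bundle as the half-open range `[lower, upper)` (Pulsar itself writes the full range as 0x00000000_0xffffffff). The class `HalvingSplitCountSketch` and its method are hypothetical, for illustration only.

```java
/**
 * Hypothetical sketch: counts how many halving splits are needed before two
 * partition hashes end up in different bundles. Assumption: every split cuts
 * the bundle at the midpoint of its half-open hash range [lower, upper).
 */
class HalvingSplitCountSketch {

    static int splitsToSeparate(long lower, long upper, long hashA, long hashB) {
        if (hashA == hashB || Math.min(hashA, hashB) < lower || Math.max(hashA, hashB) >= upper) {
            throw new IllegalArgumentException("need two distinct hashes inside [lower, upper)");
        }
        int splits = 0;
        // Keep following the half that still holds both hashes.
        while (true) {
            long mid = lower + (upper - lower) / 2;
            splits++;
            boolean aLeft = hashA < mid;
            boolean bLeft = hashB < mid;
            if (aLeft != bLeft) {
                return splits; // this cut finally puts the two hashes in different bundles
            }
            if (aLeft) {
                upper = mid;
            } else {
                lower = mid;
            }
        }
    }

    public static void main(String[] args) {
        // Bundle 0x00_0x10 with hashes 0x03 and 0x07: the cut at 0x08 keeps them together,
        // the cut at 0x04 separates them.
        System.out.println(splitsToSeparate(0x00, 0x10, 0x03, 0x07)); // 2
        // Hashes that differ only in the lowest bit of a 32-bit range.
        System.out.println(splitsToSeparate(0x00000000L, 0x100000000L, 0x12345678L, 0x12345679L)); // 32
    }
}
```

With the PIP's example hashes, two halvings are enough, while hashes that differ only in the lowest bit of a 32-bit range need 32 halvings under this model.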
> >> >
> >> > ## Goal
> >> >
> >> > Our goal is to make it easier to split `outstanding partition`s into new bundles.
> >> >
> >> > There are two alternative ways to achieve this. Either of them adds a new bundle split algorithm; they differ in how the new algorithm is implemented.
> >> >
> >> > One algorithm splits the bundle by `outstanding topic`: it splits the bundle into two new bundles at a time, each containing an equal number of `outstanding partition`s. E.g., a bundle contains lots of topic partitions, and only one `outstanding topic` (T) with 2 `outstanding partition`s (T-partition-n, T-partition-n+1). This algorithm splits the bundle at the middle point of these two partitions' hashcodes. This algorithm has a disadvantage: it can only deal with one `outstanding topic`.
> >> >
> >> > So we came up with another algorithm.
> >> >
> >> > The other algorithm splits the bundle at the hashcode point of each `outstanding partition`, which splits the bundle into three bundles at a time. The middle one contains only the hashcode point of the `outstanding partition`, the left one covers the range below the hashcode, and the right one covers the range above it. E.g. if we have a bundle 0x00_0x10 that contains two `outstanding partition`s (partition-x and partition-y) whose hashcodes are 0x03 and 0x07, this algorithm splits the bundle into five new bundles: 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for partition-y), and 0x08_0x10.
> >> >
> >> > ## API Changes
> >> >
> >> > The Admin CLI `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}` will add a new parameter "--topic" or "-t" for the `outstanding topic` name.
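The two algorithms in the Goal section above can be sketched as pure boundary computations. This is not the PIP's implementation: it assumes a bundle is the half-open hash range `[lower, upper)`, that each method returns every range boundary in ascending order (including `lower` and `upper`), and that the first algorithm rounds the midpoint up so the lower hash stays in the left bundle. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the two split algorithms from the Goal section.
 * Assumption: a bundle is the half-open hash range [lower, upper).
 */
class OutstandingPartitionSplitSketch {

    /** First algorithm: a single cut between two outstanding partitions' hash codes. */
    static List<Long> midpointBoundaries(long lower, long upper, long hashA, long hashB) {
        long lo = Math.min(hashA, hashB);
        long hi = Math.max(hashA, hashB);
        if (lo == hi || lo < lower || hi >= upper) {
            throw new IllegalArgumentException("need two distinct hashes inside [lower, upper)");
        }
        // Round up so that lo stays in the left bundle and hi lands in the right one.
        long mid = lo + (hi - lo + 1) / 2;
        return List.of(lower, mid, upper);
    }

    /** Second algorithm: every outstanding hash h gets its own bundle [h, h + 1). */
    static List<Long> isolatingBoundaries(long lower, long upper, List<Long> hashes) {
        TreeSet<Long> points = new TreeSet<>(List.of(lower, upper));
        for (long h : hashes) {
            if (h < lower || h >= upper) {
                throw new IllegalArgumentException("hash outside the bundle: " + h);
            }
            points.add(h);     // left edge of the single-position bundle
            points.add(h + 1); // right edge; equals upper when h is the last position
        }
        return new ArrayList<>(points); // the sorted set drops duplicate edges, e.g. h == lower
    }

    /** Formats consecutive boundaries in the PIP's 0xLOW_0xHIGH style. */
    static List<String> toBundleNames(List<Long> boundaries) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i + 1 < boundaries.size(); i++) {
            names.add(String.format("0x%02x_0x%02x", boundaries.get(i), boundaries.get(i + 1)));
        }
        return names;
    }

    public static void main(String[] args) {
        // The PIP's example: bundle 0x00_0x10 with outstanding partitions at 0x03 and 0x07.
        System.out.println(toBundleNames(isolatingBoundaries(0x00, 0x10, List.of(0x03L, 0x07L))));
        // prints [0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10]
        System.out.println(toBundleNames(midpointBoundaries(0x00, 0x10, 0x03, 0x07)));
        // prints [0x00_0x05, 0x05_0x10]
    }
}
```

Running `main` reproduces the five bundles from the PIP's example. Because `isolatingBoundaries` accepts any number of hashes, the same shape would also accommodate passing the partitions of several topics at once, as suggested earlier in the thread.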
> >> >
> >> > The split interface changes from
> >> >
> >> > ```java
> >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> >                           String splitAlgorithmName) throws PulsarAdminException;
> >> > ```
> >> >
> >> > to
> >> >
> >> > ```java
> >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> >                           String splitAlgorithmName, String topic)
> >> >         throws PulsarAdminException;
> >> > ```
> >> >
> >> > ## Implementation
> >> >
> >> > There are changes on both the Admin CLI side and the broker side.
> >> >
> >> > First, the Admin CLI for splitting a bundle should support specifying the `outstanding topic`:
> >> >
> >> > ```java
> >> > /**
> >> >  * Split namespace bundle.
> >> >  *
> >> >  * @param namespace namespace name
> >> >  * @param bundle range of bundle to split
> >> >  * @param unloadSplitBundles whether to unload the new bundles after the split
> >> >  * @param splitAlgorithmName name of the split algorithm
> >> >  * @param topic name of the `outstanding topic` whose partitions should be split out
> >> >  * @throws PulsarAdminException
> >> >  */
> >> > void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
> >> >                           String splitAlgorithmName, String topic)
> >> >         throws PulsarAdminException;
> >> > ```
> >> >
> >> > ```java
> >> > /**
> >> >  * Split namespace bundle asynchronously.
> >> >  *
> >> >  * @param namespace namespace name
> >> >  * @param bundle range of bundle to split
> >> >  * @param unloadSplitBundles whether to unload the new bundles after the split
> >> >  * @param splitAlgorithmName name of the split algorithm
> >> >  * @param topic name of the `outstanding topic` whose partitions should be split out
> >> >  * @return a future that completes when the split is done
> >> >  */
> >> > CompletableFuture<Void> splitNamespaceBundleAsync(
> >> >         String namespace, String bundle, boolean unloadSplitBundles,
> >> >         String splitAlgorithmName, String topic);
> >> > ```
> >> >
> >> > On the broker side, first encapsulate the bundle split parameters in a new class `BundleSplitOption`:
> >> >
> >> > ```java
> >> > public class BundleSplitOption {
> >> >     private NamespaceService service;
> >> >     private NamespaceBundle bundle;
> >> >     private String topic;
> >> > }
> >> > ```
> >> >
> >> > Add a new split algorithm:
> >> >
> >> > ```java
> >> > public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
> >> >     @Override
> >> >     public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
> >> >         // Placeholder: compute the split boundaries from the hash positions of the
> >> >         // specified topic's partitions in the bundle (body not given in this proposal).
> >> >         throw new UnsupportedOperationException("not implemented yet");
> >> >     }
> >> > }
> >> > ```
> >> >
> >> > Add the new algorithm to `NamespaceBundleSplitAlgorithm`:
> >> >
> >> > ```java
> >> > String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";
> >> >
> >> > List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
> >> >         TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);
> >> >
> >> > NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
> >> >         new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
> >> > ```
> >> >
> >> > Modify `splitAndOwnBundle` and `splitAndOwnBundleOnceAndRetry` in [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1):
> >> >
> >> > ```java
> >> > public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
> >> >                                                  NamespaceBundleSplitAlgorithm splitAlgorithm,
> >> >                                                  String topic) {
> >> >     final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
> >> >     final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
> >> >     splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
> >> >
> >> >     return unloadFuture;
> >> > }
> >> > ```
> >> >
> >> > ```java
> >> > void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
> >> >                                    boolean unload,
> >> >                                    AtomicInteger counter,
> >> >                                    CompletableFuture<Void> completionFuture,
> >> >                                    NamespaceBundleSplitAlgorithm splitAlgorithm,
> >> >                                    String topic) {
> >> >     // ... (body omitted)
> >> > }
> >> > ```
> >> >
> >> > We also change the REST API and broker.conf:
> >> >
> >> > ```java
> >> > public void splitNamespaceBundle(
> >> >         @Suspended final AsyncResponse asyncResponse,
> >> >         @PathParam("property") String property,
> >> >         @PathParam("cluster") String cluster,
> >> >         @PathParam("namespace") String namespace,
> >> >         @PathParam("bundle") String bundleRange,
> >> >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
> >> >         @QueryParam("unload") @DefaultValue("false") boolean unload,
> >> >         @QueryParam("topic") @DefaultValue("") String topic) {}
> >> > ```
> >> >
> >> > ```properties
> >> > supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
> >> > ```
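The REST `topic` parameter defaults to an empty string, and the new algorithm has to be listed in `supportedNamespaceBundleSplitAlgorithms`, so the broker presumably needs a check along these lines before splitting. This is a hypothetical sketch: `SplitRequestValidatorSketch`, its methods, and the rule that `specified_topic_count_equally_divide` requires a non-empty topic are assumptions, not code from the PIP or from Pulsar.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical request check; names and rules are illustrative assumptions. */
class SplitRequestValidatorSketch {
    static final String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";

    /** Parses a comma-separated value such as supportedNamespaceBundleSplitAlgorithms. */
    static List<String> parseSupported(String configValue) {
        return Arrays.stream(configValue.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    /** Rejects algorithms that are not enabled, and the new algorithm without a topic. */
    static void validate(List<String> supported, String algorithm, String topic) {
        if (!supported.contains(algorithm)) {
            throw new IllegalArgumentException(
                    "Unsupported split algorithm " + algorithm + ", supported: " + supported);
        }
        // Assumption: the REST default "" means "no topic given".
        if (SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE.equals(algorithm) && (topic == null || topic.isEmpty())) {
            throw new IllegalArgumentException(algorithm + " requires a non-empty topic");
        }
    }

    public static void main(String[] args) {
        List<String> supported = parseSupported(
                "range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide");
        // Illustrative topic name only.
        validate(supported, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE, "persistent://my-tenant/my-ns/my-topic");
        try {
            validate(supported, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE, "");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Treating the empty default as "no topic" keeps the existing algorithms working unchanged for callers that never pass `topic`.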