Hi Chia-Ping / David / Lucas,

Happy new year and thanks for the review.
DJ02: Thanks for the suggestion. I updated the PR to use Guava.

DJ03: Yes, I updated the description to mention ISR changes, added the altering-partition-reassignment case, and noted that an unrelated topic change doesn't trigger a rebalance.

DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh to notify groups.

DJ06: Updated the PR to use the Guava Hashing#combineUnordered function to combine topic hashes (sketched in the P.S. below).

DJ07: Renamed it to MetadataHash.

DJ08: Added a sample hash function to the KIP, using the first byte as a magic byte (see the P.S. below). This is also included in the latest PR.

DJ09: Added two paragraphs about upgrade and downgrade.

DJ10: Following Lucas's comment, I added the StreamsGroupMetadataValue update to this KIP.

Thanks,
PoAn
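P.S. To make DJ06/DJ08 a bit more concrete, here is a rough, illustrative sketch assuming Guava's Hashing#murmur3_128 and Hashing#combineUnordered. The class name, method names, and exact field layout below are hypothetical, not the final format in the KIP/PR.

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;

public final class TopicHashSketch {

    // Hypothetical version marker written as the first byte so the layout can evolve later (DJ08).
    private static final byte MAGIC_BYTE = 0;

    private static final HashFunction HASH = Hashing.murmur3_128();

    // Per-topic hash: magic byte, topic name, partition count, then each partition id
    // followed by the sorted racks of its replicas.
    static HashCode topicHash(String topicName,
                              SortedMap<Integer, List<String>> sortedRacksByPartition) {
        Hasher hasher = HASH.newHasher()
            .putByte(MAGIC_BYTE)
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(sortedRacksByPartition.size());
        sortedRacksByPartition.forEach((partitionId, sortedRacks) -> {
            hasher.putInt(partitionId);
            sortedRacks.forEach(rack -> hasher.putString(rack, StandardCharsets.UTF_8));
        });
        return hasher.hash();
    }

    // Group-level MetadataHash: combine the cached per-topic hashes of the subscribed topics (DJ06).
    // combineUnordered makes the result independent of topic iteration order; the collection
    // must be non-empty.
    static long metadataHash(Collection<HashCode> subscribedTopicHashes) {
        return Hashing.combineUnordered(subscribedTopicHashes).asLong();
    }
}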
> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>
>> because assignors are sticky.
>
> I forgot about that spec again :(
>
> David Jacot <david.ja...@gmail.com> wrote on Fri, Dec 20, 2024 at 3:41 PM:
>
>> Hi Chia-Ping,
>>
>> DJ08: In my opinion, changing the format will be rare so it is acceptable if rebalances are triggered in this case on upgrade/downgrade. It is also what will happen if a cluster is downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change anything if the topology of the group is the same because assignors are sticky. The default ones are, and we recommend custom ones to be as well.
>>
>> Best,
>> David
>>
>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org> wrote:
>>>
>>> ummm, it does not work for downgrade as the old coordinator has no idea about the new format :(
>>>
>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>> hi David
>>>>
>>>>> DJ08:
>>>>
>>>> That's a good question. If the "hash" lacks version control, it could trigger a series of unnecessary rebalances. However, adding additional information ("magic") to the hash does not help the upgraded coordinator determine the "version." This means that the upgraded coordinator would still trigger unnecessary rebalances because it has no way to know which format to use when comparing the hash.
>>>>
>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to indicate the version of the "hash." This would allow the coordinator, when handling subscription metadata, to compute the old hash and determine whether an epoch bump is necessary. Additionally, the coordinator can generate a new record to upgrade the hash without requiring an epoch bump.
>>>>
>>>> Another issue is whether the coordinator should cache all versions of the hash. I believe this is necessary; otherwise, during an upgrade, there would be extensive recomputing of old hashes.
>>>>
>>>> I believe this idea should also work for downgrades, and that's just my two cents.
>>>>
>>>> Best,
>>>> Chia-Ping
>>>>
>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>> Hi PoAn and Chia-Ping,
>>>>>
>>>>> Thanks for your responses.
>>>>>
>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could compute the hash without having to convert to bytes before. Guava has a nice interface for this, allowing us to incrementally add primitive types to the hash. We can discuss this in the PR as it is an implementation detail.
>>>>>
>>>>> DJ03: Thanks. I don't think that the replicas are updated when a broker shuts down. What you said applies to the ISR. I suppose that we can rely on the ISR changes to trigger updates. It is also worth noting that TopicsDelta#changedTopics is updated for every change (e.g. ISR change, leader change, replicas change, etc.). I suppose that it is OK, but it seems that it will trigger refreshes which are not necessary. However, a rebalance won't be triggered because the hash won't change.
>>>>> DJ03.1: I suppose that we will continue to rely on ModernGroup#requestMetadataRefresh to notify groups that must refresh their hashes. Is my understanding correct?
>>>>>
>>>>> DJ05: Fair enough.
>>>>>
>>>>> DJ06: You mention in two places that you would like to combine hashes by adding them. I wonder if this is a good practice. Intuitively, I would have used XOR or hashed the hashes. Guava has a method for combining hashes. It may be worth looking into the algorithm used.
>>>>>
>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be more generic.
>>>>>
>>>>> DJ08: Regarding the per-topic hash, I wonder whether we should specify in the KIP how we will compute it. I had the following in mind: hash(topicName; numPartitions; [partitionId; sorted racks]). We could also add a magic byte as the first element as a sort of version. I am not sure whether it is needed though. I was thinking about this while imagining how we would handle changing the format in the future.
>>>>>
>>>>> DJ09: It would be great if we could provide more details about backward compatibility. What happens when the cluster is upgraded or downgraded?
>>>>>
>>>>> DJ10: We should update KIP-1071. It may be worth pinging them in the discussion thread of KIP-1071.
>>>>>
>>>>> Best,
>>>>> David
>>>>>
>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote:
>>>>>
>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>>
>>>>>> Thanks for the review and suggestions.
>>>>>>
>>>>>> DJ01: Removed all implementation details.
>>>>>>
>>>>>> DJ02: Does "incrementally" mean that we only calculate the changed parts? For example, if the number of partitions changes, we only calculate the hash of the number of partitions and fold it back into the topic hash. IMO, we only calculate the topic hash one time. With the cache mechanism, the value can be reused by different groups on the same broker. The CPU usage for this part is not very high.
>>>>>>
>>>>>> DJ03: Added the update path to the KIP for both cases.
>>>>>>
>>>>>> DJ04: Yes, it's a good idea. With the cache mechanism and a single hash per group, we can balance CPU and disk usage.
>>>>>>
>>>>>> DJ05: Currently, the topic hash is only used in the coordinator. However, the metadata image is used in many different places. How about we move the hash to the metadata image when we find more use cases?
>>>>>>
>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete ShareGroupPartitionMetadataKey/Value and add a new field to ShareGroupMetadataValue.
>>>>>>
>>>>>> Best,
>>>>>> PoAn
>>>>>>
>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>>>>>>>
>>>>>>> Hi PoAn,
>>>>>>> Thanks for the KIP.
>>>>>>>
>>>>>>> AS1: From the point of view of share groups, the API and record schema definitions are unstable in AK 4.0. In AK 4.1, we will start supporting proper versioning.
>>>>>>> As a result, I think you do not need to deprecate the fields in the ShareGroupPartitionMetadataValue. Just include the schema for the fields which are actually needed, and I'll update the schema in the code when the KIP is implemented.
>>>>>>>
>>>>>>> AS2: In the event that DJ04 actually removes the need for ConsumerGroupPartitionMetadataKey/Value entirely, I would simply delete ShareGroupPartitionMetadataKey/Value, assuming that it is accepted in time for AK 4.1.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>> ________________________________________
>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>> Sent: 16 December 2024 16:27
>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology changes
>>>>>>>
>>>>>>> hi David
>>>>>>>
>>>>>>>> DJ05
>>>>>>>
>>>>>>> One of the benefits of having a single hash per group (DJ04) is the reduction in the size of stored data. Additionally, the cost of re-computing can be minimized thanks to caching. So I'm +1 to DJ04. However, the advantage of storing the topic cache in the metadata image is somewhat unclear to me. Could you please provide more details on what you mean by "tied"? Furthermore, since the metadata image is a thread-safe object, we need to ensure that the lazy initialization is also thread-safe. If no other components require the cache, it would be better to keep the caches within the coordinator.
>>>>>>>
>>>>>>> Best,
>>>>>>> Chia-Ping
>>>>>>>
>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>> Hi PoAn,
>>>>>>>>
>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>>
>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care about public interface changes, not about implementation details.
>>>>>>>> DJ02: Regarding the hash computation, I agree that we should use Murmur3. However, I don't quite like the implementation that you shared. I wonder if we could make it work incrementally instead of computing a hash of everything and combining them.
>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache is populated when a topic without a hash is seen in a HB request, and the cache is cleaned up when topics are deleted based on the metadata image. However, the update path is not clear. Let's say that a partition is added to a topic, how does it detect it? Let's also imagine that the racks of a partition have changed, how does it detect it? In the KIP, it would be nice to be clear about those.
>>>>>>>> DJ04: I wonder whether we should go with a single hash per group. Your argument against it is that it would require re-computing the hash of all the topics when it needs to be computed. In my opinion, we could leverage the cached hash per topic to compute the hash of all the subscribed ones. We could basically combine all the hashes without having to compute all of them. This approach has a few benefits. 1) We could get rid of the ConsumerGroupPartitionMetadata record as we could store the hash with the group epoch.
>>>>>>>> 2) We could get rid of the Map that we keep in each group to store the hashes corresponding to the subscribed topics.
>>>>>>>> DJ05: Regarding the cache again, I wonder if we should actually store the hash in the metadata image instead of maintaining it somewhere else. We could still lazily compute it. The benefit is that the value would be tied to the topic. I have not really looked into it. Would it be an option?
>>>>>>>>
>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly ask you to wait for me if we cannot conclude this week.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>
>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>
>>>>>>>>> Q0: Added what a rack change is and how it affects topic partitions.
>>>>>>>>>
>>>>>>>>> Q1: Added why we need a balance algorithm to the Motivation section.
>>>>>>>>>
>>>>>>>>> Q2: After checking again, we don't need to update the cache when we replay records. We only need to renew it in the consumer heartbeat.
>>>>>>>>>
>>>>>>>>> Q3: Added a new section "Topic Hash Function".
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>> PoAn
>>>>>>>>>
>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> hi PoAn
>>>>>>>>>>
>>>>>>>>>> Thanks for this KIP!
>>>>>>>>>>
>>>>>>>>>> Q0: Could you add more details about `A topic partition has rack change`? IIRC, the "rack change" includes both follower and leader, right?
>>>>>>>>>>
>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the Motivation section? This should include topics like 'computations' and 'space usage'.
>>>>>>>>>>
>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new topic hash.` This description seems a bit off to me. Why do we need to update the cache at this phase? The cache is intended to prevent duplicate computations caused by heartbeat requests that occur between two metadata change events. Therefore, we could even remove the changed topics from the caches on a metadata change, as the first heartbeat request would update the caches for all changed topics.
>>>>>>>>>>
>>>>>>>>>> Q3: Could you please include a section about the choice of hash implementation? The hash implementation must be consistent across different JDKs, so we use Murmur3 to generate the hash value.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Chia-Ping
>>>>>>>>>>
>>>>>>>>>> Frank Yang <yangp...@gmail.com> wrote on Fri, Nov 1, 2024 at 3:57 PM:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start a discussion thread on KIP-1101: Trigger rebalance on rack topology changes. In this KIP, we aim to use less memory / disk resources to detect rack changes in the new coordinator.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>>
>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>> PoAn