Hi Chia-Ping, DJ08: In my opinion, changing the format will be rare so it is acceptable if rebalances are triggered in this case on upgrade/downgrade. It is also what will happen if a cluster is downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change anything if the topology of the group is the same because assignors are sticky. The default ones are and we recommend custom ones to also be.
Best, David On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org> wrote: > > ummm, it does not work for downgrade as the old coordinator has no idea about > new format :( > > > On 2024/12/20 00:57:27 Chia-Ping Tsai wrote: > > hi David > > > > > DJ08: > > > > That's a good question. If the "hash" lacks version control, it could > > trigger a series of unnecessary rebalances. However, adding additional > > information ("magic") to the hash does not help the upgraded coordinator > > determine the "version." This means that the upgraded coordinator would > > still trigger unnecessary rebalances because it has no way to know which > > format to use when comparing the hash. > > > > Perhaps we can add a new field to ConsumerGroupMetadataValue to indicate > > the version of the "hash." This would allow the coordinator, when handling > > subscription metadata, to compute the old hash and determine whether an > > epoch bump is necessary. Additionally, the coordinator can generate a new > > record to upgrade the hash without requiring an epoch bump. > > > > Another issue is whether the coordinator should cache all versions of the > > hash. I believe this is necessary; otherwise, during an upgrade, there > > would be extensive recomputing of old hashes. > > > > I believe this idea should also work for downgrades, and that's just my two > > cents. > > > > Best, > > Chia-Ping > > > > > > On 2024/12/19 14:39:41 David Jacot wrote: > > > Hi PoAn and Chia-Ping, > > > > > > Thanks for your responses. > > > > > > DJ02: Sorry, I was not clear. I was wondering whether we could compute the > > > hash without having to convert to bytes before. Guava has a nice interface > > > for this allowing to incrementally add primitive types to the hash. We can > > > discuss this in the PR as it is an implementation detail. > > > > > > DJ03: Thanks. I don't think that the replicas are updated when a broker > > > shuts down. What you said applies to the ISR. I suppose that we can rely > > > on > > > the ISR changes to trigger updates. It is also worth noting > > > that TopicsDelta#changedTopics is updated for every change (e.g. ISR > > > change, leader change, replicas change, etc.). I suppose that it is OK but > > > it seems that it will trigger refreshes which are not necessary. However, > > > a > > > rebalance won't be triggered because the hash won't change. > > > DJ03.1: I suppose that we will continue to rely on > > > ModernGroup#requestMetadataRefresh to notify groups that must refresh > > > their > > > hashes. Is my understanding correct? > > > > > > DJ05: Fair enough. > > > > > > DJ06: You mention in two places that you would like to combine hashes by > > > additioning them. I wonder if this is a good practice. Intuitively, I > > > would > > > have used XOR or hashed the hashed. Guava has a method for combining > > > hashes. It may be worth looking into the algorithm used. > > > > > > DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be more > > > generic. > > > > > > DJ08: Regarding the per topic hash, I wonder whether we should precise in > > > the KIP how we will compute it. I had the following in mind: > > > hash(topicName; numPartitions; [partitionId;sorted racks]). We could also > > > add a magic byte at the first element as a sort of version. I am not sure > > > whether it is needed though. I was thinking about this while imagining how > > > we would handle changing the format in the future. > > > > > > DJ09: It would be great if we could provide more details about backward > > > compatibility. What happens when the cluster is upgraded or downgraded? > > > > > > DJ10: We should update KIP-1071. It may be worth pigging them in the > > > discussion thread of KIP-1071. > > > > > > Best, > > > David > > > > > > On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote: > > > > > > > Hi Chia-Ping / David / Andrew, > > > > > > > > Thanks for the review and suggestions. > > > > > > > > DJ01: Removed all implementation details. > > > > > > > > DJ02: Does the “incrementally” mean that we only calculate the > > > > difference > > > > parts? > > > > For example, if the number of partition change, we only calculate the > > > > hash > > > > of number of partition and reconstruct it to the topic hash. > > > > IMO, we only calculate topic hash one time. With cache mechanism, the > > > > value can be reused in different groups on a same broker. > > > > The CPU usage for this part is not very high. > > > > > > > > DJ03: Added the update path to KIP for both cases. > > > > > > > > DJ04: Yes, it’s a good idea. With cache mechanism and single hash per > > > > group, we can balance cpu and disk usage. > > > > > > > > DJ05: Currently, the topic hash is only used in coordinator. However, > > > > the > > > > metadata image is used in many different places. > > > > How about we move the hash to metadata image when we find more use > > > > cases? > > > > > > > > AS1, AS2: Thanks for the reminder. I will simply delete > > > > ShareGroupPartitionMetadataKey/Value and add a new field to > > > > ShareGroupMetadataValue. > > > > > > > > Best, > > > > PoAn > > > > > > > > > On Dec 17, 2024, at 5:50 AM, Andrew Schofield < > > > > andrew_schofield_j...@outlook.com> wrote: > > > > > > > > > > Hi PoAn, > > > > > Thanks for the KIP. > > > > > > > > > > AS1: From the point of view of share groups, the API and record schema > > > > > definitions are unstable in AK 4.0. In AK 4.1, we will start > > > > > supporting > > > > proper > > > > > versioning. As a result, I think you do not need to deprecate the > > > > > fields > > > > in the > > > > > ShareGroupPartitionMetadataValue. Just include the schema for the > > > > > fields > > > > > which are actually needed, and I'll update the schema in the code when > > > > > the KIP is implemented. > > > > > > > > > > AS2: In the event that DJ04 actually removes the need for > > > > > ConsumerGroupPartitionMetadataKey/Value entirely, I would simply > > > > > delete ShareGroupPartitionMetadataKey/Value, assuming that it is > > > > > accepted in time for AK 4.1. > > > > > > > > > > > > > > > Thanks, > > > > > Andrew > > > > > ________________________________________ > > > > > From: Chia-Ping Tsai <chia7...@apache.org> > > > > > Sent: 16 December 2024 16:27 > > > > > To: dev@kafka.apache.org <dev@kafka.apache.org> > > > > > Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology > > > > changes > > > > > > > > > > hi David > > > > > > > > > >> DJ05 > > > > > > > > > > One of the benefits of having a single hash per group (DJ04) is the > > > > reduction in the size of stored data. Additionally, the cost of > > > > re-computing can be minimized thanks to caching. So I'm + 1 to DJ04. > > > > However, the advantage of storing the topic cache in the metadata image > > > > is > > > > somewhat unclear to me. Could you please provide more details on what > > > > you > > > > mean by "tight"?Furthermore, since the metadata image is a thread-safe > > > > object, we need to ensure that the lazy initialization is also > > > > thread-safe. > > > > If no other components require the cache, it would be better to keep the > > > > caches within the coordinator. > > > > > > > > > > Best, > > > > > Chia-Ping > > > > > > > > > > On 2024/12/16 14:01:35 David Jacot wrote: > > > > >> Hi PoAn, > > > > >> > > > > >> Thanks for the KIP. I have some comments about it. > > > > >> > > > > >> DJ01: Please, remove all the code from the KIP. We only care about > > > > public > > > > >> interface changes, not about implementation details. > > > > >> DJ02: Regarding the hash computation, I agree that we should use > > > > Murmur3. > > > > >> However, I don't quite like the implementation that you shared. I > > > > wonder if > > > > >> we could make it work incrementally instead of computing a hash of > > > > >> everything and combining them. > > > > >> DJ03: Regarding the cache, my understanding is that the cache is > > > > populated > > > > >> when a topic without hash is seen in a HB request and the cache is > > > > cleaned > > > > >> up when topics are deleted based on the metadata image. However, the > > > > update > > > > >> path is not clear. Let's say that a partition is added to a topic, > > > > >> how > > > > does > > > > >> it detect it? Let's also imagine that the racks of a partition have > > > > >> changed, how does it detect it? In the KIP, it would be nice to be > > > > >> clear > > > > >> about those. > > > > >> DJ04: I wonder whether we should go with a single hash per group. > > > > >> Your > > > > >> argument against it is that it would require to re-compute the hash > > > > >> of > > > > all > > > > >> the topics when it needs to be computed. In my opinion, we could > > > > leverage > > > > >> the cached hash per topic to compute the hash of all the subscribed > > > > ones. > > > > >> We could basically combine all the hashes without having to compute > > > > >> all > > > > of > > > > >> them. This approach has a few benefits. 1) We could get rid of > > > > >> the ConsumerGroupPartitionMetadata record as we could store the hash > > > > with > > > > >> the group epoch. 2) We could get rid of the Map that we keep in each > > > > group > > > > >> to store the hashed corresponding to the subscribed topics. > > > > >> DJ05: Regarding the cache again, I wonder if we should actually store > > > > the > > > > >> hash in the metadata image instead of maintaining it somewhere else. > > > > >> We > > > > >> could still lazily compute it. The benefit is that the value would be > > > > tight > > > > >> to the topic. I have not really looked into it. Would it be an > > > > >> option? > > > > >> > > > > >> I'll be away for two weeks starting from Saturday. I kindly ask you > > > > >> to > > > > wait > > > > >> on me if we cannot conclude this week. > > > > >> > > > > >> Best, > > > > >> David > > > > >> > > > > >> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote: > > > > >> > > > > >>> Hi Chia-Ping, > > > > >>> > > > > >>> Thanks for the review and suggestions. > > > > >>> > > > > >>> Q0: Add how rack change and how it affects topic partition. > > > > >>> > > > > >>> Q1: Add why we need a balance algorithm to Motivation section. > > > > >>> > > > > >>> Q2: After checking again, we don’t need to update cache when we > > > > >>> replay > > > > >>> records. We only need to renew it in consumer heartbeat. > > > > >>> > > > > >>> Q3: Add a new section “Topic Hash Function”. > > > > >>> > > > > >>> Thanks. > > > > >>> PoAn > > > > >>> > > > > >>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> > > > > wrote: > > > > >>>> > > > > >>>> hi PoAn > > > > >>>> > > > > >>>> Thanks for for this KIP! > > > > >>>> > > > > >>>> Q0: Could you add more details about `A topic partition has rack > > > > change`? > > > > >>>> IIRC, the "rack change" includes both follower and leader, right? > > > > >>>> > > > > >>>> Q1: Could you please add the 'concerns' we discussed to the > > > > >>>> Motivation > > > > >>>> section? This should include topics like 'computations' and 'space > > > > >>> usage'. > > > > >>>> > > > > >>>> Q2: `The group coordinator can leverage it to add a new topic > > > > hash.`This > > > > >>>> description seems a bit off to me. Why do we need to update the > > > > >>>> cache > > > > at > > > > >>>> this phase? The cache is intended to prevent duplicate computations > > > > >>> caused > > > > >>>> by heartbeat requests that occur between two metadata change > > > > >>>> events. > > > > >>>> Therefore, we could even remove the changed topics from caches on a > > > > >>>> metadata change, as the first heartbeat request would update the > > > > caches > > > > >>> for > > > > >>>> all changed topics. > > > > >>>> > > > > >>>> Q3: Could you please include a section about the choice of hash > > > > >>>> implementation? The hash implementation must be consistent across > > > > >>> different > > > > >>>> JDKs, so we use Murmur3 to generate the hash value. > > > > >>>> > > > > >>>> Best, > > > > >>>> Chia-Ping > > > > >>>> > > > > >>>> > > > > >>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 下午3:57寫道: > > > > >>>> > > > > >>>>> Hi all, > > > > >>>>> > > > > >>>>> I would like to start a discussion thread on KIP-1101. Trigger > > > > rebalance > > > > >>>>> on rack topology changes. In this KIP, we aim to use less memory / > > > > disk > > > > >>>>> resources to detect rack changes in the new coordinator. > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes > > > > >>>>> > > > > >>>>> Please take a look and feel free to share any thoughts. > > > > >>>>> > > > > >>>>> Thanks. > > > > >>>>> PoAn > > > > >>> > > > > >>> > > > > >> > > > > > > > > > > > > >