Hi PoAn and Chia-Ping,

Thanks for your responses.
DJ02: Sorry, I was not clear. I was wondering whether we could compute the hash without having to convert everything to bytes first. Guava has a nice interface for this that allows incrementally adding primitive types to the hash. We can discuss this in the PR as it is an implementation detail. (I put a rough sketch at the very bottom of this mail, below the quoted thread.)

DJ03: Thanks. I don't think that the replicas are updated when a broker shuts down. What you said applies to the ISR. I suppose that we can rely on the ISR changes to trigger updates. It is also worth noting that TopicsDelta#changedTopics is updated for every change (e.g. ISR change, leader change, replicas change, etc.). I suppose that this is OK, but it seems that it will trigger refreshes which are not necessary. However, a rebalance won't be triggered because the hash won't change.

DJ03.1: I suppose that we will continue to rely on ModernGroup#requestMetadataRefresh to notify groups that must refresh their hashes. Is my understanding correct?

DJ05: Fair enough.

DJ06: You mention in two places that you would like to combine hashes by adding them. I wonder if this is a good practice. Intuitively, I would have used XOR or hashed the hashes. Guava has a method for combining hashes; it may be worth looking into the algorithm it uses. (See the second sketch at the bottom of this mail.)

DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be more generic.

DJ08: Regarding the per-topic hash, I wonder whether we should specify in the KIP how we will compute it. I had the following in mind: hash(topicName; numPartitions; [partitionId; sorted racks]). We could also add a magic byte as the first element as a sort of version. I am not sure whether it is needed though. I was thinking about this while imagining how we would handle changing the format in the future. (The first sketch at the bottom of this mail shows what I had in mind.)

DJ09: It would be great if we could provide more details about backward compatibility. What happens when the cluster is upgraded or downgraded?

DJ10: We should update KIP-1071. It may be worth pinging them in the discussion thread of KIP-1071.

Best,
David

On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote:

> Hi Chia-Ping / David / Andrew,
>
> Thanks for the review and suggestions.
>
> DJ01: Removed all implementation details.
>
> DJ02: Does the “incrementally” mean that we only calculate the difference parts?
> For example, if the number of partition change, we only calculate the hash of number of partition and reconstruct it to the topic hash.
> IMO, we only calculate topic hash one time. With cache mechanism, the value can be reused in different groups on a same broker.
> The CPU usage for this part is not very high.
>
> DJ03: Added the update path to KIP for both cases.
>
> DJ04: Yes, it’s a good idea. With cache mechanism and single hash per group, we can balance cpu and disk usage.
>
> DJ05: Currently, the topic hash is only used in coordinator. However, the metadata image is used in many different places.
> How about we move the hash to metadata image when we find more use cases?
>
> AS1, AS2: Thanks for the reminder. I will simply delete ShareGroupPartitionMetadataKey/Value and add a new field to ShareGroupMetadataValue.
>
> Best,
> PoAn
>
>
> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> >
> > Hi PoAn,
> > Thanks for the KIP.
> >
> > AS1: From the point of view of share groups, the API and record schema definitions are unstable in AK 4.0. In AK 4.1, we will start supporting proper versioning. As a result, I think you do not need to deprecate the fields in the ShareGroupPartitionMetadataValue.
> > Just include the schema for the fields which are actually needed, and I'll update the schema in the code when the KIP is implemented.
> >
> > AS2: In the event that DJ04 actually removes the need for ConsumerGroupPartitionMetadataKey/Value entirely, I would simply delete ShareGroupPartitionMetadataKey/Value, assuming that it is accepted in time for AK 4.1.
> >
> >
> > Thanks,
> > Andrew
> > ________________________________________
> > From: Chia-Ping Tsai <chia7...@apache.org>
> > Sent: 16 December 2024 16:27
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology changes
> >
> > hi David
> >
> >> DJ05
> >
> > One of the benefits of having a single hash per group (DJ04) is the reduction in the size of stored data. Additionally, the cost of re-computing can be minimized thanks to caching. So I'm +1 to DJ04. However, the advantage of storing the topic cache in the metadata image is somewhat unclear to me. Could you please provide more details on what you mean by "tight"? Furthermore, since the metadata image is a thread-safe object, we need to ensure that the lazy initialization is also thread-safe. If no other components require the cache, it would be better to keep the caches within the coordinator.
> >
> > Best,
> > Chia-Ping
> >
> > On 2024/12/16 14:01:35 David Jacot wrote:
> >> Hi PoAn,
> >>
> >> Thanks for the KIP. I have some comments about it.
> >>
> >> DJ01: Please, remove all the code from the KIP. We only care about public interface changes, not about implementation details.
> >> DJ02: Regarding the hash computation, I agree that we should use Murmur3. However, I don't quite like the implementation that you shared. I wonder if we could make it work incrementally instead of computing a hash of everything and combining them.
> >> DJ03: Regarding the cache, my understanding is that the cache is populated when a topic without hash is seen in a HB request and the cache is cleaned up when topics are deleted based on the metadata image. However, the update path is not clear. Let's say that a partition is added to a topic, how does it detect it? Let's also imagine that the racks of a partition have changed, how does it detect it? In the KIP, it would be nice to be clear about those.
> >> DJ04: I wonder whether we should go with a single hash per group. Your argument against it is that it would require to re-compute the hash of all the topics when it needs to be computed. In my opinion, we could leverage the cached hash per topic to compute the hash of all the subscribed ones. We could basically combine all the hashes without having to compute all of them. This approach has a few benefits. 1) We could get rid of the ConsumerGroupPartitionMetadata record as we could store the hash with the group epoch. 2) We could get rid of the Map that we keep in each group to store the hashed corresponding to the subscribed topics.
> >> DJ05: Regarding the cache again, I wonder if we should actually store the hash in the metadata image instead of maintaining it somewhere else. We could still lazily compute it. The benefit is that the value would be tight to the topic. I have not really looked into it. Would it be an option?
> >>
> >> I'll be away for two weeks starting from Saturday. I kindly ask you to wait on me if we cannot conclude this week.
> >>
> >> Best,
> >> David
> >>
> >> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote:
> >>
> >>> Hi Chia-Ping,
> >>>
> >>> Thanks for the review and suggestions.
> >>>
> >>> Q0: Add how rack change and how it affects topic partition.
> >>>
> >>> Q1: Add why we need a balance algorithm to Motivation section.
> >>>
> >>> Q2: After checking again, we don’t need to update cache when we replay records. We only need to renew it in consumer heartbeat.
> >>>
> >>> Q3: Add a new section “Topic Hash Function”.
> >>>
> >>> Thanks.
> >>> PoAn
> >>>
> >>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> >>>>
> >>>> hi PoAn
> >>>>
> >>>> Thanks for this KIP!
> >>>>
> >>>> Q0: Could you add more details about `A topic partition has rack change`? IIRC, the "rack change" includes both follower and leader, right?
> >>>>
> >>>> Q1: Could you please add the 'concerns' we discussed to the Motivation section? This should include topics like 'computations' and 'space usage'.
> >>>>
> >>>> Q2: `The group coordinator can leverage it to add a new topic hash.` This description seems a bit off to me. Why do we need to update the cache at this phase? The cache is intended to prevent duplicate computations caused by heartbeat requests that occur between two metadata change events. Therefore, we could even remove the changed topics from caches on a metadata change, as the first heartbeat request would update the caches for all changed topics.
> >>>>
> >>>> Q3: Could you please include a section about the choice of hash implementation? The hash implementation must be consistent across different JDKs, so we use Murmur3 to generate the hash value.
> >>>>
> >>>> Best,
> >>>> Chia-Ping
> >>>>
> >>>>
> >>>> On Fri, Nov 1, 2024 at 3:57 PM, Frank Yang <yangp...@gmail.com> wrote:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I would like to start a discussion thread on KIP-1101: Trigger rebalance on rack topology changes. In this KIP, we aim to use less memory / disk resources to detect rack changes in the new coordinator.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
> >>>>>
> >>>>> Please take a look and feel free to share any thoughts.
> >>>>>
> >>>>> Thanks.
> >>>>> PoAn
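
P.S. To make DJ02 and DJ08 a bit more concrete, here is a rough, illustrative sketch (not a proposal for the final implementation) of how the per-topic hash could be computed incrementally with Guava's Hasher, without first serializing everything into a byte array. The class name, method name and magic byte value are made up for the example; the exact format is precisely what I'd like the KIP to pin down.

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.List;

public final class TopicHashSketch {

    // Hypothetical format version ("magic byte"), bumped if the layout ever changes.
    private static final byte TOPIC_HASH_MAGIC = 0;

    private static final HashFunction HASH_FN = Hashing.murmur3_128();

    // hash(magic; topicName; numPartitions; [partitionId; sorted racks]) fed
    // incrementally into the hasher, so no intermediate byte[] is built.
    static HashCode topicHash(String topicName, int numPartitions, List<List<String>> sortedRacksPerPartition) {
        Hasher hasher = HASH_FN.newHasher()
            .putByte(TOPIC_HASH_MAGIC)
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(numPartitions);
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            hasher.putInt(partitionId);
            // Racks are assumed to be pre-sorted by the caller so the hash is deterministic.
            for (String rack : sortedRacksPerPartition.get(partitionId)) {
                hasher.putString(rack, StandardCharsets.UTF_8);
            }
        }
        return hasher.hash();
    }
}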
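
P.P.S. For DJ04/DJ06, a similarly rough sketch of deriving a single group-level hash from the cached per-topic hashes with Guava's combineOrdered, instead of adding the hashes together; again, the names are only illustrative.

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.util.List;

public final class GroupHashSketch {

    // Combine the cached per-topic hashes of the subscribed topics into one value,
    // without re-hashing any topic metadata.
    static long groupMetadataHash(List<HashCode> subscribedTopicHashes) {
        // combineOrdered requires a non-empty list of hash codes of equal bit length
        // and is order-sensitive, so the caller would need a deterministic order
        // (e.g. topic hashes sorted by topic name). combineUnordered is the
        // order-insensitive alternative.
        return Hashing.combineOrdered(subscribedTopicHashes).asLong();
    }
}

Whether combineOrdered/combineUnordered, XOR or something else is the right combinator is exactly the part I think is worth checking before we settle on addition.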