Hi David and Lucas,

Thanks for the review and the great suggestions.
DJ01: Added a statement that rack-aware rebalancing was removed in 4.0.

DJ02: Changed ConsumerGroupPartitionMetadataKey and ConsumerGroupPartitionMetadataValue from removed to deprecated.

DJ03: In my sample implementation <https://github.com/apache/kafka/pull/17444>, the cache is maintained in GroupCoordinatorManager. A GroupCoordinatorShard only has one GroupCoordinatorManager, so I think it is effectively a cache per GroupCoordinatorShard instance.

DJ04: Added the order in which topic data is combined to compute the topic hash.

DJ05: This is a good question. For the per-topic hash, Guava already applies a "final mix" to ensure the avalanche effect. However, for the single hash of a group's subscribed topics, we cannot use a simple sum, because we need to avoid an edge case: for example, hash(a + b) and hash(b + a) are the same. To fix this, the function sorts the topics by name and multiplies each topic hash by its position, e.g. hash(a + 2b) vs. hash(b + 2a). I added this case and the regular expression case to the KIP. A rough sketch of both hash computations is appended at the very end of this message.

DJ06: It is a good idea to write a tombstone for ConsumerGroupPartitionMetadata so that log compaction can remove the old data. Added this to the compatibility section.

DJ07: In a previous discussion, you mentioned that assignors are sticky, so if the topology of the group is the same, the upgrade/downgrade rebalance doesn't change anything:

>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>>>>>> acceptable if rebalances are triggered in this case on
>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change
>>>>>>>>>> anything if the topology of the group is the same because assignors
>>>>>>>>>> are sticky. The default ones are and we recommend custom ones to also
>>>>>>>>>> be.

With a `group.version` gate, we would need to maintain two code paths in 4.1: one using subscription metadata without rack information and another using the group hash. Another drawback is that bumping `group.version` would trigger rebalances for all groups at the same time. IMHO, I would prefer to keep the code simple.

DJ08: Added ConsumerGroupHeartbeatRequestTest to the test plan.

LB01: Added the removal of StreamsGroupPartitionMetadataKey / StreamsGroupPartitionMetadataValue to the public interface changes section.

Thank you.

Regards,
PoAn

> On Apr 9, 2025, at 5:11 PM, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>
> Hi PoAn,
>
> LB01: please add the removal of StreamsGroupPartitionMetadataKey /
> StreamsGroupPartitionMetadataValue to the KIP. Since these were not
> yet included in 4.0, we can just remove them as long as this KIP hits
> 4.1. I can help with the integration into the `streamsGroupHeartbeat`,
> as it will be a little different, but we do not need to call this out
> in the KIP.
>
> Thanks for your work on this!
>
> Cheers,
> Lucas
>
> On Wed, Apr 9, 2025 at 9:59 AM David Jacot <dja...@confluent.io.invalid> wrote:
>>
>> Hi PoAn,
>>
>> I finally had a bit of time to review your KIP. First, let me say that the
>> KIP is well written! I have some more comments:
>>
>> DJ01: In the motivation, could you please mention that we actually removed
>> triggering rebalances based on rack changes in 4.0? You explained very well
>> why we removed it but it is not clear that we removed it.
>>
>> DJ02: We cannot delete ConsumerGroupPartitionMetadataKey
>> and ConsumerGroupPartitionMetadataValue because the coordinator must still
>> be able to read the record from the log when the cluster is upgraded.
>> However, we can deprecate them.
>> >> DJ03: Could you clarify how the cache will be scoped? My understanding is >> that we will maintain a cache per GroupCoordinatorShard instance. Is this >> correct? >> >> DJ04: Regarding the topic hash function, would it be possible to specify >> how the hash will be computed instead of providing the signature of the >> method? >> >> DJ05: I have a general question about how you propose to combine hashes. Is >> using a sum enough? I was looking at the Guava implementation and they also >> apply a "final mix" to ensure an avalanche effect. It would be great to >> elaborate a bit more on this in the KIP. >> >> DJ06: After the upgrade, we need to ensure that a tombstone is written for >> the ConsumerGroupPartitionMetadata record if it was present. I suppose that >> this is something that we could do when the next metadata hash is computed. >> Have you thought about it? >> >> DJ07: I wonder whether we should gate the change with `group.version`. My >> concern is that during upgrade, not all the coordinators will support the >> new way and groups may bounce between those coordinators as their >> underlying __consumer_offsets partitions are moved within the cluster >> (during the roll). This may trigger unnecessary rebalances. One way to >> avoid this is to gate the change with `group.version` which is only bumped >> at the end of the cluster upgrade process. What do you think? >> >> DJ08: Regarding the test plan, we should also extend >> ConsumerGroupHeartbeatRequestTest to cover the new feature. >> >> Thanks for your hard work on this one. Let's try to get it in 4.1. >> >> Best, >> David >> >> On Mon, Mar 24, 2025 at 8:17 PM Jeff Kim <jeff....@confluent.io.invalid> >> wrote: >> >>> Hi PoAn, >>> >>> Thanks for the clarification! >>> >>> Best, >>> Jeff >>> >>> On Mon, Mar 24, 2025 at 11:43 AM PoAn Yang <yangp...@gmail.com> wrote: >>> >>>> Hi Jeff, >>>> >>>> Thanks for the review. >>>> >>>> JK01. Yes, we already maintain `groupByTopics` data structure in the >>>> coordinator. If a group leaves, >>>> it checks whether a topic is subscribed by any group. If it’s not, remove >>>> topic hash from the cache. >>>> Please check the sample implementation about >>>> GroupMetadataManager#unsubscribeGroupFromTopic >>>> in #17444 <https://github.com/apache/kafka/pull/17444>. >>>> >>>> JK02: This KIP replaces subscription metadata by a group hash, so the >>>> coordinator calculates group >>>> hash where we calculated subscription metadata. The trigger points are >>>> like a group member is updated, >>>> , new group member, new subscribed topic names, new regular expressions, >>>> and new metadata (topic change). >>>> >>>> Best Regards, >>>> PoAn >>>> >>>>> On Mar 22, 2025, at 3:25 AM, Jeff Kim <jeff....@confluent.io.INVALID> >>>> wrote: >>>>> >>>>> Hi PoAn, >>>>> >>>>> I was late to the discussion but I had some questions. >>>>> >>>>> JK01. I foresee scenarios where the cache leaves stale topic hashes >>>> around >>>>> when a group leaves. >>>>> Should we maintain a counter for the number of groups subscribed to the >>>>> topic (hash) to only keep active topics? >>>>> >>>>> JK02. Can you help me understand when we would actually be computing >>>>> group-level hashes, is it on every >>>>> heartbeat? >>>>> >>>>> Thanks, >>>>> Jeff >>>>> >>>>> On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote: >>>>> >>>>>> Hi David, >>>>>> >>>>>> Yes, I will continue working on this. 
>>>>>> >>>>>> DJ02: As Kirk noted earlier, since the topic hash calculation is on >>>> server >>>>>> side (not client-side), we can safely introduce Guava as a dependency. >>>>>> >>>>>> If there is no other discussion, we can start voting in >>>>>> https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We >>>> can >>>>>> discuss implementation details in PR >>>>>> https://github.com/apache/kafka/pull/17444. I will resolve merge >>>>>> conflicts by March 14th. >>>>>> >>>>>> Thanks, >>>>>> PoAn >>>>>> >>>>>>> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> >>>> wrote: >>>>>>> >>>>>>> Hi PoAn, >>>>>>> >>>>>>> As we are getting closer to releasing 4.0, I think that we can resume >>>>>>> working on this one if you are still interested. Are you? I would >>> like >>>>>>> to have it in 4.1. >>>>>>> >>>>>>> Best, >>>>>>> David >>>>>>> >>>>>>> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote: >>>>>>>> >>>>>>>> Hi all, >>>>>>>> >>>>>>>> Hopefully a quick question... >>>>>>>> >>>>>>>> KT01. Will clients calculate the topic hash on the client? Based on >>>> the >>>>>> current state of the KIP and PR, I would have thought "no", but I ask >>>> based >>>>>> on the discussion around the possible use of Guava on client. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Kirk >>>>>>>> >>>>>>>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote: >>>>>>>>> Hi PoAn, >>>>>>>>> >>>>>>>>> Thanks for the update. I haven't read the updated KIP yet. >>>>>>>>> >>>>>>>>> DJ02: I am not sure about using Guava as a dependency. I mentioned >>> it >>>>>> more >>>>>>>>> as an inspiration/reference. I suppose that we could use it on the >>>>>> server >>>>>>>>> but we should definitely not use it on the client. I am not sure >>> how >>>>>> others >>>>>>>>> feel about it. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> David >>>>>>>>> >>>>>>>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> >>> wrote: >>>>>>>>> >>>>>>>>>> Hi Chia-Ping / David / Lucas, >>>>>>>>>> >>>>>>>>>> Happy new year and thanks for the review. >>>>>>>>>> >>>>>>>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava. >>>>>>>>>> >>>>>>>>>> DJ03: Yes, I updated the description to mention ISR change, >>>>>>>>>> add altering partition reassignment case, and mention that >>>>>>>>>> non-related topic change doesn’t trigger a rebalance. >>>>>>>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh >>>>>>>>>> to notify group. >>>>>>>>>> >>>>>>>>>> DJ06: Updated the PR to use Guava Hashing#combineUnordered >>>>>>>>>> function to combine topic hash. >>>>>>>>>> >>>>>>>>>> DJ07: Renamed it to MetadataHash. >>>>>>>>>> >>>>>>>>>> DJ08: Added a sample hash function to the KIP and use first byte >>> as >>>>>> magic >>>>>>>>>> byte. This is also included in latest PR. >>>>>>>>>> >>>>>>>>>> DJ09: Added two paragraphs about upgraded and downgraded. >>>>>>>>>> >>>>>>>>>> DJ10: According to Lucas’s comment, I add >>> StreamsGroupMetadataValue >>>>>> update >>>>>>>>>> to this KIP. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> PoAn >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> >>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> because assignors are sticky. 
>>>>>>>>>>> >>>>>>>>>>> I forgot about that spec again :( >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道: >>>>>>>>>>> >>>>>>>>>>>> Hi Chia-Ping, >>>>>>>>>>>> >>>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is >>>>>>>>>>>> acceptable if rebalances are triggered in this case on >>>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is >>>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't >>>>>> change >>>>>>>>>>>> anything if the topology of the group is the same because >>>> assignors >>>>>>>>>>>> are sticky. The default ones are and we recommend custom ones to >>>>>> also >>>>>>>>>>>> be. >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> David >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai < >>>> chia7...@apache.org >>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> ummm, it does not work for downgrade as the old coordinator has >>>> no >>>>>> idea >>>>>>>>>>>> about new format :( >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote: >>>>>>>>>>>>>> hi David >>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ08: >>>>>>>>>>>>>> >>>>>>>>>>>>>> That's a good question. If the "hash" lacks version control, >>> it >>>>>> could >>>>>>>>>>>> trigger a series of unnecessary rebalances. However, adding >>>>>> additional >>>>>>>>>>>> information ("magic") to the hash does not help the upgraded >>>>>> coordinator >>>>>>>>>>>> determine the "version." This means that the upgraded >>> coordinator >>>>>> would >>>>>>>>>>>> still trigger unnecessary rebalances because it has no way to >>> know >>>>>> which >>>>>>>>>>>> format to use when comparing the hash. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue >>> to >>>>>>>>>>>> indicate the version of the "hash." This would allow the >>>>>> coordinator, >>>>>>>>>> when >>>>>>>>>>>> handling subscription metadata, to compute the old hash and >>>>>> determine >>>>>>>>>>>> whether an epoch bump is necessary. Additionally, the >>> coordinator >>>>>> can >>>>>>>>>>>> generate a new record to upgrade the hash without requiring an >>>> epoch >>>>>>>>>> bump. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Another issue is whether the coordinator should cache all >>>>>> versions of >>>>>>>>>>>> the hash. I believe this is necessary; otherwise, during an >>>> upgrade, >>>>>>>>>> there >>>>>>>>>>>> would be extensive recomputing of old hashes. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I believe this idea should also work for downgrades, and >>> that's >>>>>> just >>>>>>>>>>>> my two cents. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote: >>>>>>>>>>>>>>> Hi PoAn and Chia-Ping, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for your responses. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we >>> could >>>>>>>>>>>> compute the >>>>>>>>>>>>>>> hash without having to convert to bytes before. Guava has a >>>> nice >>>>>>>>>>>> interface >>>>>>>>>>>>>>> for this allowing to incrementally add primitive types to the >>>>>> hash. >>>>>>>>>>>> We can >>>>>>>>>>>>>>> discuss this in the PR as it is an implementation detail. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated >>> when >>>> a >>>>>>>>>>>> broker >>>>>>>>>>>>>>> shuts down. What you said applies to the ISR. 
I suppose that >>> we >>>>>> can >>>>>>>>>>>> rely on >>>>>>>>>>>>>>> the ISR changes to trigger updates. It is also worth noting >>>>>>>>>>>>>>> that TopicsDelta#changedTopics is updated for every change >>>> (e.g. >>>>>> ISR >>>>>>>>>>>>>>> change, leader change, replicas change, etc.). I suppose that >>>> it >>>>>> is >>>>>>>>>>>> OK but >>>>>>>>>>>>>>> it seems that it will trigger refreshes which are not >>>> necessary. >>>>>>>>>>>> However, a >>>>>>>>>>>>>>> rebalance won't be triggered because the hash won't change. >>>>>>>>>>>>>>> DJ03.1: I suppose that we will continue to rely on >>>>>>>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must >>>>>>>>>>>> refresh their >>>>>>>>>>>>>>> hashes. Is my understanding correct? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ05: Fair enough. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ06: You mention in two places that you would like to >>> combine >>>>>>>>>>>> hashes by >>>>>>>>>>>>>>> additioning them. I wonder if this is a good practice. >>>>>> Intuitively, >>>>>>>>>>>> I would >>>>>>>>>>>>>>> have used XOR or hashed the hashed. Guava has a method for >>>>>> combining >>>>>>>>>>>>>>> hashes. It may be worth looking into the algorithm used. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in >>> order >>>>>> to be >>>>>>>>>>>> more >>>>>>>>>>>>>>> generic. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we >>> should >>>>>>>>>>>> precise in >>>>>>>>>>>>>>> the KIP how we will compute it. I had the following in mind: >>>>>>>>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). >>> We >>>>>> could >>>>>>>>>>>> also >>>>>>>>>>>>>>> add a magic byte at the first element as a sort of version. I >>>> am >>>>>> not >>>>>>>>>>>> sure >>>>>>>>>>>>>>> whether it is needed though. I was thinking about this while >>>>>>>>>>>> imagining how >>>>>>>>>>>>>>> we would handle changing the format in the future. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ09: It would be great if we could provide more details >>> about >>>>>>>>>>>> backward >>>>>>>>>>>>>>> compatibility. What happens when the cluster is upgraded or >>>>>>>>>>>> downgraded? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pigging them >>>> in >>>>>> the >>>>>>>>>>>>>>> discussion thread of KIP-1071. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> David >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang < >>> yangp...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Chia-Ping / David / Andrew, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for the review and suggestions. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DJ01: Removed all implementation details. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DJ02: Does the “incrementally” mean that we only calculate >>> the >>>>>>>>>>>> difference >>>>>>>>>>>>>>>> parts? >>>>>>>>>>>>>>>> For example, if the number of partition change, we only >>>>>> calculate >>>>>>>>>>>> the hash >>>>>>>>>>>>>>>> of number of partition and reconstruct it to the topic hash. >>>>>>>>>>>>>>>> IMO, we only calculate topic hash one time. With cache >>>>>> mechanism, >>>>>>>>>>>> the >>>>>>>>>>>>>>>> value can be reused in different groups on a same broker. >>>>>>>>>>>>>>>> The CPU usage for this part is not very high. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DJ03: Added the update path to KIP for both cases. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DJ04: Yes, it’s a good idea. 
With cache mechanism and single >>>>>> hash >>>>>>>>>>>> per >>>>>>>>>>>>>>>> group, we can balance cpu and disk usage. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> DJ05: Currently, the topic hash is only used in coordinator. >>>>>>>>>>>> However, the >>>>>>>>>>>>>>>> metadata image is used in many different places. >>>>>>>>>>>>>>>> How about we move the hash to metadata image when we find >>> more >>>>>> use >>>>>>>>>>>> cases? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete >>>>>>>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to >>>>>>>>>>>>>>>> ShareGroupMetadataValue. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> PoAn >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield < >>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi PoAn, >>>>>>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> AS1: From the point of view of share groups, the API and >>>> record >>>>>>>>>>>> schema >>>>>>>>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will >>> start >>>>>>>>>>>> supporting >>>>>>>>>>>>>>>> proper >>>>>>>>>>>>>>>>> versioning. As a result, I think you do not need to >>> deprecate >>>>>>>>>>>> the fields >>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema >>> for >>>>>>>>>>>> the fields >>>>>>>>>>>>>>>>> which are actually needed, and I'll update the schema in >>> the >>>>>>>>>>>> code when >>>>>>>>>>>>>>>>> the KIP is implemented. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for >>>>>>>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would >>>>>> simply >>>>>>>>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that >>> it >>>>>> is >>>>>>>>>>>>>>>>> accepted in time for AK 4.1. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>> Andrew >>>>>>>>>>>>>>>>> ________________________________________ >>>>>>>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org> >>>>>>>>>>>>>>>>> Sent: 16 December 2024 16:27 >>>>>>>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org> >>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack >>>>>>>>>>>> topology >>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> hi David >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ05 >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> One of the benefits of having a single hash per group >>> (DJ04) >>>> is >>>>>>>>>>>> the >>>>>>>>>>>>>>>> reduction in the size of stored data. Additionally, the cost >>>> of >>>>>>>>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 >>> to >>>>>>>>>>>> DJ04. >>>>>>>>>>>>>>>> However, the advantage of storing the topic cache in the >>>>>> metadata >>>>>>>>>>>> image is >>>>>>>>>>>>>>>> somewhat unclear to me. Could you please provide more >>> details >>>> on >>>>>>>>>>>> what you >>>>>>>>>>>>>>>> mean by "tight"?Furthermore, since the metadata image is a >>>>>>>>>>>> thread-safe >>>>>>>>>>>>>>>> object, we need to ensure that the lazy initialization is >>> also >>>>>>>>>>>> thread-safe. >>>>>>>>>>>>>>>> If no other components require the cache, it would be better >>>> to >>>>>>>>>>>> keep the >>>>>>>>>>>>>>>> caches within the coordinator. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote: >>>>>>>>>>>>>>>>>> Hi PoAn, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the KIP. I have some comments about it. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only >>> care >>>>>>>>>>>> about >>>>>>>>>>>>>>>> public >>>>>>>>>>>>>>>>>> interface changes, not about implementation details. >>>>>>>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we >>> should >>>>>> use >>>>>>>>>>>>>>>> Murmur3. >>>>>>>>>>>>>>>>>> However, I don't quite like the implementation that you >>>>>> shared. >>>>>>>>>>>> I >>>>>>>>>>>>>>>> wonder if >>>>>>>>>>>>>>>>>> we could make it work incrementally instead of computing a >>>>>> hash >>>>>>>>>>>> of >>>>>>>>>>>>>>>>>> everything and combining them. >>>>>>>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the >>>> cache >>>>>> is >>>>>>>>>>>>>>>> populated >>>>>>>>>>>>>>>>>> when a topic without hash is seen in a HB request and the >>>>>> cache >>>>>>>>>>>> is >>>>>>>>>>>>>>>> cleaned >>>>>>>>>>>>>>>>>> up when topics are deleted based on the metadata image. >>>>>>>>>>>> However, the >>>>>>>>>>>>>>>> update >>>>>>>>>>>>>>>>>> path is not clear. Let's say that a partition is added to >>> a >>>>>>>>>>>> topic, how >>>>>>>>>>>>>>>> does >>>>>>>>>>>>>>>>>> it detect it? Let's also imagine that the racks of a >>>> partition >>>>>>>>>>>> have >>>>>>>>>>>>>>>>>> changed, how does it detect it? In the KIP, it would be >>> nice >>>>>> to >>>>>>>>>>>> be clear >>>>>>>>>>>>>>>>>> about those. >>>>>>>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per >>>>>>>>>>>> group. Your >>>>>>>>>>>>>>>>>> argument against it is that it would require to re-compute >>>> the >>>>>>>>>>>> hash of >>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>> the topics when it needs to be computed. In my opinion, we >>>>>> could >>>>>>>>>>>>>>>> leverage >>>>>>>>>>>>>>>>>> the cached hash per topic to compute the hash of all the >>>>>>>>>>>> subscribed >>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>>>> We could basically combine all the hashes without having >>> to >>>>>>>>>>>> compute all >>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>> them. This approach has a few benefits. 1) We could get >>> rid >>>> of >>>>>>>>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could >>> store >>>>>> the >>>>>>>>>>>> hash >>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we >>> keep >>>>>> in >>>>>>>>>>>> each >>>>>>>>>>>>>>>> group >>>>>>>>>>>>>>>>>> to store the hashed corresponding to the subscribed >>> topics. >>>>>>>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should >>>>>> actually >>>>>>>>>>>> store >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> hash in the metadata image instead of maintaining it >>>> somewhere >>>>>>>>>>>> else. We >>>>>>>>>>>>>>>>>> could still lazily compute it. The benefit is that the >>> value >>>>>>>>>>>> would be >>>>>>>>>>>>>>>> tight >>>>>>>>>>>>>>>>>> to the topic. I have not really looked into it. Would it >>> be >>>> an >>>>>>>>>>>> option? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I >>> kindly >>>>>> ask >>>>>>>>>>>> you to >>>>>>>>>>>>>>>> wait >>>>>>>>>>>>>>>>>> on me if we cannot conclude this week. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>> David >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang < >>>> yangp...@gmail.com >>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Chia-Ping, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for the review and suggestions. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Q0: Add how rack change and how it affects topic >>> partition. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation >>>>>> section. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache >>>> when >>>>>>>>>>>> we replay >>>>>>>>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Q3: Add a new section “Topic Hash Function”. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>>>>>> PoAn >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai < >>>>>>>>>>>> chia7...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> hi PoAn >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for for this KIP! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition >>>> has >>>>>>>>>>>> rack >>>>>>>>>>>>>>>> change`? >>>>>>>>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and >>> leader, >>>>>>>>>>>> right? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to >>>> the >>>>>>>>>>>> Motivation >>>>>>>>>>>>>>>>>>>> section? This should include topics like 'computations' >>>> and >>>>>>>>>>>> 'space >>>>>>>>>>>>>>>>>>> usage'. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new >>>>>> topic >>>>>>>>>>>>>>>> hash.`This >>>>>>>>>>>>>>>>>>>> description seems a bit off to me. Why do we need to >>>> update >>>>>>>>>>>> the cache >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate >>>>>>>>>>>> computations >>>>>>>>>>>>>>>>>>> caused >>>>>>>>>>>>>>>>>>>> by heartbeat requests that occur between two metadata >>>> change >>>>>>>>>>>> events. >>>>>>>>>>>>>>>>>>>> Therefore, we could even remove the changed topics from >>>>>>>>>>>> caches on a >>>>>>>>>>>>>>>>>>>> metadata change, as the first heartbeat request would >>>> update >>>>>>>>>>>> the >>>>>>>>>>>>>>>> caches >>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>> all changed topics. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Q3: Could you please include a section about the choice >>> of >>>>>>>>>>>> hash >>>>>>>>>>>>>>>>>>>> implementation? The hash implementation must be >>> consistent >>>>>>>>>>>> across >>>>>>>>>>>>>>>>>>> different >>>>>>>>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 >>> 下午3:57寫道: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101. >>>>>>>>>>>> Trigger >>>>>>>>>>>>>>>> rebalance >>>>>>>>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use >>> less >>>>>>>>>>>> memory / >>>>>>>>>>>>>>>> disk >>>>>>>>>>>>>>>>>>>>> resources to detect rack changes in the new >>> coordinator. 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>> >>>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>>>>>>>> PoAn >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>> >>>>>> >>>> >>>> >>>
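
P.S. Regarding DJ04 / DJ05 above, here is a rough sketch (in Java, using Guava) that only illustrates the two computations. The class and method names, the exact field order inside the per-topic hash, and the direct use of murmur3_128 here are illustrative assumptions on my part, not the final format; the real code is in the sample implementation at https://github.com/apache/kafka/pull/17444.

import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

public final class MetadataHashSketch {

    // Per-topic hash. Illustrative field order: a magic byte reserved for future
    // format changes, the topic name, the number of partitions, then for each
    // partition its id followed by its racks in sorted order.
    static long topicHash(String topicName,
                          int numPartitions,
                          SortedMap<Integer, List<String>> sortedRacksByPartition) {
        Hasher hasher = Hashing.murmur3_128().newHasher()
            .putByte((byte) 0)
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(numPartitions);
        for (Map.Entry<Integer, List<String>> entry : sortedRacksByPartition.entrySet()) {
            hasher.putInt(entry.getKey());
            for (String rack : entry.getValue()) {
                hasher.putString(rack, StandardCharsets.UTF_8);
            }
        }
        // Murmur3 already applies a final mix, which gives the per-topic avalanche effect.
        return hasher.hash().asLong();
    }

    // Group-level metadata hash. Topics are sorted by name and each topic hash is
    // weighted by its 1-based position before summing, so that swapping two topic
    // hashes changes the result (a + 2b differs from b + 2a).
    static long groupMetadataHash(SortedMap<String, Long> topicHashesByName) {
        long combined = 0;
        long position = 1;
        for (long topicHash : topicHashesByName.values()) {
            combined += position * topicHash;
            position++;
        }
        return combined;
    }
}

For example, for a group subscribed to topics "foo" and "bar", the names are sorted first, so the group hash would be 1 * topicHash(bar, ...) + 2 * topicHash(foo, ...).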