Hi PoAn, from my side, we are good to move to a vote on this one.

Cheers,
Lucas

On Thu, Apr 10, 2025 at 11:29 AM PoAn Yang <yangp...@gmail.com> wrote:

Hi David and Lucas,

Thanks for the review and the great suggestions.

DJ01: Added a statement that triggering rebalances based on rack changes was removed in 4.0.

DJ02: Changed ConsumerGroupPartitionMetadataKey and ConsumerGroupPartitionMetadataValue from removed to deprecated.

DJ03: In my sample implementation <https://github.com/apache/kafka/pull/17444>, the cache is maintained in GroupCoordinatorManager. A GroupCoordinatorShard only has one GroupCoordinatorManager, so it is effectively a cache per GroupCoordinatorShard instance.

DJ04: Added the order in which topic data is combined to compute the topic hash.

DJ05: This is a good question. Within the per-topic hash, Guava already applies a "final mix" to ensure an avalanche effect. However, for the single hash of a group's subscribed topics, we cannot use a simple sum, because we need to avoid an edge case: hash(a) + hash(b) and hash(b) + hash(a) are the same. To fix this, the hash function sorts topics by name and multiplies each topic hash by its position, so the two cases become hash(a) + 2 * hash(b) versus hash(b) + 2 * hash(a). I added this case and the regular-expression case to the KIP.
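Roughly, the combination looks like this (a simplified sketch with illustrative names; see the PR for the actual code):

    import java.util.SortedMap;

    // Combine per-topic hashes into a single group hash. Topics are iterated
    // in sorted-by-name order and each hash is weighted by its 1-based
    // position, so swapping the hashes of two topics changes the result.
    static long combineGroupHash(SortedMap<String, Long> topicHashesByName) {
        long groupHash = 0;
        long position = 1;
        for (long topicHash : topicHashesByName.values()) {
            groupHash += position * topicHash;
            position++;
        }
        return groupHash;
    }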
DJ06: It's good to write a tombstone for ConsumerGroupPartitionMetadata so that log compaction can remove the old data. Added this to the compatibility section.

DJ07: In a previous discussion, you mentioned that assignors are sticky, so if the topology of the group is the same, the upgrade/downgrade rebalance doesn't change anything:

> DJ08: In my opinion, changing the format will be rare so it is acceptable if rebalances are triggered in this case on upgrade/downgrade. It is also what will happen if a cluster is downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change anything if the topology of the group is the same because assignors are sticky. The default ones are and we recommend custom ones to also be.

With a `group.version` gate, we would need to maintain two code paths in 4.1: one for subscription metadata without rack information and another for the group hash. Another drawback is that bumping `group.version` would trigger a rebalance of all groups at the same time. IMHO, I would prefer to keep the code simple.

DJ08: Added ConsumerGroupHeartbeatRequestTest to the test plan.

LB01: Added the removal of StreamsGroupPartitionMetadataKey / StreamsGroupPartitionMetadataValue to the public interface changes section.

Thank you.

Regards,
PoAn

On Apr 9, 2025, at 5:11 PM, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:

Hi PoAn,

LB01: please add the removal of StreamsGroupPartitionMetadataKey / StreamsGroupPartitionMetadataValue to the KIP. Since these were not yet included in 4.0, we can just remove them as long as this KIP hits 4.1. I can help with the integration into the `streamsGroupHeartbeat`, as it will be a little different, but we do not need to call this out in the KIP.

Thanks for your work on this!

Cheers,
Lucas

On Wed, Apr 9, 2025 at 9:59 AM David Jacot <dja...@confluent.io.invalid> wrote:

Hi PoAn,

I finally had a bit of time to review your KIP. First, let me say that the KIP is well written! I have some more comments:

DJ01: In the motivation, could you please mention that we actually removed triggering rebalances based on rack changes in 4.0? You explained very well why we removed it, but it is not clear that we removed it.

DJ02: We cannot delete ConsumerGroupPartitionMetadataKey and ConsumerGroupPartitionMetadataValue because the coordinator must still be able to read the record from the log when the cluster is upgraded. However, we can deprecate them.

DJ03: Could you clarify how the cache will be scoped? My understanding is that we will maintain a cache per GroupCoordinatorShard instance. Is this correct?

DJ04: Regarding the topic hash function, would it be possible to specify how the hash will be computed instead of providing the signature of the method?

DJ05: I have a general question about how you propose to combine hashes. Is using a sum enough? I was looking at the Guava implementation and they also apply a "final mix" to ensure an avalanche effect. It would be great to elaborate a bit more on this in the KIP.

DJ06: After the upgrade, we need to ensure that a tombstone is written for the ConsumerGroupPartitionMetadata record if it was present. I suppose that this is something that we could do when the next metadata hash is computed. Have you thought about it?

DJ07: I wonder whether we should gate the change with `group.version`. My concern is that during an upgrade, not all the coordinators will support the new way, and groups may bounce between those coordinators as their underlying __consumer_offsets partitions are moved within the cluster (during the roll). This may trigger unnecessary rebalances. One way to avoid this is to gate the change with `group.version`, which is only bumped at the end of the cluster upgrade process. What do you think?

DJ08: Regarding the test plan, we should also extend ConsumerGroupHeartbeatRequestTest to cover the new feature.

Thanks for your hard work on this one. Let's try to get it in 4.1.

Best,
David

On Mon, Mar 24, 2025 at 8:17 PM Jeff Kim <jeff....@confluent.io.invalid> wrote:

Hi PoAn,

Thanks for the clarification!

Best,
Jeff

On Mon, Mar 24, 2025 at 11:43 AM PoAn Yang <yangp...@gmail.com> wrote:

Hi Jeff,

Thanks for the review.

JK01. Yes, we already maintain the `groupByTopics` data structure in the coordinator. When a group leaves, the coordinator checks whether each of its topics is still subscribed by any group; if not, it removes the topic hash from the cache. Please check the sample implementation of GroupMetadataManager#unsubscribeGroupFromTopic in #17444 <https://github.com/apache/kafka/pull/17444>.
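In rough pseudo-Java, the cleanup path looks like this (illustrative names and signature; the real logic lives in GroupMetadataManager in the PR):

    import java.util.Map;
    import java.util.Set;

    // groupsByTopic: topic name -> ids of the groups subscribed to it.
    // topicHashCache: topic name -> cached topic hash.
    // When the last subscriber goes away, the cached hash is dropped too,
    // so no stale hash is left behind.
    void unsubscribeGroupFromTopic(String groupId, String topicName,
                                   Map<String, Set<String>> groupsByTopic,
                                   Map<String, Long> topicHashCache) {
        Set<String> subscribers = groupsByTopic.get(topicName);
        if (subscribers == null) return;
        subscribers.remove(groupId);
        if (subscribers.isEmpty()) {
            groupsByTopic.remove(topicName);
            topicHashCache.remove(topicName);
        }
    }

So the set of subscribing groups effectively plays the role of the counter you suggested.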
JK02: This KIP replaces subscription metadata with a group hash, so the coordinator calculates the group hash where we used to calculate subscription metadata. The trigger points include: an updated group member, a new group member, new subscribed topic names, new regular expressions, and new metadata (topic changes).

Best Regards,
PoAn

On Mar 22, 2025, at 3:25 AM, Jeff Kim <jeff....@confluent.io.INVALID> wrote:

Hi PoAn,

I was late to the discussion but I had some questions.

JK01. I foresee scenarios where the cache leaves stale topic hashes around when a group leaves. Should we maintain a counter for the number of groups subscribed to the topic (hash) to only keep active topics?

JK02. Can you help me understand when we would actually be computing group-level hashes? Is it on every heartbeat?

Thanks,
Jeff

On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote:

Hi David,

Yes, I will continue working on this.

DJ02: As Kirk noted earlier, since the topic hash calculation is on the server side (not client-side), we can safely introduce Guava as a dependency.

If there is no other discussion, we can start voting in https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can discuss implementation details in PR https://github.com/apache/kafka/pull/17444. I will resolve merge conflicts by March 14th.

Thanks,
PoAn

On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote:

Hi PoAn,

As we are getting closer to releasing 4.0, I think that we can resume working on this one if you are still interested. Are you? I would like to have it in 4.1.

Best,
David

On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:

Hi all,

Hopefully a quick question...

KT01. Will clients calculate the topic hash on the client? Based on the current state of the KIP and PR, I would have thought "no", but I ask based on the discussion around the possible use of Guava on the client.

Thanks,
Kirk

On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:

Hi PoAn,

Thanks for the update. I haven't read the updated KIP yet.

DJ02: I am not sure about using Guava as a dependency. I mentioned it more as an inspiration/reference. I suppose that we could use it on the server, but we should definitely not use it on the client. I am not sure how others feel about it.

Best,
David

On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote:

Hi Chia-Ping / David / Lucas,

Happy new year and thanks for the review.

DJ02: Thanks for the suggestion. I updated the PR to use Guava.

DJ03: Yes, I updated the description to mention ISR changes, added the altering-partition-reassignment case, and mentioned that unrelated topic changes don't trigger a rebalance.
DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh to notify groups.

DJ06: Updated the PR to use the Guava Hashing#combineUnordered function to combine topic hashes.

DJ07: Renamed it to MetadataHash.

DJ08: Added a sample hash function to the KIP, using the first byte as a magic byte. This is also included in the latest PR.
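The sample function is along these lines (a simplified sketch; the field order follows the earlier DJ08 discussion and the stand-in types are illustrative):

    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;
    import java.nio.charset.StandardCharsets;
    import java.util.List;

    // Per-topic hash: a magic byte (format version) first, then the topic
    // name, the partition count, and for each partition its id followed by
    // the sorted racks of its replicas.
    static long topicHash(String topicName, int numPartitions,
                          List<List<String>> sortedRacksByPartition) {
        Hasher hasher = Hashing.murmur3_128().newHasher()
            .putByte((byte) 0) // magic byte, lets us change the format later
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(numPartitions);
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            hasher.putInt(partitionId);
            for (String rack : sortedRacksByPartition.get(partitionId)) {
                hasher.putString(rack, StandardCharsets.UTF_8);
            }
        }
        return hasher.hash().asLong();
    }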
DJ09: Added two paragraphs about upgrades and downgrades.

DJ10: Following Lucas's comment, I added the StreamsGroupMetadataValue update to this KIP.

Thanks,
PoAn

On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

> because assignors are sticky.

I forgot about that spec again :(

On Fri, Dec 20, 2024 at 3:41 PM David Jacot <david.ja...@gmail.com> wrote:

Hi Chia-Ping,

DJ08: In my opinion, changing the format will be rare, so it is acceptable if rebalances are triggered in this case on upgrade/downgrade. It is also what will happen if a cluster is downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change anything if the topology of the group is the same because assignors are sticky. The default ones are, and we recommend custom ones to also be.

Best,
David

On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org> wrote:

ummm, it does not work for downgrade as the old coordinator has no idea about the new format :(

On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:

hi David

> DJ08:

That's a good question. If the "hash" lacks version control, it could trigger a series of unnecessary rebalances. However, adding additional information ("magic") to the hash does not help the upgraded coordinator determine the "version". This means that the upgraded coordinator would still trigger unnecessary rebalances because it has no way to know which format to use when comparing the hash.

Perhaps we can add a new field to ConsumerGroupMetadataValue to indicate the version of the "hash". This would allow the coordinator, when handling subscription metadata, to compute the old hash and determine whether an epoch bump is necessary. Additionally, the coordinator can generate a new record to upgrade the hash without requiring an epoch bump.
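Conceptually (a rough sketch; all names are made up):

    import java.util.Map;

    // Store the hash version next to the hash. On mismatch, recompute the
    // hash using the *stored* version's format first, so a pure format
    // change does not look like a topology change.
    record VersionedHash(byte version, long value) {}

    static boolean topologyChanged(VersionedHash stored,
                                   Map<Byte, Long> computedByVersion) {
        Long sameVersionHash = computedByVersion.get(stored.version());
        return sameVersionHash == null || sameVersionHash != stored.value();
    }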
Another issue is whether the coordinator should cache all versions of the hash. I believe this is necessary; otherwise, during an upgrade, there would be extensive recomputing of old hashes.

I believe this idea should also work for downgrades, and that's just my two cents.

Best,
Chia-Ping

On 2024/12/19 14:39:41 David Jacot wrote:

Hi PoAn and Chia-Ping,

Thanks for your responses.

DJ02: Sorry, I was not clear. I was wondering whether we could compute the hash without having to convert to bytes first. Guava has a nice interface for this, allowing us to incrementally add primitive types to the hash. We can discuss this in the PR as it is an implementation detail.

DJ03: Thanks. I don't think that the replicas are updated when a broker shuts down. What you said applies to the ISR. I suppose that we can rely on the ISR changes to trigger updates. It is also worth noting that TopicsDelta#changedTopics is updated for every change (e.g. ISR change, leader change, replicas change, etc.). I suppose that it is OK, but it seems that it will trigger refreshes which are not necessary. However, a rebalance won't be triggered because the hash won't change.
DJ03.1: I suppose that we will continue to rely on ModernGroup#requestMetadataRefresh to notify groups that must refresh their hashes. Is my understanding correct?

DJ05: Fair enough.

DJ06: You mention in two places that you would like to combine hashes by adding them. I wonder if this is a good practice. Intuitively, I would have used XOR or hashed the hashes. Guava has a method for combining hashes. It may be worth looking into the algorithm used.
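For example (just to illustrate the Guava API; not a proposal for the exact code):

    import com.google.common.hash.HashCode;
    import com.google.common.hash.Hashing;
    import java.util.List;

    HashCode a = Hashing.murmur3_128().hashUnencodedChars("topic-a");
    HashCode b = Hashing.murmur3_128().hashUnencodedChars("topic-b");

    // combineUnordered is commutative: the input order does not matter.
    HashCode unordered = Hashing.combineUnordered(List.of(a, b));
    // combineOrdered is not: List.of(b, a) yields a different result.
    HashCode ordered = Hashing.combineOrdered(List.of(a, b));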
DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be more generic.

DJ08: Regarding the per-topic hash, I wonder whether we should specify in the KIP how we will compute it. I had the following in mind: hash(topicName; numPartitions; [partitionId; sorted racks]). We could also add a magic byte as the first element as a sort of version. I am not sure whether it is needed, though. I was thinking about this while imagining how we would handle changing the format in the future.

DJ09: It would be great if we could provide more details about backward compatibility. What happens when the cluster is upgraded or downgraded?

DJ10: We should update KIP-1071. It may be worth pinging them in the discussion thread of KIP-1071.

Best,
David

On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote:

Hi Chia-Ping / David / Andrew,

Thanks for the review and suggestions.

DJ01: Removed all implementation details.

DJ02: Does "incrementally" mean that we only calculate the changed parts? For example, if the number of partitions changes, we would only calculate the hash of the partition count and fold it back into the topic hash. IMO, we only calculate the topic hash once. With the cache mechanism, the value can be reused by different groups on the same broker. The CPU usage for this part is not very high.

DJ03: Added the update path to the KIP for both cases.

DJ04: Yes, it's a good idea. With the cache mechanism and a single hash per group, we can balance CPU and disk usage.

DJ05: Currently, the topic hash is only used in the coordinator. However, the metadata image is used in many different places. How about moving the hash to the metadata image when we find more use cases?

AS1, AS2: Thanks for the reminder. I will simply delete ShareGroupPartitionMetadataKey/Value and add a new field to ShareGroupMetadataValue.

Best,
PoAn
On Dec 17, 2024, at 5:50 AM, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:

Hi PoAn,
Thanks for the KIP.

AS1: From the point of view of share groups, the API and record schema definitions are unstable in AK 4.0. In AK 4.1, we will start supporting proper versioning. As a result, I think you do not need to deprecate the fields in the ShareGroupPartitionMetadataValue. Just include the schema for the fields which are actually needed, and I'll update the schema in the code when the KIP is implemented.

AS2: In the event that DJ04 actually removes the need for ConsumerGroupPartitionMetadataKey/Value entirely, I would simply delete ShareGroupPartitionMetadataKey/Value, assuming that it is accepted in time for AK 4.1.

Thanks,
Andrew
________________________________________
From: Chia-Ping Tsai <chia7...@apache.org>
Sent: 16 December 2024 16:27
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology changes

hi David

> DJ05
One of the benefits of having a single hash per group (DJ04) is the reduction in the size of stored data. Additionally, the cost of re-computing can be minimized thanks to caching. So I'm +1 to DJ04. However, the advantage of storing the topic cache in the metadata image is somewhat unclear to me. Could you please provide more details on what you mean by "tied"? Furthermore, since the metadata image is a thread-safe object, we need to ensure that the lazy initialization is also thread-safe. If no other components require the cache, it would be better to keep the caches within the coordinator.
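For instance, something like this (just a sketch) would keep the lazy computation thread-safe and local to the coordinator:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.ToLongFunction;

    // Lazily populated, thread-safe topic-hash cache. computeIfAbsent
    // computes each topic's hash at most once, even when several groups
    // ask for the same topic concurrently.
    final class TopicHashCache {
        private final Map<String, Long> cache = new ConcurrentHashMap<>();
        private final ToLongFunction<String> hashFn;

        TopicHashCache(ToLongFunction<String> hashFn) {
            this.hashFn = hashFn;
        }

        long topicHashFor(String topicName) {
            return cache.computeIfAbsent(topicName, hashFn::applyAsLong);
        }

        void evict(String topicName) {
            cache.remove(topicName); // e.g. on topic deletion or metadata change
        }
    }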
Best,
Chia-Ping

On 2024/12/16 14:01:35 David Jacot wrote:

Hi PoAn,

Thanks for the KIP. I have some comments about it.

DJ01: Please, remove all the code from the KIP. We only care about public interface changes, not about implementation details.
DJ02: Regarding the hash computation, I agree that we should use Murmur3. However, I don't quite like the implementation that you shared. I wonder if we could make it work incrementally instead of computing a hash of everything and combining them.
DJ03: Regarding the cache, my understanding is that the cache is populated when a topic without a hash is seen in a HB request, and the cache is cleaned up when topics are deleted based on the metadata image. However, the update path is not clear. Let's say that a partition is added to a topic: how does it detect it? Let's also imagine that the racks of a partition have changed: how does it detect it? In the KIP, it would be nice to be clear about those.
DJ04: I wonder whether we should go with a single hash per group. Your argument against it is that it would require re-computing the hash of all the topics when it needs to be computed. In my opinion, we could leverage the cached hash per topic to compute the hash of all the subscribed ones. We could basically combine all the hashes without having to compute all of them. This approach has a few benefits. 1) We could get rid of the ConsumerGroupPartitionMetadata record, as we could store the hash with the group epoch. 2) We could get rid of the Map that we keep in each group to store the hashes corresponding to the subscribed topics.
DJ05: Regarding the cache again, I wonder if we should actually store the hash in the metadata image instead of maintaining it somewhere else. We could still lazily compute it. The benefit is that the value would be tied to the topic. I have not really looked into it. Would it be an option?

I'll be away for two weeks starting from Saturday. I kindly ask you to wait for me if we cannot conclude this week.

Best,
David

On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote:

Hi Chia-Ping,

Thanks for the review and suggestions.

Q0: Added how rack changes happen and how they affect topic partitions.

Q1: Added why we need a balance algorithm to the Motivation section.

Q2: After checking again, we don't need to update the cache when we replay records. We only need to renew it in the consumer heartbeat.

Q3: Added a new section "Topic Hash Function".

Thanks.
PoAn
On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:

hi PoAn

Thanks for this KIP!

Q0: Could you add more details about "A topic partition has rack change"? IIRC, the "rack change" includes both follower and leader, right?

Q1: Could you please add the 'concerns' we discussed to the Motivation section? This should include topics like 'computations' and 'space usage'.

Q2: "The group coordinator can leverage it to add a new topic hash." This description seems a bit off to me. Why do we need to update the cache at this phase? The cache is intended to prevent duplicate computations caused by heartbeat requests that occur between two metadata change events. Therefore, we could even remove the changed topics from the caches on a metadata change, as the first heartbeat request would update the caches for all changed topics.

Q3: Could you please include a section about the choice of hash implementation? The hash implementation must be consistent across different JDKs, so we use Murmur3 to generate the hash value.

Best,
Chia-Ping

On Fri, Nov 1, 2024 at 3:57 PM Frank Yang <yangp...@gmail.com> wrote:

Hi all,

I would like to start a discussion thread on KIP-1101: Trigger rebalance on rack topology changes. In this KIP, we aim to use less memory / disk resources to detect rack changes in the new coordinator.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes

Please take a look and feel free to share any thoughts.

Thanks.
PoAn