Hi Lucas,

Thank you. Here is the vote thread: https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p.
Regards,
PoAn

> On Apr 16, 2025, at 12:07 AM, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>
> Hi PoAn,
>
> from my side, we are good to move to a vote on this one.
>
> Cheers,
> Lucas
>
> On Thu, Apr 10, 2025 at 11:29 AM PoAn Yang <yangp...@gmail.com> wrote:
>>
>> Hi David and Lucas,
>>
>> Thanks for the review and the great suggestions.
>>
>> DJ01: Added a statement that triggering rebalances on rack changes was removed in 4.0.
>>
>> DJ02: Changed ConsumerGroupPartitionMetadataKey and ConsumerGroupPartitionMetadataValue from removed to deprecated.
>>
>> DJ03: In my sample implementation <https://github.com/apache/kafka/pull/17444>, the cache is maintained in GroupCoordinatorManager. A GroupCoordinatorShard only has one GroupCoordinatorManager, so I think it is effectively a cache per GroupCoordinatorShard instance.
>>
>> DJ04: Added the order in which topic data is combined to compute a topic hash.
>>
>> DJ05: This is a good question. For the per-topic hash, Guava already applies a "final mix" to ensure an avalanche effect. However, for the single hash of a group's subscribed topics, we cannot use a simple sum, because we need to avoid an edge case: for example, hash(a) + hash(b) and hash(b) + hash(a) are the same. To fix this, the hash function sorts topics by name and multiplies each topic hash by its position, i.e. hash(a) + 2 * hash(b) versus hash(b) + 2 * hash(a). I added this case and the regular expression case to the KIP.
>>
>> DJ06: It is a good idea to write a tombstone for ConsumerGroupPartitionMetadata so that log compaction can remove the old data. Added this to the compatibility section.
>>
>> DJ07: In a previous discussion, you mentioned that assignors are sticky. If the topology of the group is the same, the upgrade/downgrade rebalance doesn't change anything.
>>
>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>> acceptable if rebalances are triggered in this case on
>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change
>>>> anything if the topology of the group is the same because assignors
>>>> are sticky. The default ones are and we recommend custom ones to also
>>>> be.
>>
>> With a `group.version` gate, we would need to maintain two code paths in 4.1: one for subscription metadata without rack information and another for the group hash. Another drawback is that bumping `group.version` would trigger rebalances for all groups at the same time. IMHO, I would prefer to keep the code simple.
>>
>> DJ08: Added ConsumerGroupHeartbeatRequestTest to the test plan.
>>
>> LB01: Added the removal of StreamsGroupPartitionMetadataKey / StreamsGroupPartitionMetadataValue to the public interface changes section.
>>
>> Thank you.
>>
>> Regards,
>> PoAn
>>
>>> On Apr 9, 2025, at 5:11 PM, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>
>>> Hi PoAn,
>>>
>>> LB01: please add the removal of StreamsGroupPartitionMetadataKey /
>>> StreamsGroupPartitionMetadataValue to the KIP. Since these were not
>>> yet included in 4.0, we can just remove them as long as this KIP hits
>>> 4.1. I can help with the integration into the `streamsGroupHeartbeat`,
>>> as it will be a little different, but we do not need to call this out
>>> in the KIP.
>>>
>>> Thanks for your work on this!
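
For illustration, a minimal sketch of the position-weighted combination PoAn describes in DJ05 above: it assumes each subscribed topic already has a long hash value (e.g. from Murmur3), sorts topics by name, and weights each hash by its 1-based position before summing. Class and method names here are hypothetical, not the KIP's actual implementation.

import java.util.Map;
import java.util.TreeMap;

public class GroupHashExample {

    /**
     * Combines per-topic hashes into a single group hash.
     * Topics are sorted by name and each topic hash is weighted by its
     * 1-based position, so two assignments of the same hash values to
     * different topics do not collapse to the same sum.
     */
    static long combineGroupHash(Map<String, Long> topicHashes) {
        // TreeMap iterates in ascending topic-name order.
        TreeMap<String, Long> sorted = new TreeMap<>(topicHashes);
        long result = 0;
        int position = 1;
        for (Map.Entry<String, Long> entry : sorted.entrySet()) {
            result += position * entry.getValue();
            position++;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(combineGroupHash(Map.of("topic-a", 11L, "topic-b", 22L))); // 11 + 2*22 = 55
        System.out.println(combineGroupHash(Map.of("topic-a", 22L, "topic-b", 11L))); // 22 + 2*11 = 44
    }
}

With equal weights both calls above would return 33; the position weighting keeps them distinct, which is the edge case DJ05 is addressing.
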
>>> >>> Cheers, >>> Lucas >>> >>> On Wed, Apr 9, 2025 at 9:59 AM David Jacot <dja...@confluent.io.invalid> >>> wrote: >>>> >>>> Hi PoAn, >>>> >>>> I finally had a bit of time to review your KIP. First, let me say that the >>>> KIP is well written! I have some more comments: >>>> >>>> DJ01: In the motivation, could you please mention that we actually removed >>>> triggering rebalances based on rack changes in 4.0? You explained very well >>>> why we removed it but it is not clear that we removed it. >>>> >>>> DJ02: We cannot delete ConsumerGroupPartitionMetadataKey >>>> and ConsumerGroupPartitionMetadataValue because the coordinator must still >>>> be able to read the record from the log when the cluster is upgraded. >>>> However, we can deprecate them. >>>> >>>> DJ03: Could you clarify how the cache will be scoped? My understanding is >>>> that we will maintain a cache per GroupCoordinatorShard instance. Is this >>>> correct? >>>> >>>> DJ04: Regarding the topic hash function, would it be possible to specify >>>> how the hash will be computed instead of providing the signature of the >>>> method? >>>> >>>> DJ05: I have a general question about how you propose to combine hashes. Is >>>> using a sum enough? I was looking at the Guava implementation and they also >>>> apply a "final mix" to ensure an avalanche effect. It would be great to >>>> elaborate a bit more on this in the KIP. >>>> >>>> DJ06: After the upgrade, we need to ensure that a tombstone is written for >>>> the ConsumerGroupPartitionMetadata record if it was present. I suppose that >>>> this is something that we could do when the next metadata hash is computed. >>>> Have you thought about it? >>>> >>>> DJ07: I wonder whether we should gate the change with `group.version`. My >>>> concern is that during upgrade, not all the coordinators will support the >>>> new way and groups may bounce between those coordinators as their >>>> underlying __consumer_offsets partitions are moved within the cluster >>>> (during the roll). This may trigger unnecessary rebalances. One way to >>>> avoid this is to gate the change with `group.version` which is only bumped >>>> at the end of the cluster upgrade process. What do you think? >>>> >>>> DJ08: Regarding the test plan, we should also extend >>>> ConsumerGroupHeartbeatRequestTest to cover the new feature. >>>> >>>> Thanks for your hard work on this one. Let's try to get it in 4.1. >>>> >>>> Best, >>>> David >>>> >>>> On Mon, Mar 24, 2025 at 8:17 PM Jeff Kim <jeff....@confluent.io.invalid> >>>> wrote: >>>> >>>>> Hi PoAn, >>>>> >>>>> Thanks for the clarification! >>>>> >>>>> Best, >>>>> Jeff >>>>> >>>>> On Mon, Mar 24, 2025 at 11:43 AM PoAn Yang <yangp...@gmail.com> wrote: >>>>> >>>>>> Hi Jeff, >>>>>> >>>>>> Thanks for the review. >>>>>> >>>>>> JK01. Yes, we already maintain `groupByTopics` data structure in the >>>>>> coordinator. If a group leaves, >>>>>> it checks whether a topic is subscribed by any group. If it’s not, remove >>>>>> topic hash from the cache. >>>>>> Please check the sample implementation about >>>>>> GroupMetadataManager#unsubscribeGroupFromTopic >>>>>> in #17444 <https://github.com/apache/kafka/pull/17444>. >>>>>> >>>>>> JK02: This KIP replaces subscription metadata by a group hash, so the >>>>>> coordinator calculates group >>>>>> hash where we calculated subscription metadata. The trigger points are >>>>>> like a group member is updated, >>>>>> , new group member, new subscribed topic names, new regular expressions, >>>>>> and new metadata (topic change). 
>>>>>> >>>>>> Best Regards, >>>>>> PoAn >>>>>> >>>>>>> On Mar 22, 2025, at 3:25 AM, Jeff Kim <jeff....@confluent.io.INVALID> >>>>>> wrote: >>>>>>> >>>>>>> Hi PoAn, >>>>>>> >>>>>>> I was late to the discussion but I had some questions. >>>>>>> >>>>>>> JK01. I foresee scenarios where the cache leaves stale topic hashes >>>>>> around >>>>>>> when a group leaves. >>>>>>> Should we maintain a counter for the number of groups subscribed to the >>>>>>> topic (hash) to only keep active topics? >>>>>>> >>>>>>> JK02. Can you help me understand when we would actually be computing >>>>>>> group-level hashes, is it on every >>>>>>> heartbeat? >>>>>>> >>>>>>> Thanks, >>>>>>> Jeff >>>>>>> >>>>>>> On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi David, >>>>>>>> >>>>>>>> Yes, I will continue working on this. >>>>>>>> >>>>>>>> DJ02: As Kirk noted earlier, since the topic hash calculation is on >>>>>> server >>>>>>>> side (not client-side), we can safely introduce Guava as a dependency. >>>>>>>> >>>>>>>> If there is no other discussion, we can start voting in >>>>>>>> https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We >>>>>> can >>>>>>>> discuss implementation details in PR >>>>>>>> https://github.com/apache/kafka/pull/17444. I will resolve merge >>>>>>>> conflicts by March 14th. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> PoAn >>>>>>>> >>>>>>>>> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> >>>>>> wrote: >>>>>>>>> >>>>>>>>> Hi PoAn, >>>>>>>>> >>>>>>>>> As we are getting closer to releasing 4.0, I think that we can resume >>>>>>>>> working on this one if you are still interested. Are you? I would >>>>> like >>>>>>>>> to have it in 4.1. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> David >>>>>>>>> >>>>>>>>> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote: >>>>>>>>>> >>>>>>>>>> Hi all, >>>>>>>>>> >>>>>>>>>> Hopefully a quick question... >>>>>>>>>> >>>>>>>>>> KT01. Will clients calculate the topic hash on the client? Based on >>>>>> the >>>>>>>> current state of the KIP and PR, I would have thought "no", but I ask >>>>>> based >>>>>>>> on the discussion around the possible use of Guava on client. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Kirk >>>>>>>>>> >>>>>>>>>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote: >>>>>>>>>>> Hi PoAn, >>>>>>>>>>> >>>>>>>>>>> Thanks for the update. I haven't read the updated KIP yet. >>>>>>>>>>> >>>>>>>>>>> DJ02: I am not sure about using Guava as a dependency. I mentioned >>>>> it >>>>>>>> more >>>>>>>>>>> as an inspiration/reference. I suppose that we could use it on the >>>>>>>> server >>>>>>>>>>> but we should definitely not use it on the client. I am not sure >>>>> how >>>>>>>> others >>>>>>>>>>> feel about it. >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> David >>>>>>>>>>> >>>>>>>>>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> >>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Chia-Ping / David / Lucas, >>>>>>>>>>>> >>>>>>>>>>>> Happy new year and thanks for the review. >>>>>>>>>>>> >>>>>>>>>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava. >>>>>>>>>>>> >>>>>>>>>>>> DJ03: Yes, I updated the description to mention ISR change, >>>>>>>>>>>> add altering partition reassignment case, and mention that >>>>>>>>>>>> non-related topic change doesn’t trigger a rebalance. >>>>>>>>>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh >>>>>>>>>>>> to notify group. 
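
To make the caching discussion above (DJ03 and JK01/JK02) concrete, here is a rough sketch of a per-shard topic-hash cache that drops an entry once no group on the shard subscribes to the topic anymore, and invalidates entries on metadata changes. All names are hypothetical; the real logic lives in GroupMetadataManager in the sample implementation PR.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical per-shard cache: one topic hash shared by all groups on the
 * shard, evicted once no group on the shard subscribes to the topic anymore.
 */
public class TopicHashCache {
    private final Map<String, Long> topicHashes = new HashMap<>();
    private final Map<String, Set<String>> groupsByTopic = new HashMap<>();

    /** Returns the cached hash, computing it on first subscription. */
    public long onGroupSubscribe(String groupId, String topic, Function<String, Long> hashFn) {
        groupsByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(groupId);
        return topicHashes.computeIfAbsent(topic, hashFn);
    }

    /** Called when a group unsubscribes from a topic or is deleted. */
    public void onGroupUnsubscribe(String groupId, String topic) {
        Set<String> groups = groupsByTopic.get(topic);
        if (groups == null) return;
        groups.remove(groupId);
        if (groups.isEmpty()) {
            // No group on this shard uses the topic anymore: evict the hash.
            groupsByTopic.remove(topic);
            topicHashes.remove(topic);
        }
    }

    /** Called when the metadata image reports a topic change or deletion. */
    public void invalidate(String topic) {
        topicHashes.remove(topic);
    }
}
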
>>>>>>>>>>>> >>>>>>>>>>>> DJ06: Updated the PR to use Guava Hashing#combineUnordered >>>>>>>>>>>> function to combine topic hash. >>>>>>>>>>>> >>>>>>>>>>>> DJ07: Renamed it to MetadataHash. >>>>>>>>>>>> >>>>>>>>>>>> DJ08: Added a sample hash function to the KIP and use first byte >>>>> as >>>>>>>> magic >>>>>>>>>>>> byte. This is also included in latest PR. >>>>>>>>>>>> >>>>>>>>>>>> DJ09: Added two paragraphs about upgraded and downgraded. >>>>>>>>>>>> >>>>>>>>>>>> DJ10: According to Lucas’s comment, I add >>>>> StreamsGroupMetadataValue >>>>>>>> update >>>>>>>>>>>> to this KIP. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> PoAn >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> >>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> because assignors are sticky. >>>>>>>>>>>>> >>>>>>>>>>>>> I forgot about that spec again :( >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Chia-Ping, >>>>>>>>>>>>>> >>>>>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is >>>>>>>>>>>>>> acceptable if rebalances are triggered in this case on >>>>>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is >>>>>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't >>>>>>>> change >>>>>>>>>>>>>> anything if the topology of the group is the same because >>>>>> assignors >>>>>>>>>>>>>> are sticky. The default ones are and we recommend custom ones to >>>>>>>> also >>>>>>>>>>>>>> be. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> David >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai < >>>>>> chia7...@apache.org >>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> ummm, it does not work for downgrade as the old coordinator has >>>>>> no >>>>>>>> idea >>>>>>>>>>>>>> about new format :( >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote: >>>>>>>>>>>>>>>> hi David >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ08: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> That's a good question. If the "hash" lacks version control, >>>>> it >>>>>>>> could >>>>>>>>>>>>>> trigger a series of unnecessary rebalances. However, adding >>>>>>>> additional >>>>>>>>>>>>>> information ("magic") to the hash does not help the upgraded >>>>>>>> coordinator >>>>>>>>>>>>>> determine the "version." This means that the upgraded >>>>> coordinator >>>>>>>> would >>>>>>>>>>>>>> still trigger unnecessary rebalances because it has no way to >>>>> know >>>>>>>> which >>>>>>>>>>>>>> format to use when comparing the hash. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue >>>>> to >>>>>>>>>>>>>> indicate the version of the "hash." This would allow the >>>>>>>> coordinator, >>>>>>>>>>>> when >>>>>>>>>>>>>> handling subscription metadata, to compute the old hash and >>>>>>>> determine >>>>>>>>>>>>>> whether an epoch bump is necessary. Additionally, the >>>>> coordinator >>>>>>>> can >>>>>>>>>>>>>> generate a new record to upgrade the hash without requiring an >>>>>> epoch >>>>>>>>>>>> bump. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Another issue is whether the coordinator should cache all >>>>>>>> versions of >>>>>>>>>>>>>> the hash. I believe this is necessary; otherwise, during an >>>>>> upgrade, >>>>>>>>>>>> there >>>>>>>>>>>>>> would be extensive recomputing of old hashes. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I believe this idea should also work for downgrades, and >>>>> that's >>>>>>>> just >>>>>>>>>>>>>> my two cents. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote: >>>>>>>>>>>>>>>>> Hi PoAn and Chia-Ping, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for your responses. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we >>>>> could >>>>>>>>>>>>>> compute the >>>>>>>>>>>>>>>>> hash without having to convert to bytes before. Guava has a >>>>>> nice >>>>>>>>>>>>>> interface >>>>>>>>>>>>>>>>> for this allowing to incrementally add primitive types to the >>>>>>>> hash. >>>>>>>>>>>>>> We can >>>>>>>>>>>>>>>>> discuss this in the PR as it is an implementation detail. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated >>>>> when >>>>>> a >>>>>>>>>>>>>> broker >>>>>>>>>>>>>>>>> shuts down. What you said applies to the ISR. I suppose that >>>>> we >>>>>>>> can >>>>>>>>>>>>>> rely on >>>>>>>>>>>>>>>>> the ISR changes to trigger updates. It is also worth noting >>>>>>>>>>>>>>>>> that TopicsDelta#changedTopics is updated for every change >>>>>> (e.g. >>>>>>>> ISR >>>>>>>>>>>>>>>>> change, leader change, replicas change, etc.). I suppose that >>>>>> it >>>>>>>> is >>>>>>>>>>>>>> OK but >>>>>>>>>>>>>>>>> it seems that it will trigger refreshes which are not >>>>>> necessary. >>>>>>>>>>>>>> However, a >>>>>>>>>>>>>>>>> rebalance won't be triggered because the hash won't change. >>>>>>>>>>>>>>>>> DJ03.1: I suppose that we will continue to rely on >>>>>>>>>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must >>>>>>>>>>>>>> refresh their >>>>>>>>>>>>>>>>> hashes. Is my understanding correct? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ05: Fair enough. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ06: You mention in two places that you would like to >>>>> combine >>>>>>>>>>>>>> hashes by >>>>>>>>>>>>>>>>> additioning them. I wonder if this is a good practice. >>>>>>>> Intuitively, >>>>>>>>>>>>>> I would >>>>>>>>>>>>>>>>> have used XOR or hashed the hashed. Guava has a method for >>>>>>>> combining >>>>>>>>>>>>>>>>> hashes. It may be worth looking into the algorithm used. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in >>>>> order >>>>>>>> to be >>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>> generic. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we >>>>> should >>>>>>>>>>>>>> precise in >>>>>>>>>>>>>>>>> the KIP how we will compute it. I had the following in mind: >>>>>>>>>>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). >>>>> We >>>>>>>> could >>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>> add a magic byte at the first element as a sort of version. I >>>>>> am >>>>>>>> not >>>>>>>>>>>>>> sure >>>>>>>>>>>>>>>>> whether it is needed though. I was thinking about this while >>>>>>>>>>>>>> imagining how >>>>>>>>>>>>>>>>> we would handle changing the format in the future. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ09: It would be great if we could provide more details >>>>> about >>>>>>>>>>>>>> backward >>>>>>>>>>>>>>>>> compatibility. What happens when the cluster is upgraded or >>>>>>>>>>>>>> downgraded? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pigging them >>>>>> in >>>>>>>> the >>>>>>>>>>>>>>>>> discussion thread of KIP-1071. 
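
As a rough illustration of the per-topic hash layout David sketches in DJ08 above (a magic byte as a sort of version, then topic name, number of partitions, and each partition id with its sorted racks), hashed with Guava's Murmur3 as discussed earlier in the thread. The exact field order and helper names are assumptions for illustration; the KIP defines the authoritative format, and Guava is assumed to be available on the server classpath.

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class TopicHashExample {

    private static final byte MAGIC_BYTE = 0; // version of the hash layout

    /**
     * Hashes (magic byte; topic name; number of partitions;
     * [partition id; sorted racks]) with Murmur3.
     *
     * @param racksByPartition partition id -> racks of its replicas, already sorted
     */
    static HashCode topicHash(String topicName, SortedMap<Integer, List<String>> racksByPartition) {
        HashFunction murmur3 = Hashing.murmur3_128();
        Hasher hasher = murmur3.newHasher()
            .putByte(MAGIC_BYTE)
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(racksByPartition.size());
        for (Map.Entry<Integer, List<String>> entry : racksByPartition.entrySet()) {
            hasher.putInt(entry.getKey());
            for (String rack : entry.getValue()) {
                hasher.putString(rack, StandardCharsets.UTF_8);
            }
        }
        return hasher.hash();
    }

    public static void main(String[] args) {
        SortedMap<Integer, List<String>> racks = new TreeMap<>(Map.of(
            0, List.of("rack-0", "rack-1"),
            1, List.of("rack-1", "rack-2")));
        HashCode hash = topicHash("topic-a", racks);

        // Guava's unordered combiner is one option for a group-level value
        // (DJ06); the position-weighted sum discussed in DJ05 is what the
        // thread later converged on.
        HashCode combined = Hashing.combineUnordered(List.of(hash, topicHash("topic-b", racks)));
        System.out.println(hash.asLong() + " / " + combined.asLong());
    }
}
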
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> David >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang < >>>>> yangp...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Chia-Ping / David / Andrew, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the review and suggestions. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ01: Removed all implementation details. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ02: Does the “incrementally” mean that we only calculate >>>>> the >>>>>>>>>>>>>> difference >>>>>>>>>>>>>>>>>> parts? >>>>>>>>>>>>>>>>>> For example, if the number of partition change, we only >>>>>>>> calculate >>>>>>>>>>>>>> the hash >>>>>>>>>>>>>>>>>> of number of partition and reconstruct it to the topic hash. >>>>>>>>>>>>>>>>>> IMO, we only calculate topic hash one time. With cache >>>>>>>> mechanism, >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> value can be reused in different groups on a same broker. >>>>>>>>>>>>>>>>>> The CPU usage for this part is not very high. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ03: Added the update path to KIP for both cases. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ04: Yes, it’s a good idea. With cache mechanism and single >>>>>>>> hash >>>>>>>>>>>>>> per >>>>>>>>>>>>>>>>>> group, we can balance cpu and disk usage. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> DJ05: Currently, the topic hash is only used in coordinator. >>>>>>>>>>>>>> However, the >>>>>>>>>>>>>>>>>> metadata image is used in many different places. >>>>>>>>>>>>>>>>>> How about we move the hash to metadata image when we find >>>>> more >>>>>>>> use >>>>>>>>>>>>>> cases? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete >>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to >>>>>>>>>>>>>>>>>> ShareGroupMetadataValue. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>> PoAn >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield < >>>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi PoAn, >>>>>>>>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> AS1: From the point of view of share groups, the API and >>>>>> record >>>>>>>>>>>>>> schema >>>>>>>>>>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will >>>>> start >>>>>>>>>>>>>> supporting >>>>>>>>>>>>>>>>>> proper >>>>>>>>>>>>>>>>>>> versioning. As a result, I think you do not need to >>>>> deprecate >>>>>>>>>>>>>> the fields >>>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema >>>>> for >>>>>>>>>>>>>> the fields >>>>>>>>>>>>>>>>>>> which are actually needed, and I'll update the schema in >>>>> the >>>>>>>>>>>>>> code when >>>>>>>>>>>>>>>>>>> the KIP is implemented. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for >>>>>>>>>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would >>>>>>>> simply >>>>>>>>>>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that >>>>> it >>>>>>>> is >>>>>>>>>>>>>>>>>>> accepted in time for AK 4.1. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>> Andrew >>>>>>>>>>>>>>>>>>> ________________________________________ >>>>>>>>>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org> >>>>>>>>>>>>>>>>>>> Sent: 16 December 2024 16:27 >>>>>>>>>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org> >>>>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack >>>>>>>>>>>>>> topology >>>>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> hi David >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> DJ05 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> One of the benefits of having a single hash per group >>>>> (DJ04) >>>>>> is >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> reduction in the size of stored data. Additionally, the cost >>>>>> of >>>>>>>>>>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 >>>>> to >>>>>>>>>>>>>> DJ04. >>>>>>>>>>>>>>>>>> However, the advantage of storing the topic cache in the >>>>>>>> metadata >>>>>>>>>>>>>> image is >>>>>>>>>>>>>>>>>> somewhat unclear to me. Could you please provide more >>>>> details >>>>>> on >>>>>>>>>>>>>> what you >>>>>>>>>>>>>>>>>> mean by "tight"?Furthermore, since the metadata image is a >>>>>>>>>>>>>> thread-safe >>>>>>>>>>>>>>>>>> object, we need to ensure that the lazy initialization is >>>>> also >>>>>>>>>>>>>> thread-safe. >>>>>>>>>>>>>>>>>> If no other components require the cache, it would be better >>>>>> to >>>>>>>>>>>>>> keep the >>>>>>>>>>>>>>>>>> caches within the coordinator. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote: >>>>>>>>>>>>>>>>>>>> Hi PoAn, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I have some comments about it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only >>>>> care >>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>> public >>>>>>>>>>>>>>>>>>>> interface changes, not about implementation details. >>>>>>>>>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we >>>>> should >>>>>>>> use >>>>>>>>>>>>>>>>>> Murmur3. >>>>>>>>>>>>>>>>>>>> However, I don't quite like the implementation that you >>>>>>>> shared. >>>>>>>>>>>>>> I >>>>>>>>>>>>>>>>>> wonder if >>>>>>>>>>>>>>>>>>>> we could make it work incrementally instead of computing a >>>>>>>> hash >>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>> everything and combining them. >>>>>>>>>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the >>>>>> cache >>>>>>>> is >>>>>>>>>>>>>>>>>> populated >>>>>>>>>>>>>>>>>>>> when a topic without hash is seen in a HB request and the >>>>>>>> cache >>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>> cleaned >>>>>>>>>>>>>>>>>>>> up when topics are deleted based on the metadata image. >>>>>>>>>>>>>> However, the >>>>>>>>>>>>>>>>>> update >>>>>>>>>>>>>>>>>>>> path is not clear. Let's say that a partition is added to >>>>> a >>>>>>>>>>>>>> topic, how >>>>>>>>>>>>>>>>>> does >>>>>>>>>>>>>>>>>>>> it detect it? Let's also imagine that the racks of a >>>>>> partition >>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>> changed, how does it detect it? In the KIP, it would be >>>>> nice >>>>>>>> to >>>>>>>>>>>>>> be clear >>>>>>>>>>>>>>>>>>>> about those. >>>>>>>>>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per >>>>>>>>>>>>>> group. 
Your >>>>>>>>>>>>>>>>>>>> argument against it is that it would require to re-compute >>>>>> the >>>>>>>>>>>>>> hash of >>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>>> the topics when it needs to be computed. In my opinion, we >>>>>>>> could >>>>>>>>>>>>>>>>>> leverage >>>>>>>>>>>>>>>>>>>> the cached hash per topic to compute the hash of all the >>>>>>>>>>>>>> subscribed >>>>>>>>>>>>>>>>>> ones. >>>>>>>>>>>>>>>>>>>> We could basically combine all the hashes without having >>>>> to >>>>>>>>>>>>>> compute all >>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>> them. This approach has a few benefits. 1) We could get >>>>> rid >>>>>> of >>>>>>>>>>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could >>>>> store >>>>>>>> the >>>>>>>>>>>>>> hash >>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we >>>>> keep >>>>>>>> in >>>>>>>>>>>>>> each >>>>>>>>>>>>>>>>>> group >>>>>>>>>>>>>>>>>>>> to store the hashed corresponding to the subscribed >>>>> topics. >>>>>>>>>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should >>>>>>>> actually >>>>>>>>>>>>>> store >>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> hash in the metadata image instead of maintaining it >>>>>> somewhere >>>>>>>>>>>>>> else. We >>>>>>>>>>>>>>>>>>>> could still lazily compute it. The benefit is that the >>>>> value >>>>>>>>>>>>>> would be >>>>>>>>>>>>>>>>>> tight >>>>>>>>>>>>>>>>>>>> to the topic. I have not really looked into it. Would it >>>>> be >>>>>> an >>>>>>>>>>>>>> option? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I >>>>> kindly >>>>>>>> ask >>>>>>>>>>>>>> you to >>>>>>>>>>>>>>>>>> wait >>>>>>>>>>>>>>>>>>>> on me if we cannot conclude this week. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>> David >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang < >>>>>> yangp...@gmail.com >>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Chia-Ping, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks for the review and suggestions. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Q0: Add how rack change and how it affects topic >>>>> partition. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation >>>>>>>> section. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache >>>>>> when >>>>>>>>>>>>>> we replay >>>>>>>>>>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Q3: Add a new section “Topic Hash Function”. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>>>>>>>> PoAn >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai < >>>>>>>>>>>>>> chia7...@gmail.com> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> hi PoAn >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks for for this KIP! >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition >>>>>> has >>>>>>>>>>>>>> rack >>>>>>>>>>>>>>>>>> change`? >>>>>>>>>>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and >>>>> leader, >>>>>>>>>>>>>> right? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to >>>>>> the >>>>>>>>>>>>>> Motivation >>>>>>>>>>>>>>>>>>>>>> section? This should include topics like 'computations' >>>>>> and >>>>>>>>>>>>>> 'space >>>>>>>>>>>>>>>>>>>>> usage'. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new >>>>>>>> topic >>>>>>>>>>>>>>>>>> hash.`This >>>>>>>>>>>>>>>>>>>>>> description seems a bit off to me. Why do we need to >>>>>> update >>>>>>>>>>>>>> the cache >>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate >>>>>>>>>>>>>> computations >>>>>>>>>>>>>>>>>>>>> caused >>>>>>>>>>>>>>>>>>>>>> by heartbeat requests that occur between two metadata >>>>>> change >>>>>>>>>>>>>> events. >>>>>>>>>>>>>>>>>>>>>> Therefore, we could even remove the changed topics from >>>>>>>>>>>>>> caches on a >>>>>>>>>>>>>>>>>>>>>> metadata change, as the first heartbeat request would >>>>>> update >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> caches >>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> all changed topics. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Q3: Could you please include a section about the choice >>>>> of >>>>>>>>>>>>>> hash >>>>>>>>>>>>>>>>>>>>>> implementation? The hash implementation must be >>>>> consistent >>>>>>>>>>>>>> across >>>>>>>>>>>>>>>>>>>>> different >>>>>>>>>>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>>>> Chia-Ping >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 >>>>> 下午3:57寫道: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101. >>>>>>>>>>>>>> Trigger >>>>>>>>>>>>>>>>>> rebalance >>>>>>>>>>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use >>>>> less >>>>>>>>>>>>>> memory / >>>>>>>>>>>>>>>>>> disk >>>>>>>>>>>>>>>>>>>>>>> resources to detect rack changes in the new >>>>> coordinator. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>> >>>>>> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>>>>>>>>>> PoAn