Hi Lucas,

Thank you. Here is the vote thread:
https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p

Regards,
PoAn

> On Apr 16, 2025, at 12:07 AM, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
> wrote:
> 
> Hi PoAn,
> 
> from my side, we are good to move to a vote on this one.
> 
> Cheers,
> Lucas
> 
> On Thu, Apr 10, 2025 at 11:29 AM PoAn Yang <yangp...@gmail.com> wrote:
>> 
>> Hi David and Lucas,
>> 
>> Thanks for the review and great suggestions.
>> 
>> DJ01: Added a statement that rack-aware rebalance was removed in 4.0.
>> 
>> DJ02: Changed ConsumerGroupPartitionMetadataKey and
>> ConsumerGroupPartitionMetadataValue from removed to deprecated.
>> 
>> DJ03: In my sample implementation
>> <https://github.com/apache/kafka/pull/17444>, the cache is maintained in
>> GroupCoordinatorManager. A GroupCoordinatorShard has only one
>> GroupCoordinatorManager, so I think it is similar to a cache per
>> GroupCoordinatorShard instance.
>> 
>> DJ04: Added the order in which topic data is combined to compute a topic
>> hash.
>> 
>> DJ05: This is a good question. For the topic hash, Guava applies a “final
>> mix” to ensure an avalanche effect. However, for the single hash of the
>> subscribed topics in a group, we cannot use a simple sum, because a sum is
>> commutative: hash(a + b) and hash(b + a) are the same. To avoid this edge
>> case, the hash function sorts the topics by name and multiplies each topic
>> hash by its position, for example hash(a + 2b) versus hash(b + 2a). I added
>> this case and the regular expression case to the KIP.
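
To make the DJ05 point concrete, here is a minimal sketch, not the KIP's implementation. `GroupHashSketch`, `plainSum` and `positionWeighted` are hypothetical names, the per-topic hashes are passed in as plain longs, and the outer hash(...) step from the description above is left out so that only the ordering trick is visible.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: compares a plain sum with a position-weighted sum.
final class GroupHashSketch {

    // Plain sum: commutative, so it cannot tell which topic owns which hash.
    static long plainSum(Map<String, Long> topicHashes) {
        long sum = 0L;
        for (long topicHash : topicHashes.values()) {
            sum += topicHash;
        }
        return sum;
    }

    // Sort topics by name, then weight each topic hash by its 1-based position.
    static long positionWeighted(Map<String, Long> topicHashes) {
        TreeMap<String, Long> sortedByName = new TreeMap<>(topicHashes);
        long combined = 0L;
        long position = 1L;
        for (long topicHash : sortedByName.values()) {
            combined += position * topicHash;
            position++;
        }
        return combined;
    }
}
```

With topic hashes {a: 3, b: 5} and {a: 5, b: 3}, `plainSum` returns 8 for both groups, while `positionWeighted` returns 13 and 11, so swapping the hashes of two topics is no longer invisible.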
>> 
>> DJ06: It’s good to write a tombstone for ConsumerGroupPartitionMetadata so
>> that log compaction can remove the old data. I added this to the
>> compatibility section.
>> 
>> DJ07: In previous discussion, you mentioned that assignors are sticky. If 
>> the topology of the
>> group is the same, the upgrade/downgrade rebalance doesn’t change anything.
>> 
>>>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>>>>>>>> acceptable if rebalances are triggered in this case on
>>>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
>>>>>>>>>>>> change anything if the topology of the group is the same because
>>>>>>>>>>>> assignors are sticky. The default ones are and we recommend custom
>>>>>>>>>>>> ones to also be.
>> 
>> With a `group.version` change, we would need to maintain two code paths in
>> 4.1: one for subscription metadata without racks and another for the group
>> hash. Another drawback is that changing `group.version` would trigger
>> rebalances for all groups at the same time. IMHO, I would prefer to keep
>> the code simple.
>> 
>> DJ08: Added ConsumerGroupHeartbeatRequestTest to the test plan.
>> 
>> LB01: Added the removal of StreamsGroupPartitionMetadataKey /
>> StreamsGroupPartitionMetadataValue to the public interface change section.
>> 
>> Thank you.
>> 
>> Regards,
>> PoAn
>> 
>>> On Apr 9, 2025, at 5:11 PM, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
>>> wrote:
>>> 
>>> Hi PoAn,
>>> 
>>> LB01: please add the removal of StreamsGroupPartitionMetadataKey /
>>> StreamsGroupPartitionMetadataValue to the KIP. Since these were not
>>> yet included in 4.0, we can just remove them as long as this KIP hits
>>> 4.1. I can help with the integration into the `streamsGroupHeartbeat`,
>>> as it will be a little different, but we do not need to call this out
>>> in the KIP.
>>> 
>>> Thanks for your work on this!
>>> 
>>> Cheers,
>>> Lucas
>>> 
>>> On Wed, Apr 9, 2025 at 9:59 AM David Jacot <dja...@confluent.io.invalid> 
>>> wrote:
>>>> 
>>>> Hi PoAn,
>>>> 
>>>> I finally had a bit of time to review your KIP. First, let me say that the
>>>> KIP is well written! I have some more comments:
>>>> 
>>>> DJ01: In the motivation, could you please mention that we actually removed
>>>> triggering rebalances based on rack changes in 4.0? You explained very well
>>>> why we removed it but it is not clear that we removed it.
>>>> 
>>>> DJ02: We cannot delete ConsumerGroupPartitionMetadataKey
>>>> and ConsumerGroupPartitionMetadataValue because the coordinator must still
>>>> be able to read the record from the log when the cluster is upgraded.
>>>> However, we can deprecate them.
>>>> 
>>>> DJ03: Could you clarify how the cache will be scoped? My understanding is
>>>> that we will maintain a cache per GroupCoordinatorShard instance. Is this
>>>> correct?
>>>> 
>>>> DJ04: Regarding the topic hash function, would it be possible to specify
>>>> how the hash will be computed instead of providing the signature of the
>>>> method?
>>>> 
>>>> DJ05: I have a general question about how you propose to combine hashes. Is
>>>> using a sum enough? I was looking at the Guava implementation and they also
>>>> apply a "final mix" to ensure an avalanche effect. It would be great to
>>>> elaborate a bit more on this in the KIP.
>>>> 
>>>> DJ06: After the upgrade, we need to ensure that a tombstone is written for
>>>> the ConsumerGroupPartitionMetadata record if it was present. I suppose that
>>>> this is something that we could do when the next metadata hash is computed.
>>>> Have you thought about it?
>>>> 
>>>> DJ07: I wonder whether we should gate the change with `group.version`. My
>>>> concern is that during upgrade, not all the coordinators will support the
>>>> new way and groups may bounce between those coordinators as their
>>>> underlying __consumer_offsets partitions are moved within the cluster
>>>> (during the roll). This may trigger unnecessary rebalances. One way to
>>>> avoid this is to gate the change with `group.version` which is only bumped
>>>> at the end of the cluster upgrade process. What do you think?
>>>> 
>>>> DJ08: Regarding the test plan, we should also extend
>>>> ConsumerGroupHeartbeatRequestTest to cover the new feature.
>>>> 
>>>> Thanks for your hard work on this one. Let's try to get it in 4.1.
>>>> 
>>>> Best,
>>>> David
>>>> 
>>>> On Mon, Mar 24, 2025 at 8:17 PM Jeff Kim <jeff....@confluent.io.invalid>
>>>> wrote:
>>>> 
>>>>> Hi PoAn,
>>>>> 
>>>>> Thanks for the clarification!
>>>>> 
>>>>> Best,
>>>>> Jeff
>>>>> 
>>>>> On Mon, Mar 24, 2025 at 11:43 AM PoAn Yang <yangp...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Jeff,
>>>>>> 
>>>>>> Thanks for the review.
>>>>>> 
>>>>>> JK01. Yes, we already maintain the `groupByTopics` data structure in the
>>>>>> coordinator. When a group leaves, the coordinator checks whether each of
>>>>>> its topics is still subscribed to by any group. If not, it removes the
>>>>>> topic hash from the cache. Please check
>>>>>> GroupMetadataManager#unsubscribeGroupFromTopic in the sample
>>>>>> implementation, #17444 <https://github.com/apache/kafka/pull/17444>.
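
The JK01 cleanup rule can be sketched roughly as follows. This is only an illustration under stated assumptions: `TopicHashCacheSketch` and its methods are hypothetical names, not the code in the PR, and the topic hash is supplied by the caller rather than computed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a topic hash stays cached only while some group
// is still subscribed to that topic.
final class TopicHashCacheSketch {
    private final Map<String, Set<String>> groupsByTopic = new HashMap<>();
    private final Map<String, Long> topicHashCache = new HashMap<>();

    // Record the subscription and cache the hash if it is not cached yet.
    void subscribe(String groupId, String topic, long topicHash) {
        groupsByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(groupId);
        topicHashCache.putIfAbsent(topic, topicHash);
    }

    // Drop the subscription; evict the hash once no group subscribes to the topic.
    void unsubscribe(String groupId, String topic) {
        Set<String> groups = groupsByTopic.get(topic);
        if (groups == null) {
            return;
        }
        groups.remove(groupId);
        if (groups.isEmpty()) {
            groupsByTopic.remove(topic);
            topicHashCache.remove(topic);
        }
    }

    boolean isCached(String topic) {
        return topicHashCache.containsKey(topic);
    }
}
```

Tracking the set of subscribed groups per topic plays the role of the counter Jeff asked about: the entry is evicted exactly when the set becomes empty.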
>>>>>> 
>>>>>> JK02: This KIP replaces subscription metadata with a group hash, so the
>>>>>> coordinator calculates the group hash wherever it previously calculated
>>>>>> subscription metadata. The trigger points include a group member update,
>>>>>> a new group member, new subscribed topic names, new regular expressions,
>>>>>> and new metadata (topic change).
>>>>>> 
>>>>>> Best Regards,
>>>>>> PoAn
>>>>>> 
>>>>>>> On Mar 22, 2025, at 3:25 AM, Jeff Kim <jeff....@confluent.io.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi PoAn,
>>>>>>> 
>>>>>>> I was late to the discussion but I had some questions.
>>>>>>> 
>>>>>>> JK01. I foresee scenarios where the cache leaves stale topic hashes
>>>>>>> around when a group leaves.
>>>>>>> Should we maintain a counter for the number of groups subscribed to the
>>>>>>> topic (hash) to only keep active topics?
>>>>>>> 
>>>>>>> JK02. Can you help me understand when we would actually be computing
>>>>>>> group-level hashes? Is it on every heartbeat?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Jeff
>>>>>>> 
>>>>>>> On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi David,
>>>>>>>> 
>>>>>>>> Yes, I will continue working on this.
>>>>>>>> 
>>>>>>>> DJ02: As Kirk noted earlier, since the topic hash calculation is on the
>>>>>>>> server side (not the client side), we can safely introduce Guava as a
>>>>>>>> dependency.
>>>>>>>> 
>>>>>>>> If there is no other discussion, we can start voting in
>>>>>>>> https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can
>>>>>>>> discuss implementation details in PR
>>>>>>>> https://github.com/apache/kafka/pull/17444. I will resolve merge
>>>>>>>> conflicts by March 14th.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> PoAn
>>>>>>>> 
>>>>>>>>> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi PoAn,
>>>>>>>>> 
>>>>>>>>> As we are getting closer to releasing 4.0, I think that we can resume
>>>>>>>>> working on this one if you are still interested. Are you? I would like
>>>>>>>>> to have it in 4.1.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> David
>>>>>>>>> 
>>>>>>>>> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> Hopefully a quick question...
>>>>>>>>>> 
>>>>>>>>>> KT01. Will clients calculate the topic hash on the client? Based on the
>>>>>>>>>> current state of the KIP and PR, I would have thought "no", but I ask
>>>>>>>>>> based on the discussion around the possible use of Guava on the client.
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> Kirk
>>>>>>>>>> 
>>>>>>>>>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:
>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the update. I haven't read the updated KIP yet.
>>>>>>>>>>> 
>>>>>>>>>>> DJ02: I am not sure about using Guava as a dependency. I mentioned it
>>>>>>>>>>> more as an inspiration/reference. I suppose that we could use it on
>>>>>>>>>>> the server but we should definitely not use it on the client. I am
>>>>>>>>>>> not sure how others feel about it.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> David
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Chia-Ping / David / Lucas,
>>>>>>>>>>>> 
>>>>>>>>>>>> Happy new year and thanks for the review.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ03: Yes, I updated the description to mention ISR changes,
>>>>>>>>>>>> added the partition reassignment case, and mentioned that
>>>>>>>>>>>> unrelated topic changes don’t trigger a rebalance.
>>>>>>>>>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
>>>>>>>>>>>> to notify groups.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ06: Updated the PR to use the Guava Hashing#combineUnordered
>>>>>>>>>>>> function to combine topic hashes.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ07: Renamed it to MetadataHash.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ08: Added a sample hash function to the KIP that uses the first
>>>>>>>>>>>> byte as a magic byte. This is also included in the latest PR.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ09: Added two paragraphs about upgrade and downgrade.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ10: Following Lucas’s comment, I added the StreamsGroupMetadataValue
>>>>>>>>>>>> update to this KIP.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> PoAn
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> because assignors are sticky.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I forgot about that spec again :(
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Dec 20, 2024 at 3:41 PM, David Jacot <david.ja...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>>>>>>>>>> acceptable if rebalances are triggered in this case on
>>>>>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
>>>>>>>>>>>>>> change anything if the topology of the group is the same because
>>>>>>>>>>>>>> assignors are sticky. The default ones are and we recommend custom
>>>>>>>>>>>>>> ones to also be.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> David
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ummm, it does not work for downgrade as the old coordinator has no
>>>>>>>>>>>>>>> idea about the new format :(
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>>>>>>>>>>>>>> hi David
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ08:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> That's a good question. If the "hash" lacks version control, it
>>>>>>>>>>>>>>>> could trigger a series of unnecessary rebalances. However, adding
>>>>>>>>>>>>>>>> additional information ("magic") to the hash does not help the
>>>>>>>>>>>>>>>> upgraded coordinator determine the "version." This means that the
>>>>>>>>>>>>>>>> upgraded coordinator would still trigger unnecessary rebalances
>>>>>>>>>>>>>>>> because it has no way to know which format to use when comparing
>>>>>>>>>>>>>>>> the hash.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to
>>>>>>>>>>>>>>>> indicate the version of the "hash." This would allow the
>>>>>>>>>>>>>>>> coordinator, when handling subscription metadata, to compute the
>>>>>>>>>>>>>>>> old hash and determine whether an epoch bump is necessary.
>>>>>>>>>>>>>>>> Additionally, the coordinator can generate a new record to upgrade
>>>>>>>>>>>>>>>> the hash without requiring an epoch bump.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Another issue is whether the coordinator should cache all versions
>>>>>>>>>>>>>>>> of the hash. I believe this is necessary; otherwise, during an
>>>>>>>>>>>>>>>> upgrade, there would be extensive recomputing of old hashes.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I believe this idea should also work for downgrades, and that's
>>>>>>>>>>>>>>>> just my two cents.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>>>>>>>>>>>>>> Hi PoAn and Chia-Ping,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for your responses.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could
>>>>>>>>>>>>>>>>> compute the hash without having to convert to bytes first. Guava
>>>>>>>>>>>>>>>>> has a nice interface for this that allows adding primitive types
>>>>>>>>>>>>>>>>> to the hash incrementally. We can discuss this in the PR as it is
>>>>>>>>>>>>>>>>> an implementation detail.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated when a
>>>>>>>>>>>>>>>>> broker shuts down. What you said applies to the ISR. I suppose
>>>>>>>>>>>>>>>>> that we can rely on the ISR changes to trigger updates. It is
>>>>>>>>>>>>>>>>> also worth noting that TopicsDelta#changedTopics is updated for
>>>>>>>>>>>>>>>>> every change (e.g. ISR change, leader change, replicas change,
>>>>>>>>>>>>>>>>> etc.). I suppose that it is OK but it seems that it will trigger
>>>>>>>>>>>>>>>>> refreshes which are not necessary. However, a rebalance won't be
>>>>>>>>>>>>>>>>> triggered because the hash won't change.
>>>>>>>>>>>>>>>>> DJ03.1: I suppose that we will continue to rely on
>>>>>>>>>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
>>>>>>>>>>>>>>>>> refresh their hashes. Is my understanding correct?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ05: Fair enough.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ06: You mention in two places that you would like to combine
>>>>>>>>>>>>>>>>> hashes by adding them. I wonder if this is a good practice.
>>>>>>>>>>>>>>>>> Intuitively, I would have used XOR or hashed the hashes. Guava
>>>>>>>>>>>>>>>>> has a method for combining hashes. It may be worth looking into
>>>>>>>>>>>>>>>>> the algorithm used.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to
>>>>>>>>>>>>>>>>> be more generic.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we should
>>>>>>>>>>>>>>>>> specify in the KIP how we will compute it. I had the following in
>>>>>>>>>>>>>>>>> mind: hash(topicName; numPartitions; [partitionId;sorted racks]).
>>>>>>>>>>>>>>>>> We could also add a magic byte as the first element as a sort of
>>>>>>>>>>>>>>>>> version. I am not sure whether it is needed though. I was
>>>>>>>>>>>>>>>>> thinking about this while imagining how we would handle changing
>>>>>>>>>>>>>>>>> the format in the future.
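
A rough sketch of the DJ08 layout, hash(magic; topicName; numPartitions; [partitionId; sorted racks]), could look like the following. It is an illustration only: `TopicHashSketch`, `MAGIC` and the byte layout are assumptions, and `java.util.zip.CRC32` stands in for the Murmur3 function discussed in this thread purely to keep the example free of dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch: serialize the topic data in a fixed order, then hash it.
final class TopicHashSketch {
    static final byte MAGIC = 0; // assumed format version, written first

    static long topicHash(String topicName, Map<Integer, List<String>> racksByPartition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(MAGIC);
            out.writeUTF(topicName);
            out.writeInt(racksByPartition.size()); // numPartitions
            // Iterate partitions in id order so the input map order is irrelevant.
            TreeMap<Integer, List<String>> sorted = new TreeMap<>(racksByPartition);
            for (Map.Entry<Integer, List<String>> entry : sorted.entrySet()) {
                out.writeInt(entry.getKey());
                List<String> racks = new ArrayList<>(entry.getValue());
                Collections.sort(racks); // sorted racks
                out.writeInt(racks.size());
                for (String rack : racks) {
                    out.writeUTF(rack);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        CRC32 crc = new CRC32(); // stand-in for Murmur3, not the KIP's choice
        crc.update(bytes.toByteArray());
        return crc.getValue();
    }
}
```

Because partitions and racks are sorted before they are written, the same topic layout produces the same hash regardless of iteration order, while a rack change alters the serialized bytes and therefore the hash.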
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ09: It would be great if we could provide more details about
>>>>>>>>>>>>>>>>> backward compatibility. What happens when the cluster is upgraded
>>>>>>>>>>>>>>>>> or downgraded?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pinging them in
>>>>>>>>>>>>>>>>> the discussion thread of KIP-1071.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ01: Removed all implementation details.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ02: Does “incrementally” mean that we only calculate the parts
>>>>>>>>>>>>>>>>>> that differ? For example, if the number of partitions changes, we
>>>>>>>>>>>>>>>>>> only calculate the hash of the number of partitions and recombine
>>>>>>>>>>>>>>>>>> it into the topic hash.
>>>>>>>>>>>>>>>>>> IMO, we only calculate the topic hash once. With the cache
>>>>>>>>>>>>>>>>>> mechanism, the value can be reused by different groups on the
>>>>>>>>>>>>>>>>>> same broker. The CPU usage for this part is not very high.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ03: Added the update path to the KIP for both cases.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ04: Yes, it’s a good idea. With the cache mechanism and a
>>>>>>>>>>>>>>>>>> single hash per group, we can balance CPU and disk usage.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ05: Currently, the topic hash is only used in the coordinator.
>>>>>>>>>>>>>>>>>> However, the metadata image is used in many different places.
>>>>>>>>>>>>>>>>>> How about we move the hash to the metadata image when we find
>>>>>>>>>>>>>>>>>> more use cases?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
>>>>>>>>>>>>>>>>>> ShareGroupMetadataValue.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> AS1: From the point of view of share groups, the API and record
>>>>>>>>>>>>>>>>>>> schema definitions are unstable in AK 4.0. In AK 4.1, we will
>>>>>>>>>>>>>>>>>>> start supporting proper versioning. As a result, I think you do
>>>>>>>>>>>>>>>>>>> not need to deprecate the fields in the
>>>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for the
>>>>>>>>>>>>>>>>>>> fields which are actually needed, and I'll update the schema in
>>>>>>>>>>>>>>>>>>> the code when the KIP is implemented.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for
>>>>>>>>>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would simply
>>>>>>>>>>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it is
>>>>>>>>>>>>>>>>>>> accepted in time for AK 4.1.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>>>> ________________________________________
>>>>>>>>>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>>>>>>>>>>>>>> Sent: 16 December 2024 16:27
>>>>>>>>>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology changes
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> hi David
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> DJ05
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> One of the benefits of having a single hash per group (DJ04) is
>>>>>>>>>>>>>>>>>>> the reduction in the size of stored data. Additionally, the cost
>>>>>>>>>>>>>>>>>>> of re-computing can be minimized thanks to caching. So I'm +1 to
>>>>>>>>>>>>>>>>>>> DJ04. However, the advantage of storing the topic cache in the
>>>>>>>>>>>>>>>>>>> metadata image is somewhat unclear to me. Could you please
>>>>>>>>>>>>>>>>>>> provide more details on what you mean by "tight"? Furthermore,
>>>>>>>>>>>>>>>>>>> since the metadata image is a thread-safe object, we need to
>>>>>>>>>>>>>>>>>>> ensure that the lazy initialization is also thread-safe. If no
>>>>>>>>>>>>>>>>>>> other components require the cache, it would be better to keep
>>>>>>>>>>>>>>>>>>> the caches within the coordinator.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>>>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care
>>>>>>>>>>>>>>>>>>>> about public interface changes, not about implementation details.
>>>>>>>>>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we should use
>>>>>>>>>>>>>>>>>>>> Murmur3. However, I don't quite like the implementation that you
>>>>>>>>>>>>>>>>>>>> shared. I wonder if we could make it work incrementally instead
>>>>>>>>>>>>>>>>>>>> of computing a hash of everything and combining them.
>>>>>>>>>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache is
>>>>>>>>>>>>>>>>>>>> populated when a topic without a hash is seen in a HB request and
>>>>>>>>>>>>>>>>>>>> the cache is cleaned up when topics are deleted based on the
>>>>>>>>>>>>>>>>>>>> metadata image. However, the update path is not clear. Let's say
>>>>>>>>>>>>>>>>>>>> that a partition is added to a topic, how does it detect it?
>>>>>>>>>>>>>>>>>>>> Let's also imagine that the racks of a partition have changed,
>>>>>>>>>>>>>>>>>>>> how does it detect it? In the KIP, it would be nice to be clear
>>>>>>>>>>>>>>>>>>>> about those.
>>>>>>>>>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per group.
>>>>>>>>>>>>>>>>>>>> Your argument against it is that it would require re-computing
>>>>>>>>>>>>>>>>>>>> the hash of all the topics when it needs to be computed. In my
>>>>>>>>>>>>>>>>>>>> opinion, we could leverage the cached hash per topic to compute
>>>>>>>>>>>>>>>>>>>> the hash of all the subscribed ones. We could basically combine
>>>>>>>>>>>>>>>>>>>> all the hashes without having to compute all of them. This
>>>>>>>>>>>>>>>>>>>> approach has a few benefits. 1) We could get rid of the
>>>>>>>>>>>>>>>>>>>> ConsumerGroupPartitionMetadata record as we could store the hash
>>>>>>>>>>>>>>>>>>>> with the group epoch. 2) We could get rid of the Map that we keep
>>>>>>>>>>>>>>>>>>>> in each group to store the hashes corresponding to the subscribed
>>>>>>>>>>>>>>>>>>>> topics.
>>>>>>>>>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should actually
>>>>>>>>>>>>>>>>>>>> store the hash in the metadata image instead of maintaining it
>>>>>>>>>>>>>>>>>>>> somewhere else. We could still lazily compute it. The benefit is
>>>>>>>>>>>>>>>>>>>> that the value would be tight to the topic. I have not really
>>>>>>>>>>>>>>>>>>>> looked into it. Would it be an option?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly ask
>>>>>>>>>>>>>>>>>>>> you to wait on me if we cannot conclude this week.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Q0: Added how racks change and how that affects topic partitions.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Q1: Added why we need a balance algorithm to the Motivation
>>>>>>>>>>>>>>>>>>>>> section.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Q2: After checking again, we don’t need to update the cache when
>>>>>>>>>>>>>>>>>>>>> we replay records. We only need to renew it in the consumer
>>>>>>>>>>>>>>>>>>>>> heartbeat.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Q3: Added a new section “Topic Hash Function”.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> hi PoAn
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thanks for this KIP!
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition has rack
>>>>>>>>>>>>>>>>>>>>>> change`? IIRC, the "rack change" includes both follower and
>>>>>>>>>>>>>>>>>>>>>> leader, right?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the
>>>>>>>>>>>>>>>>>>>>>> Motivation section? This should include topics like
>>>>>>>>>>>>>>>>>>>>>> 'computations' and 'space usage'.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new topic
>>>>>>>>>>>>>>>>>>>>>> hash.` This description seems a bit off to me. Why do we need to
>>>>>>>>>>>>>>>>>>>>>> update the cache at this phase? The cache is intended to prevent
>>>>>>>>>>>>>>>>>>>>>> duplicate computations caused by heartbeat requests that occur
>>>>>>>>>>>>>>>>>>>>>> between two metadata change events. Therefore, we could even
>>>>>>>>>>>>>>>>>>>>>> remove the changed topics from caches on a metadata change, as
>>>>>>>>>>>>>>>>>>>>>> the first heartbeat request would update the caches for all
>>>>>>>>>>>>>>>>>>>>>> changed topics.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Q3: Could you please include a section about the choice of hash
>>>>>>>>>>>>>>>>>>>>>> implementation? The hash implementation must be consistent
>>>>>>>>>>>>>>>>>>>>>> across different JDKs, so we use Murmur3 to generate the hash
>>>>>>>>>>>>>>>>>>>>>> value.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Fri, Nov 1, 2024 at 3:57 PM, Frank Yang <yangp...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101: Trigger
>>>>>>>>>>>>>>>>>>>>>>> rebalance on rack topology changes. In this KIP, we aim to use
>>>>>>>>>>>>>>>>>>>>>>> less memory / disk resources to detect rack changes in the new
>>>>>>>>>>>>>>>>>>>>>>> coordinator.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>> PoAn
