Hi David and Lucas,

Thanks for the review and great suggestion.

DJ01: Add the statement about rack-aware rebalance was removed in 4.0.

DJ02: Change ConsumerGroupPartitionMetadataKey and
ConsumerGroupPartitionMetadataValue from removed to deprecated.

DJ03: In my sample implementation <https://github.com/apache/kafka/pull/17444>, 
the cache is maintained in GroupCoordinatorManager.
A GroupCoordinatorShard only have one GroupCoordinatorManager, so I think it’s 
similar to
a cache per GroupCoordinatorShard instance.

DJ04: Add the order about how topic data will be computed to get a topic hash.

DJ05: This is a good question. In topic hash, the Guava uses “final mix” to 
ensure avalanche
effect. However for the single hash of subscribed topics in a group, we cannot 
use simple
sum, because we need to avoid the edge case. For example, hash(a+b) and 
hash(b+a) is
same. To fix this case, the hash function sorts topics by name and the topic 
hash multiplied
by the position value like hash(a + 2b) and hash(b + 2a). I add this case and 
regular expression case to the KIP.

DJ06: It’s good to add tombstone of ConsumerGroupPartitionMetadata, so log 
compaction
can remove old data. Add this part to compatibility section.

DJ07: In previous discussion, you mentioned that assignors are sticky. If the 
topology of the
group is the same, the upgrade/downgrade rebalance doesn’t change anything.

>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>>>>>> acceptable if rebalances are triggered in this case on
>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
>>>> change
>>>>>>>>>> anything if the topology of the group is the same because
>> assignors
>>>>>>>>>> are sticky. The default ones are and we recommend custom ones to
>>>> also
>>>>>>>>>> be.

With `group.version` change, we need to maintain two code in 4.1. One is 
subscription
metadata without rack and another is group hash. Another drawback is that 
changing
`group.version` will trigger all group rebalance at the same time. IMHO, I 
would prefer
to keep the code simple.

DJ08: Add ConsumerGroupHeartbeatRequestTest to test plan.

LB01: Add the removal of StreamGroupPartitionMetadataKey / 
StreamGrouprtitionMetadataValue to public interface change section.

Thank you.

Regards,
PoAn

> On Apr 9, 2025, at 5:11 PM, Lucas Brutschy <lbruts...@confluent.io.INVALID> 
> wrote:
> 
> Hi PoAn,
> 
> LB01: please add the removal of StreamsGroupPartitionMetadataKey /
> StreamsGroupPartitionMetadataValue to the KIP. Since these were not
> yet included in 4.0, we can just remove them as long as this KIP hits
> 4.1. I can help with the integration into the `streamsGroupHeartbeat`,
> as it will be a little different, but we do not need to call this out
> in the KIP.
> 
> Thanks for your work on this!
> 
> Cheers,
> Lucas
> 
> On Wed, Apr 9, 2025 at 9:59 AM David Jacot <dja...@confluent.io.invalid> 
> wrote:
>> 
>> Hi PoAn,
>> 
>> I finally had a bit of time to review your KIP. First, let me say that the
>> KIP is well written! I have some more comments:
>> 
>> DJ01: In the motivation, could you please mention that we actually removed
>> triggering rebalances based on rack changes in 4.0? You explained very well
>> why we removed it but it is not clear that we removed it.
>> 
>> DJ02: We cannot delete ConsumerGroupPartitionMetadataKey
>> and ConsumerGroupPartitionMetadataValue because the coordinator must still
>> be able to read the record from the log when the cluster is upgraded.
>> However, we can deprecate them.
>> 
>> DJ03: Could you clarify how the cache will be scoped? My understanding is
>> that we will maintain a cache per GroupCoordinatorShard instance. Is this
>> correct?
>> 
>> DJ04: Regarding the topic hash function, would it be possible to specify
>> how the hash will be computed instead of providing the signature of the
>> method?
>> 
>> DJ05: I have a general question about how you propose to combine hashes. Is
>> using a sum enough? I was looking at the Guava implementation and they also
>> apply a "final mix" to ensure an avalanche effect. It would be great to
>> elaborate a bit more on this in the KIP.
>> 
>> DJ06: After the upgrade, we need to ensure that a tombstone is written for
>> the ConsumerGroupPartitionMetadata record if it was present. I suppose that
>> this is something that we could do when the next metadata hash is computed.
>> Have you thought about it?
>> 
>> DJ07: I wonder whether we should gate the change with `group.version`. My
>> concern is that during upgrade, not all the coordinators will support the
>> new way and groups may bounce between those coordinators as their
>> underlying __consumer_offsets partitions are moved within the cluster
>> (during the roll). This may trigger unnecessary rebalances. One way to
>> avoid this is to gate the change with `group.version` which is only bumped
>> at the end of the cluster upgrade process. What do you think?
>> 
>> DJ08: Regarding the test plan, we should also extend
>> ConsumerGroupHeartbeatRequestTest to cover the new feature.
>> 
>> Thanks for your hard work on this one. Let's try to get it in 4.1.
>> 
>> Best,
>> David
>> 
>> On Mon, Mar 24, 2025 at 8:17 PM Jeff Kim <jeff....@confluent.io.invalid>
>> wrote:
>> 
>>> Hi PoAn,
>>> 
>>> Thanks for the clarification!
>>> 
>>> Best,
>>> Jeff
>>> 
>>> On Mon, Mar 24, 2025 at 11:43 AM PoAn Yang <yangp...@gmail.com> wrote:
>>> 
>>>> Hi Jeff,
>>>> 
>>>> Thanks for the review.
>>>> 
>>>> JK01. Yes, we already maintain `groupByTopics` data structure in the
>>>> coordinator. If a group leaves,
>>>> it checks whether a topic is subscribed by any group. If it’s not, remove
>>>> topic hash from the cache.
>>>> Please check the sample implementation about
>>>> GroupMetadataManager#unsubscribeGroupFromTopic
>>>> in #17444 <https://github.com/apache/kafka/pull/17444>.
>>>> 
>>>> JK02: This KIP replaces subscription metadata by a group hash, so the
>>>> coordinator calculates group
>>>> hash where we calculated subscription metadata. The trigger points are
>>>> like a group member is updated,
>>>> , new group member, new subscribed topic names, new regular expressions,
>>>> and new metadata (topic change).
>>>> 
>>>> Best Regards,
>>>> PoAn
>>>> 
>>>>> On Mar 22, 2025, at 3:25 AM, Jeff Kim <jeff....@confluent.io.INVALID>
>>>> wrote:
>>>>> 
>>>>> Hi PoAn,
>>>>> 
>>>>> I was late to the discussion but I had some questions.
>>>>> 
>>>>> JK01. I foresee scenarios where the cache leaves stale topic hashes
>>>> around
>>>>> when a group leaves.
>>>>> Should we maintain a counter for the number of groups subscribed to the
>>>>> topic (hash) to only keep active topics?
>>>>> 
>>>>> JK02. Can you help me understand when we would actually be computing
>>>>> group-level hashes, is it on every
>>>>> heartbeat?
>>>>> 
>>>>> Thanks,
>>>>> Jeff
>>>>> 
>>>>> On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote:
>>>>> 
>>>>>> Hi David,
>>>>>> 
>>>>>> Yes, I will continue working on this.
>>>>>> 
>>>>>> DJ02: As Kirk noted earlier, since the topic hash calculation is on
>>>> server
>>>>>> side (not client-side), we can safely introduce Guava as a dependency.
>>>>>> 
>>>>>> If there is no other discussion, we can start voting in
>>>>>> https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We
>>>> can
>>>>>> discuss implementation details in PR
>>>>>> https://github.com/apache/kafka/pull/17444. I will resolve merge
>>>>>> conflicts by March 14th.
>>>>>> 
>>>>>> Thanks,
>>>>>> PoAn
>>>>>> 
>>>>>>> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com>
>>>> wrote:
>>>>>>> 
>>>>>>> Hi PoAn,
>>>>>>> 
>>>>>>> As we are getting closer to releasing 4.0, I think that we can resume
>>>>>>> working on this one if you are still interested. Are you? I would
>>> like
>>>>>>> to have it in 4.1.
>>>>>>> 
>>>>>>> Best,
>>>>>>> David
>>>>>>> 
>>>>>>> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:
>>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> Hopefully a quick question...
>>>>>>>> 
>>>>>>>> KT01. Will clients calculate the topic hash on the client? Based on
>>>> the
>>>>>> current state of the KIP and PR, I would have thought "no", but I ask
>>>> based
>>>>>> on the discussion around the possible use of Guava on client.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Kirk
>>>>>>>> 
>>>>>>>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:
>>>>>>>>> Hi PoAn,
>>>>>>>>> 
>>>>>>>>> Thanks for the update. I haven't read the updated KIP yet.
>>>>>>>>> 
>>>>>>>>> DJ02: I am not sure about using Guava as a dependency. I mentioned
>>> it
>>>>>> more
>>>>>>>>> as an inspiration/reference. I suppose that we could use it on the
>>>>>> server
>>>>>>>>> but we should definitely not use it on the client. I am not sure
>>> how
>>>>>> others
>>>>>>>>> feel about it.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> David
>>>>>>>>> 
>>>>>>>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com>
>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Chia-Ping / David / Lucas,
>>>>>>>>>> 
>>>>>>>>>> Happy new year and thanks for the review.
>>>>>>>>>> 
>>>>>>>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava.
>>>>>>>>>> 
>>>>>>>>>> DJ03: Yes, I updated the description to mention ISR change,
>>>>>>>>>> add altering partition reassignment case, and mention that
>>>>>>>>>> non-related topic change doesn’t trigger a rebalance.
>>>>>>>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
>>>>>>>>>> to notify group.
>>>>>>>>>> 
>>>>>>>>>> DJ06: Updated the PR to use Guava Hashing#combineUnordered
>>>>>>>>>> function to combine topic hash.
>>>>>>>>>> 
>>>>>>>>>> DJ07: Renamed it to MetadataHash.
>>>>>>>>>> 
>>>>>>>>>> DJ08: Added a sample hash function to the KIP and use first byte
>>> as
>>>>>> magic
>>>>>>>>>> byte. This is also included in latest PR.
>>>>>>>>>> 
>>>>>>>>>> DJ09: Added two paragraphs about upgraded and downgraded.
>>>>>>>>>> 
>>>>>>>>>> DJ10: According to Lucas’s comment, I add
>>> StreamsGroupMetadataValue
>>>>>> update
>>>>>>>>>> to this KIP.
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> PoAn
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> because assignors are sticky.
>>>>>>>>>>> 
>>>>>>>>>>> I forgot about that spec again :(
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>>>>>>>> acceptable if rebalances are triggered in this case on
>>>>>>>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>>>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
>>>>>> change
>>>>>>>>>>>> anything if the topology of the group is the same because
>>>> assignors
>>>>>>>>>>>> are sticky. The default ones are and we recommend custom ones to
>>>>>> also
>>>>>>>>>>>> be.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> David
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <
>>>> chia7...@apache.org
>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> ummm, it does not work for downgrade as the old coordinator has
>>>> no
>>>>>> idea
>>>>>>>>>>>> about new format :(
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>>>>>>>>>>>> hi David
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ08:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> That's a good question. If the "hash" lacks version control,
>>> it
>>>>>> could
>>>>>>>>>>>> trigger a series of unnecessary rebalances. However, adding
>>>>>> additional
>>>>>>>>>>>> information ("magic") to the hash does not help the upgraded
>>>>>> coordinator
>>>>>>>>>>>> determine the "version." This means that the upgraded
>>> coordinator
>>>>>> would
>>>>>>>>>>>> still trigger unnecessary rebalances because it has no way to
>>> know
>>>>>> which
>>>>>>>>>>>> format to use when comparing the hash.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue
>>> to
>>>>>>>>>>>> indicate the version of the "hash." This would allow the
>>>>>> coordinator,
>>>>>>>>>> when
>>>>>>>>>>>> handling subscription metadata, to compute the old hash and
>>>>>> determine
>>>>>>>>>>>> whether an epoch bump is necessary. Additionally, the
>>> coordinator
>>>>>> can
>>>>>>>>>>>> generate a new record to upgrade the hash without requiring an
>>>> epoch
>>>>>>>>>> bump.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Another issue is whether the coordinator should cache all
>>>>>> versions of
>>>>>>>>>>>> the hash. I believe this is necessary; otherwise, during an
>>>> upgrade,
>>>>>>>>>> there
>>>>>>>>>>>> would be extensive recomputing of old hashes.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I believe this idea should also work for downgrades, and
>>> that's
>>>>>> just
>>>>>>>>>>>> my two cents.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>>>>>>>>>>>> Hi PoAn and Chia-Ping,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for your responses.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we
>>> could
>>>>>>>>>>>> compute the
>>>>>>>>>>>>>>> hash without having to convert to bytes before. Guava has a
>>>> nice
>>>>>>>>>>>> interface
>>>>>>>>>>>>>>> for this allowing to incrementally add primitive types to the
>>>>>> hash.
>>>>>>>>>>>> We can
>>>>>>>>>>>>>>> discuss this in the PR as it is an implementation detail.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated
>>> when
>>>> a
>>>>>>>>>>>> broker
>>>>>>>>>>>>>>> shuts down. What you said applies to the ISR. I suppose that
>>> we
>>>>>> can
>>>>>>>>>>>> rely on
>>>>>>>>>>>>>>> the ISR changes to trigger updates. It is also worth noting
>>>>>>>>>>>>>>> that TopicsDelta#changedTopics is updated for every change
>>>> (e.g.
>>>>>> ISR
>>>>>>>>>>>>>>> change, leader change, replicas change, etc.). I suppose that
>>>> it
>>>>>> is
>>>>>>>>>>>> OK but
>>>>>>>>>>>>>>> it seems that it will trigger refreshes which are not
>>>> necessary.
>>>>>>>>>>>> However, a
>>>>>>>>>>>>>>> rebalance won't be triggered because the hash won't change.
>>>>>>>>>>>>>>> DJ03.1: I suppose that we will continue to rely on
>>>>>>>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
>>>>>>>>>>>> refresh their
>>>>>>>>>>>>>>> hashes. Is my understanding correct?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ05: Fair enough.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ06: You mention in two places that you would like to
>>> combine
>>>>>>>>>>>> hashes by
>>>>>>>>>>>>>>> additioning them. I wonder if this is a good practice.
>>>>>> Intuitively,
>>>>>>>>>>>> I would
>>>>>>>>>>>>>>> have used XOR or hashed the hashed. Guava has a method for
>>>>>> combining
>>>>>>>>>>>>>>> hashes. It may be worth looking into the algorithm used.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in
>>> order
>>>>>> to be
>>>>>>>>>>>> more
>>>>>>>>>>>>>>> generic.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we
>>> should
>>>>>>>>>>>> precise in
>>>>>>>>>>>>>>> the KIP how we will compute it. I had the following in mind:
>>>>>>>>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]).
>>> We
>>>>>> could
>>>>>>>>>>>> also
>>>>>>>>>>>>>>> add a magic byte at the first element as a sort of version. I
>>>> am
>>>>>> not
>>>>>>>>>>>> sure
>>>>>>>>>>>>>>> whether it is needed though. I was thinking about this while
>>>>>>>>>>>> imagining how
>>>>>>>>>>>>>>> we would handle changing the format in the future.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ09: It would be great if we could provide more details
>>> about
>>>>>>>>>>>> backward
>>>>>>>>>>>>>>> compatibility. What happens when the cluster is upgraded or
>>>>>>>>>>>> downgraded?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pigging them
>>>> in
>>>>>> the
>>>>>>>>>>>>>>> discussion thread of KIP-1071.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <
>>> yangp...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> DJ01: Removed all implementation details.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> DJ02: Does the “incrementally” mean that we only calculate
>>> the
>>>>>>>>>>>> difference
>>>>>>>>>>>>>>>> parts?
>>>>>>>>>>>>>>>> For example, if the number of partition change, we only
>>>>>> calculate
>>>>>>>>>>>> the hash
>>>>>>>>>>>>>>>> of number of partition and reconstruct it to the topic hash.
>>>>>>>>>>>>>>>> IMO, we only calculate topic hash one time. With cache
>>>>>> mechanism,
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> value can be reused in different groups on a same broker.
>>>>>>>>>>>>>>>> The CPU usage for this part is not very high.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> DJ03: Added the update path to KIP for both cases.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> DJ04: Yes, it’s a good idea. With cache mechanism and single
>>>>>> hash
>>>>>>>>>>>> per
>>>>>>>>>>>>>>>> group, we can balance cpu and disk usage.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> DJ05: Currently, the topic hash is only used in coordinator.
>>>>>>>>>>>> However, the
>>>>>>>>>>>>>>>> metadata image is used in many different places.
>>>>>>>>>>>>>>>> How about we move the hash to metadata image when we find
>>> more
>>>>>> use
>>>>>>>>>>>> cases?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
>>>>>>>>>>>>>>>> ShareGroupMetadataValue.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <
>>>>>>>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> AS1: From the point of view of share groups, the API and
>>>> record
>>>>>>>>>>>> schema
>>>>>>>>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will
>>> start
>>>>>>>>>>>> supporting
>>>>>>>>>>>>>>>> proper
>>>>>>>>>>>>>>>>> versioning. As a result, I think you do not need to
>>> deprecate
>>>>>>>>>>>> the fields
>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema
>>> for
>>>>>>>>>>>> the fields
>>>>>>>>>>>>>>>>> which are actually needed, and I'll update the schema in
>>> the
>>>>>>>>>>>> code when
>>>>>>>>>>>>>>>>> the KIP is implemented.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for
>>>>>>>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would
>>>>>> simply
>>>>>>>>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that
>>> it
>>>>>> is
>>>>>>>>>>>>>>>>> accepted in time for AK 4.1.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>>>> ________________________________________
>>>>>>>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>>>>>>>>>>>> Sent: 16 December 2024 16:27
>>>>>>>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack
>>>>>>>>>>>> topology
>>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> hi David
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ05
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> One of the benefits of having a single hash per group
>>> (DJ04)
>>>> is
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> reduction in the size of stored data. Additionally, the cost
>>>> of
>>>>>>>>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1
>>> to
>>>>>>>>>>>> DJ04.
>>>>>>>>>>>>>>>> However, the advantage of storing the topic cache in the
>>>>>> metadata
>>>>>>>>>>>> image is
>>>>>>>>>>>>>>>> somewhat unclear to me. Could you please provide more
>>> details
>>>> on
>>>>>>>>>>>> what you
>>>>>>>>>>>>>>>> mean by "tight"?Furthermore, since the metadata image is a
>>>>>>>>>>>> thread-safe
>>>>>>>>>>>>>>>> object, we need to ensure that the lazy initialization is
>>> also
>>>>>>>>>>>> thread-safe.
>>>>>>>>>>>>>>>> If no other components require the cache, it would be better
>>>> to
>>>>>>>>>>>> keep the
>>>>>>>>>>>>>>>> caches within the coordinator.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only
>>> care
>>>>>>>>>>>> about
>>>>>>>>>>>>>>>> public
>>>>>>>>>>>>>>>>>> interface changes, not about implementation details.
>>>>>>>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we
>>> should
>>>>>> use
>>>>>>>>>>>>>>>> Murmur3.
>>>>>>>>>>>>>>>>>> However, I don't quite like the implementation that you
>>>>>> shared.
>>>>>>>>>>>> I
>>>>>>>>>>>>>>>> wonder if
>>>>>>>>>>>>>>>>>> we could make it work incrementally instead of computing a
>>>>>> hash
>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> everything and combining them.
>>>>>>>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the
>>>> cache
>>>>>> is
>>>>>>>>>>>>>>>> populated
>>>>>>>>>>>>>>>>>> when a topic without hash is seen in a HB request and the
>>>>>> cache
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> cleaned
>>>>>>>>>>>>>>>>>> up when topics are deleted based on the metadata image.
>>>>>>>>>>>> However, the
>>>>>>>>>>>>>>>> update
>>>>>>>>>>>>>>>>>> path is not clear. Let's say that a partition is added to
>>> a
>>>>>>>>>>>> topic, how
>>>>>>>>>>>>>>>> does
>>>>>>>>>>>>>>>>>> it detect it? Let's also imagine that the racks of a
>>>> partition
>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>> changed, how does it detect it? In the KIP, it would be
>>> nice
>>>>>> to
>>>>>>>>>>>> be clear
>>>>>>>>>>>>>>>>>> about those.
>>>>>>>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per
>>>>>>>>>>>> group. Your
>>>>>>>>>>>>>>>>>> argument against it is that it would require to re-compute
>>>> the
>>>>>>>>>>>> hash of
>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>> the topics when it needs to be computed. In my opinion, we
>>>>>> could
>>>>>>>>>>>>>>>> leverage
>>>>>>>>>>>>>>>>>> the cached hash per topic to compute the hash of all the
>>>>>>>>>>>> subscribed
>>>>>>>>>>>>>>>> ones.
>>>>>>>>>>>>>>>>>> We could basically combine all the hashes without having
>>> to
>>>>>>>>>>>> compute all
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> them. This approach has a few benefits. 1) We could get
>>> rid
>>>> of
>>>>>>>>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could
>>> store
>>>>>> the
>>>>>>>>>>>> hash
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we
>>> keep
>>>>>> in
>>>>>>>>>>>> each
>>>>>>>>>>>>>>>> group
>>>>>>>>>>>>>>>>>> to store the hashed corresponding to the subscribed
>>> topics.
>>>>>>>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should
>>>>>> actually
>>>>>>>>>>>> store
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> hash in the metadata image instead of maintaining it
>>>> somewhere
>>>>>>>>>>>> else. We
>>>>>>>>>>>>>>>>>> could still lazily compute it. The benefit is that the
>>> value
>>>>>>>>>>>> would be
>>>>>>>>>>>>>>>> tight
>>>>>>>>>>>>>>>>>> to the topic. I have not really looked into it. Would it
>>> be
>>>> an
>>>>>>>>>>>> option?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I
>>> kindly
>>>>>> ask
>>>>>>>>>>>> you to
>>>>>>>>>>>>>>>> wait
>>>>>>>>>>>>>>>>>> on me if we cannot conclude this week.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <
>>>> yangp...@gmail.com
>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Q0: Add how rack change and how it affects topic
>>> partition.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation
>>>>>> section.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache
>>>> when
>>>>>>>>>>>> we replay
>>>>>>>>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Q3: Add a new section “Topic Hash Function”.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <
>>>>>>>>>>>> chia7...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> hi PoAn
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks for for this KIP!
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition
>>>> has
>>>>>>>>>>>> rack
>>>>>>>>>>>>>>>> change`?
>>>>>>>>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and
>>> leader,
>>>>>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to
>>>> the
>>>>>>>>>>>> Motivation
>>>>>>>>>>>>>>>>>>>> section? This should include topics like 'computations'
>>>> and
>>>>>>>>>>>> 'space
>>>>>>>>>>>>>>>>>>> usage'.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new
>>>>>> topic
>>>>>>>>>>>>>>>> hash.`This
>>>>>>>>>>>>>>>>>>>> description seems a bit off to me. Why do we need to
>>>> update
>>>>>>>>>>>> the cache
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate
>>>>>>>>>>>> computations
>>>>>>>>>>>>>>>>>>> caused
>>>>>>>>>>>>>>>>>>>> by heartbeat requests that occur between two metadata
>>>> change
>>>>>>>>>>>> events.
>>>>>>>>>>>>>>>>>>>> Therefore, we could even remove the changed topics from
>>>>>>>>>>>> caches on a
>>>>>>>>>>>>>>>>>>>> metadata change, as the first heartbeat request would
>>>> update
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> caches
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> all changed topics.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Q3: Could you please include a section about the choice
>>> of
>>>>>>>>>>>> hash
>>>>>>>>>>>>>>>>>>>> implementation? The hash implementation must be
>>> consistent
>>>>>>>>>>>> across
>>>>>>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五
>>> 下午3:57寫道:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101.
>>>>>>>>>>>> Trigger
>>>>>>>>>>>>>>>> rebalance
>>>>>>>>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use
>>> less
>>>>>>>>>>>> memory /
>>>>>>>>>>>>>>>> disk
>>>>>>>>>>>>>>>>>>>>> resources to detect rack changes in the new
>>> coordinator.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> 
>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>> 

Reply via email to