Hi Chia-Ping / David / Lucas,

Happy new year and thanks for the review.

DJ02: Thanks for the suggestion. I updated the PR to use Guava.

DJ03: Yes, I updated the description to mention ISR change, 
add altering partition reassignment case, and mention that
non-related topic change doesn’t trigger a rebalance.
DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
to notify group.

DJ06: Updated the PR to use Guava Hashing#combineUnordered
function to combine topic hash.

DJ07: Renamed it to MetadataHash.

DJ08: Added a sample hash function to the KIP and use first byte as magic byte. 
This is also included in latest PR.

DJ09: Added two paragraphs about upgraded and downgraded.

DJ10: According to Lucas’s comment, I add StreamsGroupMetadataValue update to 
this KIP.

Thanks,
PoAn


> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> 
>> because assignors are sticky.
> 
> I forgot about that spec again :(
> 
> 
> 
> 
> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道:
> 
>> Hi Chia-Ping,
>> 
>> DJ08: In my opinion, changing the format will be rare so it is
>> acceptable if rebalances are triggered in this case on
>> upgrade/downgrade. It is also what will happen if a cluster is
>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change
>> anything if the topology of the group is the same because assignors
>> are sticky. The default ones are and we recommend custom ones to also
>> be.
>> 
>> Best,
>> David
>> 
>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org>
>> wrote:
>>> 
>>> ummm, it does not work for downgrade as the old coordinator has no idea
>> about new format :(
>>> 
>>> 
>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>> hi David
>>>> 
>>>>> DJ08:
>>>> 
>>>> That's a good question. If the "hash" lacks version control, it could
>> trigger a series of unnecessary rebalances. However, adding additional
>> information ("magic") to the hash does not help the upgraded coordinator
>> determine the "version." This means that the upgraded coordinator would
>> still trigger unnecessary rebalances because it has no way to know which
>> format to use when comparing the hash.
>>>> 
>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to
>> indicate the version of the "hash." This would allow the coordinator, when
>> handling subscription metadata, to compute the old hash and determine
>> whether an epoch bump is necessary. Additionally, the coordinator can
>> generate a new record to upgrade the hash without requiring an epoch bump.
>>>> 
>>>> Another issue is whether the coordinator should cache all versions of
>> the hash. I believe this is necessary; otherwise, during an upgrade, there
>> would be extensive recomputing of old hashes.
>>>> 
>>>> I believe this idea should also work for downgrades, and that's just
>> my two cents.
>>>> 
>>>> Best,
>>>> Chia-Ping
>>>> 
>>>> 
>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>> Hi PoAn and Chia-Ping,
>>>>> 
>>>>> Thanks for your responses.
>>>>> 
>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could
>> compute the
>>>>> hash without having to convert to bytes before. Guava has a nice
>> interface
>>>>> for this allowing to incrementally add primitive types to the hash.
>> We can
>>>>> discuss this in the PR as it is an implementation detail.
>>>>> 
>>>>> DJ03: Thanks. I don't think that the replicas are updated when a
>> broker
>>>>> shuts down. What you said applies to the ISR. I suppose that we can
>> rely on
>>>>> the ISR changes to trigger updates. It is also worth noting
>>>>> that TopicsDelta#changedTopics is updated for every change (e.g. ISR
>>>>> change, leader change, replicas change, etc.). I suppose that it is
>> OK but
>>>>> it seems that it will trigger refreshes which are not necessary.
>> However, a
>>>>> rebalance won't be triggered because the hash won't change.
>>>>> DJ03.1: I suppose that we will continue to rely on
>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
>> refresh their
>>>>> hashes. Is my understanding correct?
>>>>> 
>>>>> DJ05: Fair enough.
>>>>> 
>>>>> DJ06: You mention in two places that you would like to combine
>> hashes by
>>>>> additioning them. I wonder if this is a good practice. Intuitively,
>> I would
>>>>> have used XOR or hashed the hashed. Guava has a method for combining
>>>>> hashes. It may be worth looking into the algorithm used.
>>>>> 
>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be
>> more
>>>>> generic.
>>>>> 
>>>>> DJ08: Regarding the per topic hash, I wonder whether we should
>> precise in
>>>>> the KIP how we will compute it. I had the following in mind:
>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). We could
>> also
>>>>> add a magic byte at the first element as a sort of version. I am not
>> sure
>>>>> whether it is needed though. I was thinking about this while
>> imagining how
>>>>> we would handle changing the format in the future.
>>>>> 
>>>>> DJ09: It would be great if we could provide more details about
>> backward
>>>>> compatibility. What happens when the cluster is upgraded or
>> downgraded?
>>>>> 
>>>>> DJ10: We should update KIP-1071. It may be worth pigging them in the
>>>>> discussion thread of KIP-1071.
>>>>> 
>>>>> Best,
>>>>> David
>>>>> 
>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com>
>> wrote:
>>>>> 
>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>> 
>>>>>> Thanks for the review and suggestions.
>>>>>> 
>>>>>> DJ01: Removed all implementation details.
>>>>>> 
>>>>>> DJ02: Does the “incrementally” mean that we only calculate the
>> difference
>>>>>> parts?
>>>>>> For example, if the number of partition change, we only calculate
>> the hash
>>>>>> of number of partition and reconstruct it to the topic hash.
>>>>>> IMO, we only calculate topic hash one time. With cache mechanism,
>> the
>>>>>> value can be reused in different groups on a same broker.
>>>>>> The CPU usage for this part is not very high.
>>>>>> 
>>>>>> DJ03: Added the update path to KIP for both cases.
>>>>>> 
>>>>>> DJ04: Yes, it’s a good idea. With cache mechanism and single hash
>> per
>>>>>> group, we can balance cpu and disk usage.
>>>>>> 
>>>>>> DJ05: Currently, the topic hash is only used in coordinator.
>> However, the
>>>>>> metadata image is used in many different places.
>>>>>> How about we move the hash to metadata image when we find more use
>> cases?
>>>>>> 
>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
>>>>>> ShareGroupMetadataValue.
>>>>>> 
>>>>>> Best,
>>>>>> PoAn
>>>>>> 
>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <
>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>> 
>>>>>>> Hi PoAn,
>>>>>>> Thanks for the KIP.
>>>>>>> 
>>>>>>> AS1: From the point of view of share groups, the API and record
>> schema
>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will start
>> supporting
>>>>>> proper
>>>>>>> versioning. As a result, I think you do not need to deprecate
>> the fields
>>>>>> in the
>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for
>> the fields
>>>>>>> which are actually needed, and I'll update the schema in the
>> code when
>>>>>>> the KIP is implemented.
>>>>>>> 
>>>>>>> AS2: In the event that DJ04 actually removes the need for
>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would simply
>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it is
>>>>>>> accepted in time for AK 4.1.
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>> ________________________________________
>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>> Sent: 16 December 2024 16:27
>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack
>> topology
>>>>>> changes
>>>>>>> 
>>>>>>> hi David
>>>>>>> 
>>>>>>>> DJ05
>>>>>>> 
>>>>>>> One of the benefits of having a single hash per group (DJ04) is
>> the
>>>>>> reduction in the size of stored data. Additionally, the cost of
>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 to
>> DJ04.
>>>>>> However, the advantage of storing the topic cache in the metadata
>> image is
>>>>>> somewhat unclear to me. Could you please provide more details on
>> what you
>>>>>> mean by "tight"?Furthermore, since the metadata image is a
>> thread-safe
>>>>>> object, we need to ensure that the lazy initialization is also
>> thread-safe.
>>>>>> If no other components require the cache, it would be better to
>> keep the
>>>>>> caches within the coordinator.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Chia-Ping
>>>>>>> 
>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>> Hi PoAn,
>>>>>>>> 
>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>> 
>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care
>> about
>>>>>> public
>>>>>>>> interface changes, not about implementation details.
>>>>>>>> DJ02: Regarding the hash computation, I agree that we should use
>>>>>> Murmur3.
>>>>>>>> However, I don't quite like the implementation that you shared.
>> I
>>>>>> wonder if
>>>>>>>> we could make it work incrementally instead of computing a hash
>> of
>>>>>>>> everything and combining them.
>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache is
>>>>>> populated
>>>>>>>> when a topic without hash is seen in a HB request and the cache
>> is
>>>>>> cleaned
>>>>>>>> up when topics are deleted based on the metadata image.
>> However, the
>>>>>> update
>>>>>>>> path is not clear. Let's say that a partition is added to a
>> topic, how
>>>>>> does
>>>>>>>> it detect it? Let's also imagine that the racks of a partition
>> have
>>>>>>>> changed, how does it detect it? In the KIP, it would be nice to
>> be clear
>>>>>>>> about those.
>>>>>>>> DJ04: I wonder whether we should go with a single hash per
>> group. Your
>>>>>>>> argument against it is that it would require to re-compute the
>> hash of
>>>>>> all
>>>>>>>> the topics when it needs to be computed. In my opinion, we could
>>>>>> leverage
>>>>>>>> the cached hash per topic to compute the hash of all the
>> subscribed
>>>>>> ones.
>>>>>>>> We could basically combine all the hashes without having to
>> compute all
>>>>>> of
>>>>>>>> them. This approach has a few benefits. 1) We could get rid of
>>>>>>>> the ConsumerGroupPartitionMetadata record as we could store the
>> hash
>>>>>> with
>>>>>>>> the group epoch. 2) We could get rid of the Map that we keep in
>> each
>>>>>> group
>>>>>>>> to store the hashed corresponding to the subscribed topics.
>>>>>>>> DJ05: Regarding the cache again, I wonder if we should actually
>> store
>>>>>> the
>>>>>>>> hash in the metadata image instead of maintaining it somewhere
>> else. We
>>>>>>>> could still lazily compute it. The benefit is that the value
>> would be
>>>>>> tight
>>>>>>>> to the topic. I have not really looked into it. Would it be an
>> option?
>>>>>>>> 
>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly ask
>> you to
>>>>>> wait
>>>>>>>> on me if we cannot conclude this week.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> David
>>>>>>>> 
>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com>
>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Chia-Ping,
>>>>>>>>> 
>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>> 
>>>>>>>>> Q0: Add how rack change and how it affects topic partition.
>>>>>>>>> 
>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation section.
>>>>>>>>> 
>>>>>>>>> Q2: After checking again, we don’t need to update cache when
>> we replay
>>>>>>>>> records. We only need to renew it in consumer heartbeat.
>>>>>>>>> 
>>>>>>>>> Q3: Add a new section “Topic Hash Function”.
>>>>>>>>> 
>>>>>>>>> Thanks.
>>>>>>>>> PoAn
>>>>>>>>> 
>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <
>> chia7...@gmail.com>
>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> hi PoAn
>>>>>>>>>> 
>>>>>>>>>> Thanks for for this KIP!
>>>>>>>>>> 
>>>>>>>>>> Q0: Could you add more details about `A topic partition has
>> rack
>>>>>> change`?
>>>>>>>>>> IIRC, the "rack change" includes both follower and leader,
>> right?
>>>>>>>>>> 
>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the
>> Motivation
>>>>>>>>>> section? This should include topics like 'computations' and
>> 'space
>>>>>>>>> usage'.
>>>>>>>>>> 
>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new topic
>>>>>> hash.`This
>>>>>>>>>> description seems a bit off to me. Why do we need to update
>> the cache
>>>>>> at
>>>>>>>>>> this phase? The cache is intended to prevent duplicate
>> computations
>>>>>>>>> caused
>>>>>>>>>> by heartbeat requests that occur between two metadata change
>> events.
>>>>>>>>>> Therefore, we could even remove the changed topics from
>> caches on a
>>>>>>>>>> metadata change, as the first heartbeat request would update
>> the
>>>>>> caches
>>>>>>>>> for
>>>>>>>>>> all changed topics.
>>>>>>>>>> 
>>>>>>>>>> Q3: Could you please include a section about the choice of
>> hash
>>>>>>>>>> implementation? The hash implementation must be consistent
>> across
>>>>>>>>> different
>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Chia-Ping
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 下午3:57寫道:
>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> I would like to start a discussion thread on KIP-1101.
>> Trigger
>>>>>> rebalance
>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use less
>> memory /
>>>>>> disk
>>>>>>>>>>> resources to detect rack changes in the new coordinator.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>> 
>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks.
>>>>>>>>>>> PoAn
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 

Reply via email to