dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2106927220
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -550,6 +556,7 @@ private GroupMetadataManager(
this.shareGroupAssignor = shareGroupAssignor;
this.authorizerPlugin = authorizerPlugin;
this.streamsGroupAssignors =
streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name,
Function.identity()));
+ this.topicHashCache = new ConcurrentHashMap<>();
Review Comment:
nit: We can use a regular HashMap here because the GroupMetadataManager is
never used concurrently.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -151,6 +152,8 @@ public String toLowerCaseString() {
*/
private final TimelineHashMap<String, ResolvedRegularExpression>
resolvedRegularExpressions;
+ private final AtomicBoolean addSubscriptionMetadataTombstoneRecord = new
AtomicBoolean(false);
Review Comment:
We should also use a timeline data structure for this one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -398,6 +423,21 @@ public Map<String, TopicMetadata>
computeSubscriptionMetadata(
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
+ public long computeMetadataHash(
Review Comment:
nit: This could be a static method.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {
- if (topicHashes.isEmpty()) {
+ // Sort entries by topic name
+ List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+ for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+ // Filter out entries with a hash value of 0, which indicates no
topic
+ if (entry.getValue() != 0) {
Review Comment:
I wonder whether it is really necessary to ignore those. Having a zero does
not hurt, isn't it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -398,6 +423,21 @@ public Map<String, TopicMetadata>
computeSubscriptionMetadata(
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
+ public long computeMetadataHash(
+ Map<String, SubscriptionCount> subscribedTopicNames,
+ Map<String, Long> topicHashCache,
+ MetadataImage metadataImage
+ ) {
+ Map<String, Long> topicHash = new
HashMap<>(subscribedTopicNames.size());
+ subscribedTopicNames.keySet().forEach(topicName -> {
Review Comment:
nit: We can remove the curly braces.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2220,6 +2227,11 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
+ if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+ group.setAddSubscriptionMetadataTombstoneRecord(false);
Review Comment:
I wonder whether we should put this code into updateSubscriptionMetadata to
ensure that we do this in all the place. Have you considered it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2220,6 +2227,11 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
+ if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+ group.setAddSubscriptionMetadataTombstoneRecord(false);
Review Comment:
This is not the proper way to do this. In general, we never mutate the state
from here. We should move it to the replay method of the tombstone.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]