dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107327586
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }
         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+        if (group.addSubscriptionMetadataTombstoneRecord()) {
Review Comment:
nit: I wonder whether we should call it `hasSubscriptionMetadataRecord()`.
What do you think? I also suggest adding a comment explaining this block.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -490,6 +490,11 @@ GroupMetadataManager build() {
*/
private MetadataImage metadataImage;
+ /**
+ * The topic hash value by topic name.
+ */
Review Comment:
It would be great if we could expand the comment to explain how we maintain
this cache.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {
Review Comment:
Sure, I understand that we may have nonexistent topics here. I was more
trying to understand whether having those zeros in the final hash was an issue.
I suppose that you're saying that it is an issue.
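The zero-filtering question above can be made concrete with a small standalone sketch. This is not the real `Utils.computeGroupHash` (which uses Kafka's own hashing); the fold below is a made-up mixing step, only meant to show why dropping the 0 sentinel matters: without the filter, a subscription to a nonexistent topic would perturb the group hash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    // Illustrative only: combines per-topic hashes into one group hash,
    // skipping the 0 sentinel that marks a nonexistent topic. Entries are
    // sorted by topic name so the result is independent of map iteration
    // order; the 17/31 fold is a stand-in for the real hash function.
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
            if (entry.getValue() != 0) { // 0 == topic does not exist
                sorted.add(entry);
            }
        }
        sorted.sort(Map.Entry.comparingByKey());
        long hash = 17;
        for (Map.Entry<String, Long> entry : sorted) {
            hash = hash * 31 + entry.getKey().hashCode();
            hash = hash * 31 + entry.getValue();
        }
        return hash;
    }
}
```

With the filter in place, `{foo=h}` and `{foo=h, bar=0}` hash to the same value, so a nonexistent topic in the subscription cannot change the group hash.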
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
             numMembers
         );
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+        if (groupMetadataHash != group.metadataHash()) {
             if (log.isDebugEnabled()) {
-                log.debug("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+                log.debug("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
             }
             bumpGroupEpoch = true;
-            records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
         }
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
+            records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
+            log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
             metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
         }
         group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+        if (group.addSubscriptionMetadataTombstoneRecord()) {
+            records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+            group.setAddSubscriptionMetadataTombstoneRecord(false);
Review Comment:
We should remove this as it will be updated when the record is replayed.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -398,6 +423,21 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
+    public static long computeMetadataHash(
+        Map<String, SubscriptionCount> subscribedTopicNames,
+        Map<String, Long> topicHashCache,
+        MetadataImage metadataImage
+    ) {
+        Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
+        subscribedTopicNames.keySet().forEach(topicName ->
+            topicHash.put(
+                topicName,
+                topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))
Review Comment:
We may have an issue here. If the topic does not exist, we cache zero.
However, we don't remove it when the topic is created, do we? Or is it
covered by `topicsDelta.changedTopics()`? It would be great if we could add
more tests.
We should also add unit tests for this new method.
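The caching concern raised here can be illustrated with a self-contained sketch. `cachedTopicHash` below is a hypothetical helper, not the PR's code: `computeIfAbsent` would pin the 0 "topic missing" sentinel in the cache, so a topic created later would keep reading as absent unless the delta path also covers creations. One option is simply to never cache the sentinel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TopicHashCacheSketch {
    // Hypothetical guard: like topicHashCache.computeIfAbsent(...), but the
    // 0 sentinel (topic does not exist) is returned without being cached, so
    // a later lookup after the topic is created recomputes the real hash.
    static long cachedTopicHash(
        Map<String, Long> cache,
        String topicName,
        Function<String, Long> computeTopicHash // stands in for Utils.computeTopicHash
    ) {
        Long cached = cache.get(topicName);
        if (cached != null) {
            return cached;
        }
        long hash = computeTopicHash.apply(topicName);
        if (hash != 0) { // only cache hashes of topics that exist
            cache.put(topicName, hash);
        }
        return hash;
    }
}
```

With plain `computeIfAbsent`, the first call below would have stored 0 and the second call would never see the new hash.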
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -151,6 +151,8 @@ public String toLowerCaseString() {
      */
     private final TimelineHashMap<String, ResolvedRegularExpression> resolvedRegularExpressions;
+    private final TimelineObject<Boolean> addSubscriptionMetadataTombstoneRecord;
Review Comment:
nit: I reiterate my previous comment here. `hasSubscriptionMetadataRecord`
seems a bit nicer.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
      * @return The hash of the group.
      */
     static long computeGroupHash(Map<String, Long> topicHashes) {
-        if (topicHashes.isEmpty()) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+            // Filter out entries with a hash value of 0, which indicates no topic
+            if (entry.getValue() != 0) {
Review Comment:
btw, it seems that we could move the zero check to L369 to simplify the
code a bit.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(-2));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(-2)
+                .setHeartbeatIntervalMs(0),
+            result.response()
+        );
+
+        expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+        );
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().isEmpty());
Review Comment:
nit: I would prefer to use `assertEquals(Map.of(),
context.groupMetadataManager.topicHashCache())`. If there is a failure, it
gives you more information.
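The difference this nit points at is in the failure message. The sketch below only mimics the messages that JUnit-style assertions produce (the helper names are made up for illustration): a boolean check like `assertTrue(map.isEmpty())` can only report `true`/`false`, while `assertEquals(Map.of(), map)` prints the leftover entries, telling you which topic hash was not evicted.

```java
import java.util.Map;

public class AssertionStyleSketch {
    // Mimics the failure message of a bare boolean assertion: when it
    // fails, all you learn is that the condition was false.
    static String assertTrueMessage(boolean condition) {
        return condition ? "OK" : "expected: <true> but was: <false>";
    }

    // Mimics the failure message of an equality assertion: when it fails,
    // the actual value (here, the non-empty cache contents) is included.
    static String assertEqualsMessage(Object expected, Object actual) {
        return expected.equals(actual)
            ? "OK"
            : "expected: <" + expected + "> but was: <" + actual + ">";
    }
}
```

For a cache with a stale `foo` entry, the equality form names the offending topic in its message, which is exactly the extra information the reviewer wants on failure.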
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -541,6 +546,109 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         assertRecordsEquals(expectedRecords, result.records());
     }
+    @Test
+    public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            ))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+        assertTrue(context.groupMetadataManager.topicHashCache().containsKey(fooTopicName));
+
+        // leave group
Review Comment:
nit: All the comments in this file start with a capital letter and end
with a dot. Let's try to stay consistent.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -20635,12 +21067,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
             ),
             // Remove regex.
             List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")),
-            // Updated subscription metadata.
-            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
-            ))),
             // Bumped epoch.
-            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0))
+            List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
+                barTopicName, barTopicHash
+            ))))
         ),
         result.records()
Review Comment:
Not related to this line but it would be great if we could add more tests
for the cache cleaning (e.g. when a topic is updated or deleted).