dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2560381093
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1087,12 +1100,16 @@ void addPartitionEpochs(
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry,
assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
- Integer prevValue = partitionsOrNull.put(partitionId,
epoch);
+ Integer prevValue = partitionsOrNull.get(partitionId);
if (prevValue != null) {
- throw new IllegalStateException(
- String.format("Cannot set the epoch of %s-%s to %d
because the partition is " +
- "still owned at epoch %d", topicId,
partitionId, epoch, prevValue));
+ if (prevValue > epoch) {
Review Comment:
nit: Let's put the happy path first here too and we can also combine the two
if conditions into one.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -942,11 +942,13 @@ private void removeTaskProcessIds(
currentTasksProcessId.compute(subtopologyId, (__,
partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.keySet().forEach(partitionId -> {
- String prevValue =
partitionsOrNull.remove(partitionId);
+ String prevValue = partitionsOrNull.get(partitionId);
if (!Objects.equals(prevValue, expectedProcessId)) {
Review Comment:
Same comments for this file.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1048,11 +1058,13 @@ void removePartitionEpochs(
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- Integer prevValue =
partitionsOrNull.remove(partitionId);
+ Integer prevValue = partitionsOrNull.get(partitionId);
if (prevValue != expectedEpoch) {
Review Comment:
nit: Let's inverse the condition so we have the happy path first.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+ assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+ }
+
+ @Test
+ public void testConsumerGroupUnassignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName,
fooTopicHash, barTopicName, barTopicHash))))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition foo-1 to member A
Review Comment:
nit: Add a `.` at the end of the sentence to be consistent.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -149,10 +157,13 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
public ConsumerGroup(
+ LogContext logContext,
SnapshotRegistry snapshotRegistry,
String groupId
) {
super(snapshotRegistry, groupId);
+ // Add the GroupId to the log prefix for context.
+ this.log = new LogContext(String.format("%s [Group %s]: ",
logContext.logPrefix(), groupId)).logger(ConsumerGroup.class);
Review Comment:
I just recalled about https://github.com/apache/kafka/pull/19138 so I would
prefer to inline adding the prefix given that we have only a few logs in this
class. It is probably OK like this but I prefer to be on the safe side.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+ assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+ }
+
+ @Test
+ public void testConsumerGroupUnassignmentResolvesWithCompaction() {
Review Comment:
I tried this test _without the fix_ and it passes!?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
Review Comment:
Let's explain why this is possible. We could perhaps add a comment with all
the records that would be written and how compaction could remove some of
those? My concern is that it is not an obvious issue to grasp for non-experts.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+ assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
+ }
+
+ @Test
+ public void testConsumerGroupUnassignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(computeGroupHash(Map.of(fooTopicName,
fooTopicHash, barTopicName, barTopicHash))))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition foo-1 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+ .build()));
+
Review Comment:
nit: We can remove an empty line.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24387,6 +24387,263 @@ public void
testShareGroupRecordsNoExceptionOnReplay(CoordinatorRecord record) {
assertDoesNotThrow(() -> context.replay(record));
}
+ @Test
+ public void testConsumerGroupAssignmentResolvesWithCompaction() {
+ String groupId = "fooup";
+ String memberA = "memberA";
+ String memberB = "memberB";
+
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId, topicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+ long topicHash = computeTopicHash(topicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMetadataHash(topicHash))
+ .build();
+
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+
+ // Assign partition 0 to member A
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Assign partition 0 to member B. This is allowed even though
partition 0 is already owned by member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberB)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(11)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 0)))
+ .build()));
+
+ // Now assign partition 1 to member A.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
new ConsumerGroupMember.Builder(memberA)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(13)
+ .setPreviousMemberEpoch(12)
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(topicId, 1)))
+ .build()));
+
+ // Verify partition epochs.
+ assertEquals(group.currentPartitionEpoch(topicId, 0), 12);
+ assertEquals(group.currentPartitionEpoch(topicId, 1), 13);
Review Comment:
nit: The expected value must be first and the actual value second. This
applies to all the assertions added in this PR.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -942,11 +942,13 @@ private void removeTaskProcessIds(
currentTasksProcessId.compute(subtopologyId, (__,
partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.keySet().forEach(partitionId -> {
- String prevValue =
partitionsOrNull.remove(partitionId);
+ String prevValue = partitionsOrNull.get(partitionId);
if (!Objects.equals(prevValue, expectedProcessId)) {
- throw new IllegalStateException(
+ log.debug(
String.format("Cannot remove the process ID %s
from task %s_%s because the partition is " +
Review Comment:
We need to prefix those too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]