dajac commented on code in PR #15528: URL: https://github.com/apache/kafka/pull/15528#discussion_r1524389411
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -579,7 +579,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups( } /** - * Gets or maybe creates a consumer group. + * Gets or maybe creates a consumer group without updating the groups map. Review Comment: nit: Could we extend the comment to mention that the group will be materialized during the replay? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } + if (group == null) { + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + return consumerGroup; Review Comment: nit: We could remove `consumerGroup`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9421,33 +9421,44 @@ public void testClassicGroupMaybeDelete() { @Test public void testConsumerGroupDelete() { + String groupId = "group-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .build(); - context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true); + + // Create an empty group. + context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10)); + context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap())); + context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10)); Review Comment: You could actually replace this by using `withConsumerGroup` when constructing the context. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9421,33 +9421,44 @@ public void testClassicGroupMaybeDelete() { @Test public void testConsumerGroupDelete() { + String groupId = "group-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .build(); - context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true); + + // Create an empty group. + context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10)); + context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap())); + context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10)); List<Record> expectedRecords = Arrays.asList( - RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"), - RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"), - RecordHelpers.newGroupEpochTombstoneRecord("group-id") + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId), + RecordHelpers.newGroupEpochTombstoneRecord(groupId) ); List<Record> records = new ArrayList<>(); - context.groupMetadataManager.deleteGroup("group-id", records); + context.groupMetadataManager.deleteGroup(groupId, records); assertEquals(expectedRecords, records); } @Test public void testConsumerGroupMaybeDelete() { + String groupId = "group-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .build(); - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true); + + // Create an empty group. + context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10)); + context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap())); + context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10)); + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); Review Comment: nit: I wonder if we could remove this and use `replay` to update the group later one. It will be better in my opinion. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } + if (group == null) { + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + return consumerGroup; + } else { + if (group.type() == CONSUMER) { + return (ConsumerGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); + } + } + } + + /** + * Gets or maybe creates a consumer group. Updates the groups map if a new group is created. + * + * @param groupId The group id. + * + * @return A ConsumerGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or Review Comment: On the replay path, I think that it would actually better to thrown an `IllegalStateException`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } + if (group == null) { + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + return consumerGroup; + } else { + if (group.type() == CONSUMER) { + return (ConsumerGroup) group; + } else { + // We don't support upgrading/downgrading between protocols at the moment so + // we throw an exception if a group exists with the wrong type. + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); + } + } + } + + /** + * Gets or maybe creates a consumer group. Updates the groups map if a new group is created. + * + * @param groupId The group id. + * + * @return A ConsumerGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a consumer group. + */ + private ConsumerGroup maybeCreateAndUpdateConsumerGroup( Review Comment: The naming is a bit confusing here. I wonder if we could use something like `getOrMaybeCreatePersistedConsumerGroup` to denote that we are persisting the group in this one. We may also be able to keep the `createIfNotExists`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1511,7 +1538,12 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); - ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null); + ConsumerGroup consumerGroup; + if (value != null) { + consumerGroup = maybeCreateAndUpdateConsumerGroup(groupId); + } else { + consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false); Review Comment: It is a bit weird to keep using the old method in replay methods. If we do my previous suggestion, we may be able to use the new one everywhere. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org