dajac commented on code in PR #15528:
URL: https://github.com/apache/kafka/pull/15528#discussion_r1524389411


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -579,7 +579,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
     }
 
     /**
-     * Gets or maybe creates a consumer group.
+     * Gets or maybe creates a consumer group without updating the groups map.

Review Comment:
   nit: Could we extend the comment to mention that the group will be 
materialized during the replay?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
 
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return consumerGroup;

Review Comment:
   nit: We could remove the `consumerGroup` local variable and return the new 
group directly.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9421,33 +9421,44 @@ public void testClassicGroupMaybeDelete() {
 
     @Test
     public void testConsumerGroupDelete() {
+        String groupId = "group-id";
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
             .build();
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+        // Create an empty group.
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10));
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()));
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10));

Review Comment:
   You could actually replace this by using `withConsumerGroup` when 
constructing the context.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9421,33 +9421,44 @@ public void testClassicGroupMaybeDelete() {
 
     @Test
     public void testConsumerGroupDelete() {
+        String groupId = "group-id";
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
             .build();
-        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+        // Create an empty group.
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10));
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()));
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10));
 
         List<Record> expectedRecords = Arrays.asList(
-            RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
-            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
-            RecordHelpers.newGroupEpochTombstoneRecord("group-id")
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId)
         );
         List<Record> records = new ArrayList<>();
-        context.groupMetadataManager.deleteGroup("group-id", records);
+        context.groupMetadataManager.deleteGroup(groupId, records);
         assertEquals(expectedRecords, records);
     }
 
     @Test
     public void testConsumerGroupMaybeDelete() {
+        String groupId = "group-id";
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
             .build();
-        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
+
+        // Create an empty group.
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 10));
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()));
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10));
+        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);

Review Comment:
   nit: I wonder if we could remove this and use `replay` to update the group 
later on. That would be better, in my opinion.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
 
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return consumerGroup;
+        } else {
+            if (group.type() == CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Gets or maybe creates a consumer group. Updates the groups map if a new group is created.
+     *
+     * @param groupId           The group id.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or

Review Comment:
   On the replay path, I think it would actually be better to throw an 
`IllegalStateException`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -601,6 +601,33 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
             throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
         }
 
+        if (group == null) {
+            ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+            return consumerGroup;
+        } else {
+            if (group.type() == CONSUMER) {
+                return (ConsumerGroup) group;
+            } else {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we throw an exception if a group exists with the wrong type.
+                throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
+            }
+        }
+    }
+
+    /**
+     * Gets or maybe creates a consumer group. Updates the groups map if a new group is created.
+     *
+     * @param groupId           The group id.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a consumer group.
+     */
+    private ConsumerGroup maybeCreateAndUpdateConsumerGroup(

Review Comment:
   The naming is a bit confusing here. I wonder if we could use something like 
`getOrMaybeCreatePersistedConsumerGroup` to denote that we are persisting the 
group in this one. We may also be able to keep the `createIfNotExists`.
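
   To make the naming suggestion concrete, here is a rough, self-contained 
sketch of the split I have in mind. Everything in it is a simplified stand-in 
and not the real coordinator code: `GroupStoreSketch`, `Group`, `GroupType`, 
`isPersisted` and the nested `GroupIdNotFoundException` are hypothetical, and 
throwing `IllegalStateException` from the persisted variant is only an 
assumption about how the replay path could behave.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; these are not the real Kafka classes.
class GroupStoreSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static class GroupIdNotFoundException extends RuntimeException {
        GroupIdNotFoundException(String message) { super(message); }
    }

    static class Group {
        final String groupId;
        final GroupType type;

        Group(String groupId, GroupType type) {
            this.groupId = groupId;
            this.type = type;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    // Request path: may return a fresh group, but never adds it to the map.
    Group getOrMaybeCreateConsumerGroup(String groupId, boolean createIfNotExists) {
        Group group = groups.get(groupId);
        if (group == null) {
            if (!createIfNotExists) {
                throw new GroupIdNotFoundException("Consumer group " + groupId + " not found.");
            }
            return new Group(groupId, GroupType.CONSUMER);
        }
        if (group.type != GroupType.CONSUMER) {
            throw new GroupIdNotFoundException("Group " + groupId + " is not a consumer group.");
        }
        return group;
    }

    // Replay path: a newly created group is stored in the map.
    Group getOrMaybeCreatePersistedConsumerGroup(String groupId, boolean createIfNotExists) {
        Group group = groups.get(groupId);
        if (group == null) {
            if (!createIfNotExists) {
                // Assumption: a missing group during replay is an invalid state.
                throw new IllegalStateException("Consumer group " + groupId + " not found.");
            }
            group = new Group(groupId, GroupType.CONSUMER);
            groups.put(groupId, group);
            return group;
        }
        if (group.type != GroupType.CONSUMER) {
            throw new IllegalStateException("Group " + groupId + " is not a consumer group.");
        }
        return group;
    }

    boolean isPersisted(String groupId) {
        return groups.containsKey(groupId);
    }

    public static void main(String[] args) {
        GroupStoreSketch store = new GroupStoreSketch();
        store.getOrMaybeCreateConsumerGroup("group-id", true);
        System.out.println(store.isPersisted("group-id")); // false
        store.getOrMaybeCreatePersistedConsumerGroup("group-id", true);
        System.out.println(store.isPersisted("group-id")); // true
    }
}
```

   With this shape, the method name tells the caller whether the groups map can 
change, and both variants keep the same `createIfNotExists` parameter.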



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1511,7 +1538,12 @@ public void replay(
         String groupId = key.groupId();
         String memberId = key.memberId();
 
-        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
+        ConsumerGroup consumerGroup;
+        if (value != null) {
+            consumerGroup = maybeCreateAndUpdateConsumerGroup(groupId);
+        } else {
+            consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);

Review Comment:
   It is a bit weird to keep using the old method in replay methods. If we go 
with my previous suggestion, we may be able to use the new one everywhere. What 
do you think?
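
   As a rough illustration only, a replay method built on a single persisted 
accessor could look like the sketch below. The types are stand-ins and not the 
real coordinator classes: `ReplaySketch` and `memberCount` are hypothetical, a 
group is modeled as a plain member map, a `null` value models a tombstone, and 
failing with `IllegalStateException` when a tombstone targets a missing group 
is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types for illustration only; not the real Kafka classes.
class ReplaySketch {
    // groupId -> (memberId -> member value)
    private final Map<String, Map<String, String>> groups = new HashMap<>();

    // Hypothetical single accessor shared by all replay methods.
    Map<String, String> getOrMaybeCreatePersistedConsumerGroup(String groupId, boolean createIfNotExists) {
        Map<String, String> members = groups.get(groupId);
        if (members == null) {
            if (!createIfNotExists) {
                // Assumption: replaying a tombstone for an unknown group is an invalid state.
                throw new IllegalStateException("Consumer group " + groupId + " not found.");
            }
            members = new HashMap<>();
            groups.put(groupId, members);
        }
        return members;
    }

    // A null value models a tombstone record.
    void replay(String groupId, String memberId, String value) {
        Map<String, String> members = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
        if (value != null) {
            members.put(memberId, value);
        } else {
            members.remove(memberId);
        }
    }

    // Returns -1 when the group is not in the map.
    int memberCount(String groupId) {
        Map<String, String> members = groups.get(groupId);
        return members == null ? -1 : members.size();
    }

    public static void main(String[] args) {
        ReplaySketch sketch = new ReplaySketch();
        sketch.replay("group-id", "member-1", "v1");
        System.out.println(sketch.memberCount("group-id")); // 1
        sketch.replay("group-id", "member-1", null);
        System.out.println(sketch.memberCount("group-id")); // 0
    }
}
```

   The `value != null` flag is passed straight through, so the replay method 
needs no branch to choose between two accessors.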



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
