dajac commented on code in PR #15442:
URL: https://github.com/apache/kafka/pull/15442#discussion_r1507296782


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    /**
+     * A group can be upgraded offline if it's a classic group and empty.
+     *
+     * @param groupId The group to be validated.
+     * @return true if the offline upgrade is valid.
+     */
+    private boolean validateOfflineUpgrade(String groupId) {
+        Group group = groups.get(groupId);
+
+        if (group == null || group.type() == CONSUMER) {
+            return false;
+        }
+
+        ClassicGroup classicGroup = (ClassicGroup) group;
+        if (!classicGroup.isEmpty()) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * Upgrade the empty classic group to a consumer group if it's valid.
+     *
+     * @param groupId       The group id to be updated.
+     * @param records       The list of records to delete the classic group and create the consumer group.
+     * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+     */
+    public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {

Review Comment:
   In this particular case where the group is empty and a new member joins, my 
understanding is that we only need to delete the previous group. The new group 
will be automatically created with the new member. Am I missing something?
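
   To make the question concrete, here is a toy replay model. It is only a sketch: every name in it is hypothetical and none of it is the actual `GroupMetadataManager` code. It assumes that replaying a tombstone drops the classic group and that replaying a member record creates the consumer group on demand.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these names come from GroupMetadataManager.
class UpgradeReplaySketch {
    // groupId -> "classic" or "consumer"; stands in for the coordinator's groups map.
    final Map<String, String> groupTypes = new HashMap<>();
    final Map<String, Set<String>> members = new HashMap<>();

    // Replaying a tombstone for the classic group removes it from the map.
    void replayClassicTombstone(String groupId) {
        if ("classic".equals(groupTypes.get(groupId))) {
            groupTypes.remove(groupId);
            members.remove(groupId);
        }
    }

    // Replaying a member record creates the consumer group on demand.
    void replayMemberJoin(String groupId, String memberId) {
        groupTypes.putIfAbsent(groupId, "consumer");
        members.computeIfAbsent(groupId, id -> new HashSet<>()).add(memberId);
    }

    public static void main(String[] args) {
        UpgradeReplaySketch state = new UpgradeReplaySketch();
        state.groupTypes.put("g1", "classic"); // an empty classic group

        state.replayClassicTombstone("g1");    // the only record the upgrade path would add
        state.replayMemberJoin("g1", "m1");    // written by the regular join path

        System.out.println(state.groupTypes);  // {g1=consumer}
        System.out.println(state.members);     // {g1=[m1]}
    }
}
```

   In this model the upgrade path never has to create the consumer group explicitly; the join path does it.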



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) {
         }
     }
 
+    /**
+     * A group can be upgraded offline if it's a classic group and empty.
+     *
+     * @param groupId The group to be validated.
+     * @return true if the offline upgrade is valid.
+     */
+    private boolean validateOfflineUpgrade(String groupId) {
+        Group group = groups.get(groupId);
+
+        if (group == null || group.type() == CONSUMER) {
+            return false;
+        }
+
+        ClassicGroup classicGroup = (ClassicGroup) group;
+        if (!classicGroup.isEmpty()) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * Upgrade the empty classic group to a consumer group if it's valid.
+     *
+     * @param groupId       The group id to be updated.
+     * @param records       The list of records to delete the classic group and create the consumer group.
+     * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group.
+     */
+    public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) {
+        if (validateOfflineUpgrade(groupId)) {
+            final long currentTimeMs = time.milliseconds();
+            ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, false);
+            int groupEpoch = classicGroup.generationId();
+
+            // Replace the classic group with a new consumer group.
+            ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
+            // We don't create the tombstone because the replay will remove the newly created consumer group.

Review Comment:
   I think that we must write a tombstone for the old group. As you said, 
replaying the new group's records will update the map. However, the old 
group's record also has to be compacted away from the log, and the only way 
to do that is to write a tombstone.
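
   A simplified sketch of why, not Kafka's actual log cleaner: compaction keeps the latest record per key, so a key goes away only once a tombstone (null value) is written for it. The key strings below are hypothetical, and the sketch assumes the classic and consumer group records use different keys, so the new group's records never overwrite the old one.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted log; a real cleaner also retains tombstones for a while.
class CompactionSketch {
    static final class Rec {
        final String key;
        final String value; // null marks a tombstone

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keep the latest value per key; drop keys whose latest record is a tombstone.
    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec rec : log) {
            if (rec.value == null) {
                latest.remove(rec.key);
            } else {
                latest.put(rec.key, rec.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        // Hypothetical keys: the old and new group records do not share a key.
        List<Rec> withoutTombstone = List.of(
            new Rec("classic-group:g1", "generation=5"),
            new Rec("consumer-group:g1", "epoch=6"));
        List<Rec> withTombstone = List.of(
            new Rec("classic-group:g1", "generation=5"),
            new Rec("classic-group:g1", null),
            new Rec("consumer-group:g1", "epoch=6"));

        System.out.println(compact(withoutTombstone).keySet()); // old record survives
        System.out.println(compact(withTombstone).keySet());    // old record is gone
    }
}
```

   Without the tombstone, the stale classic group record stays in the compacted log indefinitely, even though the in-memory map already holds the new group.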


