dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1332632377


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            final int partition = partitionFor(groupId);

Review Comment:
   nit: I wonder if we should use `topicPartitionFor` here. With this, we could 
directly have the TopicPartition as the key in the Map and we would not need to 
create `new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)` later 
on. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = 
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);

Review Comment:
   nit: You could do the following to avoid having to put the list again into 
the map.
   
   ```
   groupsByPartition
       .computeIdAbsent(partition, __ -> new ArrayList())
       .put(groupId);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupIds The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+

Review Comment:
   nit: We can remove this empty line.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +744,39 @@ public CompletableFuture<OffsetDeleteResponseData> 
deleteOffsets(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return CompletableFuture.completedFuture(new 
OffsetDeleteResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "delete-offset",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.deleteOffsets(context, request)
+        ).exceptionally(exception -> {

Review Comment:
   It is interesting to point out that, in the current implementation, all 
these errors are swallowed. This is definitely not ideal because it tells to 
the user that the deletion is successful even if was not. Should we apply the 
same error handling to the deleteGroups?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupId The group id of the group to be deleted.
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResult response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, 
Record> groupDelete(
+            RequestContext context,
+            String groupId

Review Comment:
   nit: The indentation is incorrect.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {

Review Comment:
   I think that the try..catch is not needed here because we handle the 
exceptions in the group coordinator service, no?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }

Review Comment:
   I think that a consumer group will actually never transition to Dead. We 
could actually remove this state.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (state() == ConsumerGroupState.STABLE
+            || state() == ConsumerGroupState.ASSIGNING
+            || state() == ConsumerGroupState.RECONCILING) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+
+        // We avoid writing the tombstone when the generationId is 0, since 
this group is only using
+        // Kafka for offset storage.
+        if (groupEpoch() <= 0) {
+            throw Errors.UNKNOWN_SERVER_ERROR.exception();
+        }
+    }
+
+    /**
+     * Creates a GroupMetadata tombstone.
+     *
+     * @return The record.
+     */
+    public Record createMetadataTombstoneRecord() {
+        return RecordHelpers.newGroupEpochTombstoneRecord(groupId());

Review Comment:
   As mentioned earlier, we have to generate other tombstones.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (state() == ConsumerGroupState.STABLE
+            || state() == ConsumerGroupState.ASSIGNING
+            || state() == ConsumerGroupState.RECONCILING) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+
+        // We avoid writing the tombstone when the generationId is 0, since 
this group is only using
+        // Kafka for offset storage.
+        if (groupEpoch() <= 0) {
+            throw Errors.UNKNOWN_SERVER_ERROR.exception();
+        }

Review Comment:
   This does not seem correct to me because this exception does not apply to 
consumer groups.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = 
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        });
+
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures = new ArrayList<>();
+        groupsByPartition.forEach((partition, groupList) -> {
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+                runtime.scheduleWriteOperation(
+                    "delete-group",
+                    new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
partition),
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+                        new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    groupIds.forEach(groupId -> {
+                        resultCollection.add(
+                            new DeleteGroupsResponseData.DeletableGroupResult()
+                                .setGroupId(groupId)
+                                
.setErrorCode(Errors.forException(exception).code())
+                        );
+                    });
+                    return resultCollection;
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(v -> {
+            final DeleteGroupsResponseData.DeletableGroupResultCollection res 
= new DeleteGroupsResponseData.DeletableGroupResultCollection();
+            for 
(CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future : futures) {
+                DeleteGroupsResponseData.DeletableGroupResultCollection result 
= future.join();
+                res.addAll(result);

Review Comment:
   nit: `res.addAll(future.join())` to reduce the code?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +525,46 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = 
groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        });
+
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures = new ArrayList<>();

Review Comment:
   nit: We could specify the size of the array when we allocate it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupIds The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+
+                
offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, 
records);
+                final 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> 
deleteGroupCoordinatorResult =
+                    groupMetadataManager.groupDelete(context, groupId);

Review Comment:
   The CoordinatorResult is a bit annoying here. How about passing `records` to 
the method as well? Then we could construct the response here. We could also 
remove the context if it is not needed.
   
   How about naming it `deleteGroup` to be consistent with `deleteOffsets`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupIds The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+
+                
offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, 
records);
+                final 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> 
deleteGroupCoordinatorResult =
+                    groupMetadataManager.groupDelete(context, groupId);
+                records.addAll(deleteGroupCoordinatorResult.records());
+
+                resultCollection.add(deleteGroupCoordinatorResult.response());
+            } catch (ApiException exception) {

Review Comment:
   I have a question regarding the error handling. Could `groupDelete` thrown 
an exception? If it can, we would need to handle records a bit differently 
because we don't want to delete the offsets if the group cannot be delete. The 
operation should be atomic.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupIds The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+
+                
offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, 
records);

Review Comment:
   nit: `deleteAllOffsets`? I also wonder if the context is required. If not, 
we could remove it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupIds The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+
+                
offsetMetadataManager.populateRecordListToDeleteAllOffsets(context, groupId, 
records);
+                final 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, Record> 
deleteGroupCoordinatorResult =
+                    groupMetadataManager.groupDelete(context, groupId);
+                records.addAll(deleteGroupCoordinatorResult.records());
+
+                resultCollection.add(deleteGroupCoordinatorResult.response());
+            } catch (ApiException exception) {
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(Errors.forException(exception).code())
+                );
+            }
+        });
+
+        return new CoordinatorResult<>(records, resultCollection);
+

Review Comment:
   nit: We could remove this empty line?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -242,6 +244,19 @@ private void validateOffsetFetch(
         );
     }
 
+    /**
+     * Validates an OffsetDelete request.
+     *
+     * @param request The actual request.
+     */
+    private Group validateOffsetDelete(
+            OffsetDeleteRequestData request

Review Comment:
   nit: Indentation is incorrect. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,39 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a GroupDelete request.
+     *
+     * @param context The request context.
+     * @param groupId The group id of the group to be deleted.
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResult response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResult, 
Record> groupDelete(
+            RequestContext context,
+            String groupId
+    ) throws ApiException {
+        DeleteGroupsResponseData.DeletableGroupResult result =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId);
+
+        final List<Record> records = new ArrayList<>();
+        records.add(group(groupId).createMetadataTombstoneRecord());
+
+        return new CoordinatorResult<>(records, result);
+    }
+
+    void validateGroupDelete(String groupId) throws ApiException {
+

Review Comment:
   nit: Let's remove this empty line and add javadoc to the method.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {
+            Group group = validateOffsetDelete(request);

Review Comment:
   Let's do the validation before allocating response, records, etc. We don't 
have to allocate them if the request is invalid. `group` could also be `final`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();

Review Comment:
   nit: final?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {
+            Group group = validateOffsetDelete(request);
+
+            request.topics().forEach(topic -> {
+                final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+                final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+
+                topic.partitions().forEach(partition -> {
+                    OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                        new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                    if (subscribedToTopic) {
+                        responsePartition = 
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                    } else {
+                        
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(

Review Comment:
   I wonder if we need to verify if there is actually an offset for the 
topic/partition. We don't need to write a tombstone if there is not. What do 
you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        RequestContext context,
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {
+            Group group = validateOffsetDelete(request);
+
+            request.topics().forEach(topic -> {
+                final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+                final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+
+                topic.partitions().forEach(partition -> {
+                    OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                        new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                    if (subscribedToTopic) {
+                        responsePartition = 
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                    } else {
+                        
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                        ));
+                    }
+                    responsePartitionCollection.add(responsePartition);
+                });
+
+                final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+                responseTopicCollection.add(responseTopic);
+            });
+            response = response.setTopics(responseTopicCollection);

Review Comment:
   nit: `response = ` is not needed here as `setTopics` mutates the response 
directly.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (state() == ConsumerGroupState.STABLE
+            || state() == ConsumerGroupState.ASSIGNING
+            || state() == ConsumerGroupState.RECONCILING) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }

Review Comment:
   I wonder if using a switch would be better here. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -592,6 +607,45 @@ public void validateOffsetFetch(
         validateMemberEpoch(memberEpoch, member.memberEpoch());
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (state() == ConsumerGroupState.DEAD) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,46 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)
+            || isInState(PREPARING_REBALANCE)
+            || isInState(COMPLETING_REBALANCE)) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+
+        // We avoid writing the tombstone when the generationId is 0, since 
this group is only using
+        // Kafka for offset storage.
+        if (generationId() <= 0) {
+            throw Errors.UNKNOWN_SERVER_ERROR.exception();

Review Comment:
   Throwing an exception does not seem to be the right approach to me because 
we still want to delete the group and the exception will stop the process. My 
understanding is that we could just skip generating the tombstone if the 
generation <= 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to