dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1340716345


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();

Review Comment:
   nit: Should we set the expected size here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        if (isInState(DEAD)) {

Review Comment:
   @jeffkbkim Do we ever transition to Dead? If not, I wonder if we should just 
remove this and remove the Dead state. What do you think?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));

Review Comment:
   ditto for those two.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(

Review Comment:
   We could also apply my formatting suggestion here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    void validateDeleteGroup() throws KafkaException;
+
+    /**
+     * Returns true if the group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return Whether the group is subscribed to the topic.
+     */
+    boolean isSubscribedToTopic(String topic);
+
+    /**
+     * Creates tombstone(s) for deleting the group.
+     *
+     * @return The list of tombstone record(s).
+     */
+    List<Record> createGroupTombstoneRecords();

Review Comment:
   I wonder if we should rather pass the list of records as an argument in 
order to avoid having to copy the records afterwards. Have you considered this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The ID of the group to be deleted.
+     * @param records The record list to populate.
+     */
+    public void deleteGroup(
+        String groupId,
+        List<Record> records
+    ) {
+        // groupId has been checked in 
GroupMetadataManager#validateDeleteGroup.
+        // In this method, we only populate records with tombstone records, so 
we don't expect an exception to be thrown here.
+

Review Comment:
   nit: Let's remove this empty line.



##########
clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java:
##########
@@ -78,4 +79,18 @@ public static DeleteGroupsRequest parse(ByteBuffer buffer, 
short version) {
     public DeleteGroupsRequestData data() {
         return data;
     }
+
+    public static DeleteGroupsResponseData.DeletableGroupResultCollection 
getErrorResultCollection(

Review Comment:
   nit: Could we refactor `getErrorResponse` to use this new method as well? 
Should we also add a unit test for this one?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,35 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+     * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+     *
+     * @param groupId The ID of the group to be deleted.
+     * @param records The record list to populate.
+     */
+    public void deleteGroup(
+        String groupId,
+        List<Record> records
+    ) {
+        // groupId has been checked in 
GroupMetadataManager#validateDeleteGroup.

Review Comment:
   nit: Should we just add this to the document of the groupId field?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (!usesConsumerGroupProtocol()
+            && (isInState(STABLE) || isInState(PREPARING_REBALANCE) || 
isInState(COMPLETING_REBALANCE))) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)

Review Comment:
   nit: ditto for the switch.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,42 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (!usesConsumerGroupProtocol()
+            && (isInState(STABLE) || isInState(PREPARING_REBALANCE) || 
isInState(COMPLETING_REBALANCE))) {

Review Comment:
   nit: I wonder if using a switch would be better here. Have you considered it?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+        CountDownLatch latch = new CountDownLatch(1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result1.duplicate(),
+            result2.duplicate(),
+            result3.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
+            try {
+                assertTrue(latch.await(5, TimeUnit.SECONDS));
+            } catch (InterruptedException ignored) { }
+            return resultCollection2;
+        }));

Review Comment:
   I am not sure to understand what you are trying to achieve here. Could you 
elaborate?
   
   If you want to delay the completion of the future, the best would be to 
create a CompletableFuture, use thenReturn(future), and then complete the 
future at L1149.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(

Review Comment:
   Could we also use `groupIds` to generate the list here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offsets"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)

Review Comment:
   nit: The `null` here is not ideal. Could we put a string instead? Or you 
could also use COORDINATOR_LOAD_IN_PROGRESS.exception().



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        doAnswer(invocation -> {

Review Comment:
   Is there a reason why you don't use when().thenAnswer(...)?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +940,216 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")

Review Comment:
   nit: Could we put `setGroupId` on a new line as well?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2")
+            .setErrorCode(Errors.INVALID_GROUP_ID.code())
+        );
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-3"));

Review Comment:
   small nit: It may be a bit easier to read if we create the expected response 
as follow? What do you think? If you find it better, we could also update the 
other test cases.
   
   ```
   new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
      new DeleteGroupsResponseData.DeletableGroupResult()
          .setGroupId("group-id-1"),
     ....
   ).iterator());
   
   
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =

Review Comment:
   nit: Could we put an empty line before this one? I find the code a bit hard 
to read because all the lines are all together.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -177,6 +183,29 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
             return result;
         }
 
+        public CoordinatorResult<OffsetDeleteResponseData, Record> 
deleteOffsets(
+            OffsetDeleteRequestData request
+        ) {
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); // TODO: 
lastCommitedOffset or lastWrittenOffset?

Review Comment:
   In the GroupMetadataManagerTestContext, we actually moved this to the replay 
method. See 
[here](https://github.com/apache/kafka/blob/ad7956170bcaf093ea8b2f725126d42cf7fb522b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java#L1144).
 It may be better to do the same here. What do you think?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -105,6 +115,107 @@ public void testCommitOffset() {
         assertEquals(result, coordinator.commitOffset(context, request));
     }
 
+    @Test
+    public void testDeleteGroups() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-2", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-2")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        
doNothing().when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.anyString());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        for (String groupId : groupIds) {
+            verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
+            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
+        }
+        assertEquals(expectedResult, coordinatorResult);
+    }
+
+    @Test
+    public void testDeleteGroupsInvalidGroupId() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            groupMetadataManager,
+            offsetMetadataManager
+        );
+
+        RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3");
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1"));
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2")
+            .setErrorCode(Errors.INVALID_GROUP_ID.code())
+        );
+        expectedResultCollection.add(new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-3"));
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
+            RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", 
"topic-name", 0),
+            RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
+        );
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> expectedResult = new CoordinatorResult<>(
+            expectedRecords,
+            expectedResultCollection
+        );
+
+        doThrow(Errors.INVALID_GROUP_ID.exception())
+            
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
"topic-name", 0));
+            return null;
+        }).when(offsetMetadataManager).deleteAllOffsets(anyString(), 
anyList());
+        doAnswer(invocation -> {
+            String groupId = invocation.getArgument(0);
+            List<Record> records = invocation.getArgument(1);
+            
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+            return null;
+        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
+            coordinator.deleteGroups(context, groupIds);
+
+        verify(groupMetadataManager, 
times(3)).validateDeleteGroup(anyString());
+        verify(groupMetadataManager, times(2)).deleteGroup(anyString(), 
anyList());
+        verify(offsetMetadataManager, times(2)).deleteAllOffsets(anyString(), 
anyList());

Review Comment:
   Should we at minimum verify the group ids here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -274,6 +303,20 @@ public void commitOffset(
             ));
         }
 
+        public void deleteOffset(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); // TODO: 
lastCommitedOffset or lastWrittenOffset?
+
+            replay(RecordHelpers.newOffsetCommitTombstoneRecord(
+                    groupId,
+                    topic,
+                    partition

Review Comment:
   nit: Indentation should be 4 spaces.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(

Review Comment:
   Should we move this method to the test context?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName(topic)
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                ))
+        );
+
+        final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        expectedResponsePartitionCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+        );
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        expectedResponseTopicCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic)
+                .setPartitions(expectedResponsePartitionCollection)
+        );
+
+        final List<Record> expectedRecords = error == Errors.NONE &&
+            context.offsetMetadataManager.offset(groupId, topic, partition) != 
null ?
+            Collections.singletonList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition)
+            ) :
+            Collections.emptyList();

Review Comment:
   This block is really hard to parse. 
   
   ```
   final List<Record> expectedRecords = 
        error == Errors.NONE && context.offsetMetadataManager.offset(groupId, 
topic, partition) != null ?
               
Collections.singletonList(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, 
topic, partition)) :
               Collections.emptyList();
   ```
   
   Would it be better like this? Otherwise, I would use a regular if statement.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName(topic)
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                ))
+        );
+
+        final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        expectedResponsePartitionCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+        );
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        expectedResponseTopicCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic)
+                .setPartitions(expectedResponsePartitionCollection)
+        );
+
+        final List<Record> expectedRecords = error == Errors.NONE &&
+            context.offsetMetadataManager.offset(groupId, topic, partition) != 
null ?
+            Collections.singletonList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition)
+            ) :
+            Collections.emptyList();
+
+        CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult 
= context.deleteOffsets(
+            new OffsetDeleteRequestData()
+                .setGroupId(groupId)
+                .setTopics(requestTopicCollection)
+        );
+
+        assertEquals(new 
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), 
coordinatorResult.response());
+        assertEquals(expectedRecords, coordinatorResult.records());
+    }
+
+    @Test
+    public void testGenericGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+
+        // Delete the offset whose topic partition doesn't exist.
+        testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        testOffsetDeleteWith(context, "foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertFalse(group.isSubscribedToTopic("bar"));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName(topic)
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                ))
+        );
+
+        final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        expectedResponsePartitionCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+        );
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        expectedResponseTopicCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic)
+                .setPartitions(expectedResponsePartitionCollection)
+        );
+
+        final List<Record> expectedRecords = error == Errors.NONE &&
+            context.offsetMetadataManager.offset(groupId, topic, partition) != 
null ?
+            Collections.singletonList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition)
+            ) :
+            Collections.emptyList();
+
+        CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult 
= context.deleteOffsets(
+            new OffsetDeleteRequestData()
+                .setGroupId(groupId)
+                .setTopics(requestTopicCollection)
+        );
+
+        assertEquals(new 
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), 
coordinatorResult.response());
+        assertEquals(expectedRecords, coordinatorResult.records());
+    }
+
+    @Test
+    public void testGenericGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testGenericGroupOffsetDeleteWithInvalidOffsets() {

Review Comment:
   What does `InvalidOffset` mean here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName(topic)
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                ))
+        );
+
+        final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        expectedResponsePartitionCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+        );
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        expectedResponseTopicCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic)
+                .setPartitions(expectedResponsePartitionCollection)
+        );
+
+        final List<Record> expectedRecords = error == Errors.NONE &&
+            context.offsetMetadataManager.offset(groupId, topic, partition) != 
null ?
+            Collections.singletonList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition)
+            ) :
+            Collections.emptyList();
+
+        CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult 
= context.deleteOffsets(
+            new OffsetDeleteRequestData()
+                .setGroupId(groupId)
+                .setTopics(requestTopicCollection)
+        );
+
+        assertEquals(new 
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), 
coordinatorResult.response());
+        assertEquals(expectedRecords, coordinatorResult.records());
+    }
+
+    @Test
+    public void testGenericGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+
+        // Delete the offset whose topic partition doesn't exist.
+        testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        testOffsetDeleteWith(context, "foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertFalse(group.isSubscribedToTopic("bar"));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        MetadataImage image = new 
GroupMetadataManagerTest.MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 1)
+            .addRacks()
+            .build();
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Collections.singletonList("bar"))
+            .build();
+        group.computeSubscriptionMetadata(
+            null,
+            member1,
+            image.topics(),
+            image.cluster()
+        );
+        group.updateMember(member1);
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertTrue(group.isSubscribedToTopic("bar"));
+
+        // Delete the offset whose topic partition doesn't exist.
+        testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        testOffsetDeleteWith(context, "foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {GenericGroup.class, ConsumerGroup.class})
+    public void testDeleteGroupAllOffsets(Class groupClass) {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        Group group = null;
+        if (groupClass == GenericGroup.class) {
+            group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+                "foo",
+                true
+            );
+        } else {
+            group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+                "foo",
+                true
+            );
+        }
+        context.commitOffset("foo", "bar-0", 0, 100L, 0);
+        context.commitOffset("foo", "bar-0", 1, 100L, 0);
+        context.commitOffset("foo", "bar-1", 0, 100L, 0);
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1),
+            RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0)
+        );
+        List<Record> records = new ArrayList<>();

Review Comment:
   nit: Could we put an empty line here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1561,6 +1604,156 @@ public void 
testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
             () -> context.fetchAllOffsets("group", "member", 10, 
Long.MAX_VALUE));
     }
 
+    static private void testOffsetDeleteWith(
+        OffsetMetadataManagerTestContext context,
+        String groupId,
+        String topic,
+        int partition,
+        Errors error
+    ) {
+        final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName(topic)
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
+                ))
+        );
+
+        final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        expectedResponsePartitionCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+        );
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
expectedResponseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        expectedResponseTopicCollection.add(
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                .setName(topic)
+                .setPartitions(expectedResponsePartitionCollection)
+        );
+
+        final List<Record> expectedRecords = error == Errors.NONE &&
+            context.offsetMetadataManager.offset(groupId, topic, partition) != 
null ?
+            Collections.singletonList(
+                RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, 
partition)
+            ) :
+            Collections.emptyList();
+
+        CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult 
= context.deleteOffsets(
+            new OffsetDeleteRequestData()
+                .setGroupId(groupId)
+                .setTopics(requestTopicCollection)
+        );
+
+        assertEquals(new 
OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), 
coordinatorResult.response());
+        assertEquals(expectedRecords, coordinatorResult.records());
+    }
+
+    @Test
+    public void testGenericGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        group.setSubscribedTopics(Optional.of(Collections.emptySet()));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testGenericGroupOffsetDeleteWithInvalidOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+        group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+
+        // Delete the offset whose topic partition doesn't exist.
+        testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        testOffsetDeleteWith(context, "foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDelete() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertFalse(group.isSubscribedToTopic("bar"));
+        testOffsetDeleteWith(context, "foo", "bar", 0, Errors.NONE);
+    }
+
+    @Test
+    public void testConsumerGroupOffsetDeleteWithInvalidOffsets() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+            "foo",
+            true
+        );
+        MetadataImage image = new 
GroupMetadataManagerTest.MetadataImageBuilder()
+            .addTopic(Uuid.randomUuid(), "foo", 1)
+            .addRacks()
+            .build();
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Collections.singletonList("bar"))
+            .build();
+        group.computeSubscriptionMetadata(
+            null,
+            member1,
+            image.topics(),
+            image.cluster()
+        );
+        group.updateMember(member1);
+        context.commitOffset("foo", "bar", 0, 100L, 0);
+        assertTrue(group.isSubscribedToTopic("bar"));
+
+        // Delete the offset whose topic partition doesn't exist.
+        testOffsetDeleteWith(context, "foo", "bar1", 0, Errors.NONE);
+        // Delete the offset from the topic that the group is subscribed to.
+        testOffsetDeleteWith(context, "foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {GenericGroup.class, ConsumerGroup.class})

Review Comment:
   Could we use `GroupType` instead? Then you could use a switch based on the 
enum.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+                offsetMetadataManager.deleteAllOffsets(groupId, records);
+                groupMetadataManager.deleteGroup(groupId, records);
+
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                );
+            } catch (ApiException exception) {
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(Errors.forException(exception).code())
+                );
+            }
+        });
+
+        return new CoordinatorResult<>(records, resultCollection);

Review Comment:
   Good question. In my opinion, this log line is useful for the expiration 
case. I am not sure if it really is in this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to