jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1339019021


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    void validateGroupDelete() throws KafkaException;

Review Comment:
   should these be DeleteGroup?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,43 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)
+            || isInState(PREPARING_REBALANCE)
+            || isInState(COMPLETING_REBALANCE)) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)
+            || isInState(PREPARING_REBALANCE)
+            || isInState(COMPLETING_REBALANCE)) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }

Review Comment:
   in the existing implementation, we transition to DEAD if the group is empty 
so that even if the write operation fails we delete the group in the next purge 
cycle.
   
   we don't need to do this here since if the write operation fails we revert 
to the previous state and return an error so the client knows that the 
operation failed. is this correct?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");

Review Comment:
   can we follow the same line break as in L1101-1102? same for result2



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (group.isSubscribedToTopic(topic.name())) {
+                    
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                } else if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition.partitionIndex())) {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                        request.groupId(),
+                        topic.name(),
+                        partition.partitionIndex()
+                    ));
+                }
+                responsePartitionCollection.add(responsePartition);
+            });
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+                    .setName(topic.name())
+                    .setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Deletes offsets as part of a DeleteGroups request.
+     * Populates the record list passed in with records to update the state 
machine.
+     * Validations are done in deleteGroups method in GroupCoordinatorShard.

Review Comment:
   Validations are done in `{@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)}`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -90,4 +92,29 @@ void validateOffsetFetch(
         int memberEpoch,
         long lastCommittedOffset
     ) throws KafkaException;
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    void validateOffsetDelete() throws KafkaException;
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    void validateGroupDelete() throws KafkaException;
+
+    /**
+     * Returns true if the group is actively subscribed to the topic.
+     *
+     * @param topic The topic name.
+     * @return whether the group is subscribed to the topic.

Review Comment:
   Whether



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,78 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures =
+            new ArrayList<>(groupIds.size());
+
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DeleteGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
+                    Collections.singletonList(null),
+                    Errors.INVALID_GROUP_ID
+                )));
+            } else {
+                final TopicPartition topicPartition = 
topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+                runtime.scheduleWriteOperation(
+                    "delete-group",

Review Comment:
   should this be "delete-groups"?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,44 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    /**
+     * Handles a DeleteGroups request.
+     *
+     * @param context   The request context.
+     * @param groupIds  The groupIds of the groups to be deleted
+     * @return A Result containing the 
DeleteGroupsResponseData.DeletableGroupResultCollection response and
+     *         a list of records to update the state machine.
+     */
+    public 
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> deleteGroups(
+        RequestContext context,
+        List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            try {
+                groupMetadataManager.validateGroupDelete(groupId);
+                offsetMetadataManager.deleteAllOffsets(groupId, records);
+                groupMetadataManager.deleteGroup(groupId, records);
+
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                );
+            } catch (ApiException exception) {
+                resultCollection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(Errors.forException(exception).code())
+                );
+            }
+        });
+
+        return new CoordinatorResult<>(records, resultCollection);

Review Comment:
   the current implementation logs how many groups and offsets were removed. 
should we add something similar?
   
   ```
         info(s"The following groups were deleted: 
${groupsEligibleForDeletion.map(_.groupId).mkString(", ")}. " +
           s"A total of $offsetsRemoved offsets were removed.")
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,31 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a GroupDelete request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in deleteGroups method in GroupCoordinatorShard.

Review Comment:
   ditto on link
   
   can we add a comment on why we don't expect an exception to be thrown here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3071,31 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    /**
+     * Handles a GroupDelete request.
+     * Populates the record list passed in with record to update the state 
machine.
+     * Validations are done in deleteGroups method in GroupCoordinatorShard.
+     *
+     * @param groupId The group id of the group to be deleted.
+     * @param records The record list to populate.
+     */
+    public void deleteGroup(
+        String groupId,
+        List<Record> records
+    ) {
+        records.addAll(group(groupId).createGroupTombstoneRecords());
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     *
+     * @param groupId The group id of the group to be deleted.

Review Comment:
   The id of the group to be deleted.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,80 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic == null ?
+                null : offsetsByTopic.get(topic.name());
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (group.isSubscribedToTopic(topic.name())) {

Review Comment:
   can we move this check outside of the forEach block? we perform this check 
for every partition of the topic



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +853,43 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)

Review Comment:
   don't we also need to check whether the stable group is using the 
ConsumerProtocol.PROTOCOL_TYPE?
   from
   ```
                   case PreparingRebalance | CompletingRebalance | Stable if 
group.isConsumerGroup =>
   ```
   



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection3 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();

Review Comment:
   do we need this?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -936,4 +938,213 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
 
         assertEquals(expectedResponse, future.get());
     }
+
+    @Test
+    public void testDeleteOffsets() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+        responsePartitionCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
+        );
+        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        responseTopicCollection.add(
+            new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
+        );
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setTopics(responseTopicCollection);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(response));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteOffsetsCoordinatorNotAvailableException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 1);
+
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection 
requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
+        requestTopicCollection.add(
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(
+                    new 
OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
+                ))
+        );
+        OffsetDeleteRequestData request = new 
OffsetDeleteRequestData().setGroupId("group")
+            .setTopics(requestTopicCollection);
+
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-offset"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<OffsetDeleteResponseData> future = 
service.deleteOffsets(
+            requestContext(ApiKeys.OFFSET_DELETE),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(response, future.get());
+    }
+
+    @Test
+    public void testDeleteGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result1 =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result2 =
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection3 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-3")
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
+        resultCollection3.add(result3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            new 
DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            result1.duplicate(),
+            result2.duplicate(),
+            result3.duplicate()
+        ));
+
+        when(runtime.partitions()).thenReturn(Sets.newSet(
+            new TopicPartition("__consumer_offsets", 0),
+            new TopicPartition("__consumer_offsets", 1),
+            new TopicPartition("__consumer_offsets", 2)
+        ));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-group"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-group"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenAnswer(invocation -> {
+            Thread.sleep(1000);

Review Comment:
   in general, it's not a good practice to use Thread.sleep in tests. also, i 
don't think this tests what we actually want to test.
   
   We want to confirm that the final future is not completed until this 
operation completes. So i propose:
   1. have this thread wait
   2. confirm future did not complete
   3. unblock this thread
   4. confirm future completes
   
   something like the following:
   ```
           CountDownLatch latch = new CountDownLatch(1);
           ...
   
           when(runtime.scheduleWriteOperation(
               ArgumentMatchers.eq("delete-group"),
               ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
               ArgumentMatchers.any()
           )).thenAnswer(invocation -> CompletableFuture.supplyAsync(() -> {
               try {
                   assertTrue(latch.await(5, TimeUnit.SECONDS));
               } catch (InterruptedException ignored) {}
               return resultCollection2;
           }));
   
           when(runtime.scheduleWriteOperation(
               ArgumentMatchers.eq("delete-group"),
               ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
               ArgumentMatchers.any()
           )).thenReturn(FutureUtils.failedFuture(new 
CoordinatorLoadInProgressException(null)));
   
           List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", 
"group-id-3", null);
           
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
               service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
   
           assertFalse(future.isDone());
           latch.countDown();
   
           TestUtils.waitForCondition(future::isDone, "The future did not 
complete.");
           assertTrue(expectedResultCollection.containsAll(future.get()));
           assertTrue(future.get().containsAll(expectedResultCollection));
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +777,39 @@ public CompletableFuture<OffsetDeleteResponseData> 
deleteOffsets(
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return CompletableFuture.completedFuture(new 
OffsetDeleteResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            );
+        }
+
+        return runtime.scheduleWriteOperation(
+            "delete-offset",

Review Comment:
   should this be "delete-offsets"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to