jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1333554676


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,63 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            final TopicPartition topicPartition = topicPartitionFor(groupId);
+            groupsByTopicPartition
+                .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                .add(groupId);
+        });
+
+        final 
List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>>
 futures =
+            new ArrayList<>(groupIds.size());
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+                runtime.scheduleWriteOperation(
+                    "delete-group",
+                    topicPartition,
+                    coordinator -> coordinator.deleteGroups(context, groupList)
+                ).exceptionally(exception -> {
+                    if (exception instanceof UnknownTopicOrPartitionException 
||
+                        exception instanceof NotEnoughReplicasException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupIds,
+                            Errors.COORDINATOR_NOT_AVAILABLE
+                        );
+                    }
+
+                    if (exception instanceof NotLeaderOrFollowerException ||
+                        exception instanceof KafkaStorageException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupIds,
+                            Errors.NOT_COORDINATOR
+                        );
+                    }
+
+                    if (exception instanceof RecordTooLargeException ||
+                        exception instanceof RecordBatchTooLargeException ||
+                        exception instanceof InvalidFetchSizeException) {
+                        return DeleteGroupsRequest.getErrorResultCollection(
+                            groupIds,
+                            Errors.UNKNOWN_SERVER_ERROR
+                        );
+                    }
+
+                    return DeleteGroupsRequest.getErrorResultCollection(
+                        groupIds,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   let's say that for some of the topic partitions the deleteGroups write 
operations were successful, while for others there was a timeout. In that case 
this would return a request timeout to the client, indicating that the entire 
request failed even though some of the deletions succeeded. I think this is 
fine, but it could be confusing to the user. @dajac what are your thoughts?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,76 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic.get(topic);

Review Comment:
   should it be `offsetsByTopic.get(topic.name())`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,76 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic.get(topic);
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (subscribedToTopic) {
+                    
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                } else if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition)) {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                        request.groupId(),
+                        topic.name(),
+                        partition.partitionIndex()
+                    ));
+                }
+                responsePartitionCollection.add(responsePartition);
+            });
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Handles an GroupDelete request.

Review Comment:
   nit: "Handles an" should be "Handles a"
   
   Maybe we can reword this to "Deletes offsets as part of a DeleteGroups 
request."



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,76 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic.get(topic);
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (subscribedToTopic) {
+                    
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                } else if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition)) {

Review Comment:
   should this be `containsKey(partition.partitionIndex())`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,76 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic.get(topic);
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (subscribedToTopic) {
+                    
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                } else if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition)) {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                        request.groupId(),
+                        topic.name(),
+                        partition.partitionIndex()
+                    ));
+                }
+                responsePartitionCollection.add(responsePartition);
+            });
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Handles an GroupDelete request.
+     * Populates the record list passed in with records to update the state 
machine.
+     * Validations are done in deleteGroups method in GroupCoordinatorShard.
+     *
+     * @param groupId The group id of the given group.
+     * @param records The record list to populate.
+     */
+    public void deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) throws ApiException {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+
+        offsetsByTopic.forEach((topic, offsetsByPartition) -> {
+            offsetsByPartition.forEach((partition, offsetAndMetadata) -> {

Review Comment:
   nit: `(partition, __) ->`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -849,6 +854,39 @@ public void validateOffsetFetch(
         }
     }
 
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() throws GroupIdNotFoundException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        }
+    }
+
+    /**
+     * Validates the GroupDelete request.
+     */
+    @Override
+    public void validateGroupDelete() throws ApiException {
+        if (isInState(DEAD)) {
+            throw new GroupIdNotFoundException(String.format("Group %s is in 
dead state.", groupId));
+        } else if (isInState(STABLE)
+            || isInState(PREPARING_REBALANCE)
+            || isInState(COMPLETING_REBALANCE)) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    /**
+     * Creates tombstone(s) for deleting the group.
+     *
+     * @return The list of tombstone record(s).
+     */
+    public List<Record> createMetadataTombstoneRecords() {
+        return 
Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));

Review Comment:
   We can use `Collections.singletonList()` here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,76 @@ public CoordinatorResult<OffsetCommitResponseData, Record> 
commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+        OffsetDeleteRequestData request
+    ) throws ApiException {
+        final Group group = validateOffsetDelete(request);
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+            new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+        final TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> offsetsByTopic =
+            offsetsByGroup.get(request.groupId());
+
+        request.topics().forEach(topic -> {
+            final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+            final boolean subscribedToTopic = 
group.isSubscribedToTopic(topic.name());
+            final TimelineHashMap<Integer, OffsetAndMetadata> 
offsetsByPartition = offsetsByTopic.get(topic);
+
+            topic.partitions().forEach(partition -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartition 
responsePartition =
+                    new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                if (subscribedToTopic) {
+                    
responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                } else if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition)) {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                        request.groupId(),
+                        topic.name(),
+                        partition.partitionIndex()
+                    ));
+                }
+                responsePartitionCollection.add(responsePartition);
+            });
+
+            final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+                new 
OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+            responseTopicCollection.add(responseTopic);
+        });
+        response.setTopics(responseTopicCollection);
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Handles an GroupDelete request.
+     * Populates the record list passed in with records to update the state 
machine.
+     * Validations are done in deleteGroups method in GroupCoordinatorShard.
+     *
+     * @param groupId The group id of the given group.
+     * @param records The record list to populate.
+     */
+    public void deleteAllOffsets(
+        String groupId,
+        List<Record> records
+    ) throws ApiException {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);

Review Comment:
   This can be null, right? For example, if there are no offsets for the given group id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to