dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1331125886


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        for (String groupId : groupIds) {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        }
+
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+        for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+            int partition = entry.getKey();
+            List<String> groupList = entry.getValue();

Review Comment:
   nit: You could use `Map#forEach`, which is a bit more concise.
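
   A minimal, self-contained sketch of the idea (the `partitionFor` stand-in and class name here are illustrative, not the PR's code). `computeIfAbsent` also removes the `getOrDefault`/`put` pair from the grouping loop:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class ForEachSketch {
       // Stand-in for GroupCoordinatorService.partitionFor, which maps a
       // group id onto a __consumer_offsets partition.
       static int partitionFor(String groupId) {
           return Math.abs(groupId.hashCode()) % 2;
       }

       static Map<Integer, List<String>> groupByPartition(List<String> groupIds) {
           Map<Integer, List<String>> groupsByPartition = new HashMap<>();
           for (String groupId : groupIds) {
               // computeIfAbsent creates the bucket on first use
               groupsByPartition.computeIfAbsent(partitionFor(groupId), k -> new ArrayList<>()).add(groupId);
           }
           return groupsByPartition;
       }

       public static void main(String[] args) {
           // Map#forEach hands key and value straight to the lambda, replacing
           // the entrySet() loop and the entry.getKey()/getValue() calls.
           groupByPartition(Arrays.asList("g1", "g2", "g3")).forEach((partition, groupList) ->
               System.out.println(partition + " -> " + groupList));
       }
   }
   ```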



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        for (String groupId : groupIds) {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        }
+
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+        for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+            int partition = entry.getKey();
+            List<String> groupList = entry.getValue();
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                    runtime.scheduleWriteOperation("delete-group",
+                            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+                            coordinator -> coordinator.deleteGroups(context, groupList));
+            futures.add(future);
+        }
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(v -> {
+            final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+            for (CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future : futures) {
+                try {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection result = future.get();

Review Comment:
   It may be better to use `join` instead of `get`. I think that you would be able to remove the try..catch if you use `join`.
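
   A self-contained sketch of the difference (class and method names are illustrative, not the PR's code): `get()` declares checked `InterruptedException`/`ExecutionException`, forcing a try..catch, while `join()` reports failures as an unchecked `CompletionException`:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   public class JoinVsGet {
       // With get(), the checked exceptions force a try..catch at every call site.
       static List<String> collectWithGet(List<CompletableFuture<String>> futures) {
           List<String> results = new ArrayList<>();
           for (CompletableFuture<String> future : futures) {
               try {
                   results.add(future.get());
               } catch (InterruptedException | ExecutionException e) {
                   throw new RuntimeException(e);
               }
           }
           return results;
       }

       // join() throws an unchecked CompletionException on failure, so the
       // happy path needs no try..catch at all.
       static List<String> collectWithJoin(List<CompletableFuture<String>> futures) {
           List<String> results = new ArrayList<>();
           for (CompletableFuture<String> future : futures) {
               results.add(future.join());
           }
           return results;
       }

       public static void main(String[] args) {
           List<CompletableFuture<String>> futures = Arrays.asList(
               CompletableFuture.completedFuture("a"),
               CompletableFuture.completedFuture("b"));
           System.out.println(collectWithJoin(futures)); // [a, b]
       }
   }
   ```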



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        for (String groupId : groupIds) {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        }
+
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+        for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+            int partition = entry.getKey();
+            List<String> groupList = entry.getValue();
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                    runtime.scheduleWriteOperation("delete-group",
+                            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+                            coordinator -> coordinator.deleteGroups(context, groupList));
+            futures.add(future);
+        }
+
+        final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Review Comment:
   Let's assume that one of the write operations fails with `COORDINATOR_LOAD_IN_PROGRESS`; this would fail `allFutures` even though some write operations may have succeeded. It seems to me that we should handle exceptions for each write operation future before we combine them, no?
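
   A self-contained sketch of that approach (simplified to `String` results rather than the real `DeletableGroupResultCollection`; class name and the "ERROR" placeholder are illustrative). Each future gets an `exceptionally` stage before `allOf`, so one failed partition write becomes a per-partition error instead of failing the combined future:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;

   public class PartialFailureSketch {
       // Turn each failure (e.g. COORDINATOR_LOAD_IN_PROGRESS, modelled here as
       // any exception) into an error entry before combining the futures.
       static CompletableFuture<List<String>> combine(List<CompletableFuture<String>> futures) {
           List<CompletableFuture<String>> handled = futures.stream()
               .map(f -> f.exceptionally(ex -> "ERROR"))
               .collect(Collectors.toList());
           return CompletableFuture
               .allOf(handled.toArray(new CompletableFuture[0]))
               .thenApply(v -> handled.stream()
                   .map(CompletableFuture::join) // safe: every handled future completes normally
                   .collect(Collectors.toList()));
       }

       public static void main(String[] args) {
           CompletableFuture<String> failed = new CompletableFuture<>();
           failed.completeExceptionally(new RuntimeException("load in progress"));
           List<String> results = combine(
               Arrays.asList(CompletableFuture.completedFuture("g1-deleted"), failed)).join();
           System.out.println(results); // [g1-deleted, ERROR]
       }
   }
   ```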



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -523,9 +526,38 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final Map<Integer, List<String>> groupsByPartition = new HashMap<>();
+        for (String groupId : groupIds) {
+            final int partition = partitionFor(groupId);
+            final List<String> groupList = groupsByPartition.getOrDefault(partition, new ArrayList<>());
+            groupList.add(groupId);
+            groupsByPartition.put(partition, groupList);
+        }
+
+        final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures = new ArrayList<>();
+        for (Map.Entry<Integer, List<String>> entry : groupsByPartition.entrySet()) {
+            int partition = entry.getKey();
+            List<String> groupList = entry.getValue();
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
+                    runtime.scheduleWriteOperation("delete-group",
+                            new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition),
+                            coordinator -> coordinator.deleteGroups(context, groupList));

Review Comment:
   nit: Let's put `delete-group` on a new line as well. Could you also ensure that the format conforms to the existing code? e.g. where the closing parenthesis is, the indentation (4 spaces), etc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,98 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+            RequestContext context,
+            OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {
+            Group group = validateOffsetDelete(request);
+
+            request.topics().forEach(topic -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                        new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+                final boolean subscribedToTopic = group.isSubscribedToTopic(topic.name());
+
+                topic.partitions().forEach(partition -> {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                    ));
+
+                    OffsetDeleteResponseData.OffsetDeleteResponsePartition responsePartition =
+                            new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(partition.partitionIndex());
+                    if (subscribedToTopic) {
+                        responsePartition = responsePartition.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code());
+                    }
+                    responsePartitionCollection.add(responsePartition);
+                });
+
+                final OffsetDeleteResponseData.OffsetDeleteResponseTopic responseTopic =
+                        new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection);
+                responseTopicCollection.add(responseTopic);
+            });
+            response = response.setTopics(responseTopicCollection);
+        } catch (ApiException ex) {
+            response = response.setErrorCode(Errors.forException(ex).code());
+        }
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Deletes all the offsets of the given groups to handle a GroupDelete request.
+     * Validations are done in the groupDelete method in GroupMetadataManager.
+     *
+     * @param context The request context.
+     * @param groupIds The list of group ids of the given groups.
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteAllOffsets(

Review Comment:
   As I said earlier, I think that returning a CoordinatorResult is not appropriate here because we don't need a response in this case. We basically build the response only to ignore it right after.
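
   A minimal sketch of the suggested shape (names and the string "records" are stand-ins for the real `Record` type, not the PR's code): the method returns nothing and only appends its tombstones to a caller-supplied list, so no throwaway `OffsetDeleteResponseData` is built:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class DeleteAllOffsetsSketch {
       // Toy state: committed partitions per group (stands in for the offsets store).
       static final Map<String, List<Integer>> committed = new HashMap<>();
       static {
           committed.put("g1", Arrays.asList(0, 1));
       }

       // Suggested shape: no CoordinatorResult, no response. The caller owns
       // the records list and the method only appends tombstones to it.
       static void deleteAllOffsets(String groupId, List<String> records) {
           for (int partition : committed.getOrDefault(groupId, Collections.emptyList())) {
               records.add("offset-tombstone:" + groupId + "-" + partition);
           }
       }

       public static void main(String[] args) {
           List<String> records = new ArrayList<>();
           deleteAllOffsets("g1", records);
           System.out.println(records); // [offset-tombstone:g1-0, offset-tombstone:g1-1]
       }
   }
   ```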



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,29 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+            RequestContext context,
+            List<String> groupIds
+    ) throws ApiException {
+
+        CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> groupDeleteCoordinatorResult =
+                groupMetadataManager.groupDelete(context, groupIds);
+
+        List<String> validGroupIds = new ArrayList<>();
+        for (DeleteGroupsResponseData.DeletableGroupResult result : groupDeleteCoordinatorResult.response()) {
+            if (result.errorCode() == Errors.NONE.code()) {
+                validGroupIds.add(result.groupId());
+            }
+        }
+
+        CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsetCoordinatorResult =
+                offsetMetadataManager.deleteAllOffsets(context, validGroupIds);
+
+        final List<Record> records = groupDeleteCoordinatorResult.records();
+        records.addAll(deleteOffsetCoordinatorResult.records());
+        return new CoordinatorResult<>(records, groupDeleteCoordinatorResult.response());

Review Comment:
   I have a few comments regarding this piece of code:
   1. I think that we should write the tombstones for the offsets before the ones for the group.
   2. It is a bit strange to return a CoordinatorResult from `deleteAllOffsets` and to ignore it. It would be better to pass the list of records to the method and to let the method populate it if the deletion is accepted. I would also remove the response as we don't need it.
   3. The `validGroupIds` is a bit weird. How about iterating over the group ids here? Then, you can call the various methods from the managers to validate, delete offsets and finally delete the group. If there is an error, you can directly populate the response with it.
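
   A self-contained sketch of that per-group flow (all names and the string-based records are stand-ins for the real managers and `Record` type, not the PR's code): validate each group, append offset tombstones first, then the group tombstone, and route any validation error straight into the per-group result:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class DeleteGroupsFlowSketch {
       static final Set<String> knownGroups = new HashSet<>(Arrays.asList("g1", "g2"));

       // Stand-in for the managers' validation; throws for unknown groups.
       static void validateDeleteGroup(String groupId) {
           if (!knownGroups.contains(groupId)) {
               throw new IllegalStateException("GROUP_ID_NOT_FOUND");
           }
       }

       // Per-group flow: validate, write offset tombstones before the group
       // tombstone; an error goes straight into the per-group result.
       static Map<String, String> deleteGroups(List<String> groupIds, List<String> records) {
           Map<String, String> resultPerGroup = new LinkedHashMap<>();
           for (String groupId : groupIds) {
               try {
                   validateDeleteGroup(groupId);
                   records.add("offset-tombstone:" + groupId); // offsets first
                   records.add("group-tombstone:" + groupId);
                   resultPerGroup.put(groupId, "NONE");
               } catch (IllegalStateException ex) {
                   resultPerGroup.put(groupId, ex.getMessage());
               }
           }
           return resultPerGroup;
       }

       public static void main(String[] args) {
           List<String> records = new ArrayList<>();
           Map<String, String> results = deleteGroups(Arrays.asList("g1", "gX"), records);
           System.out.println(results); // {g1=NONE, gX=GROUP_ID_NOT_FOUND}
           System.out.println(records); // [offset-tombstone:g1, group-tombstone:g1]
       }
   }
   ```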



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3071,6 +3072,41 @@ private void removeCurrentMemberFromGenericGroup(
         group.remove(member.memberId());
     }
 
+    public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> groupDelete(
+            RequestContext context,
+            List<String> groupIds
+    ) throws ApiException {
+        final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
+                new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        final List<Record> records = new ArrayList<>();
+
+        groupIds.forEach(groupId -> {
+            DeleteGroupsResponseData.DeletableGroupResult result =
+                    new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId);
+            try {
+                validateGroupDelete(groupId);
+                records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));

Review Comment:
   `newGroupMetadataTombstoneRecord` only works for generic groups. For consumer groups, we need to write other tombstones.
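
   A toy sketch of the distinction (the tombstone names below are illustrative assumptions, not the real `RecordHelpers` methods): a generic group is stored as a single group metadata record, while a consumer group spreads its state over several record types, each of which needs its own tombstone on deletion:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class GroupTombstoneSketch {
       enum GroupType { GENERIC, CONSUMER }

       // Record names are illustrative only; the real helpers live in
       // RecordHelpers and differ per record type persisted for the group.
       static List<String> tombstonesFor(String groupId, GroupType type) {
           if (type == GroupType.GENERIC) {
               // A generic group is a single group metadata record.
               return Collections.singletonList("group-metadata-tombstone:" + groupId);
           }
           // A consumer group needs one tombstone per persisted record type.
           List<String> tombstones = new ArrayList<>();
           tombstones.add("target-assignment-tombstone:" + groupId);
           tombstones.add("subscription-metadata-tombstone:" + groupId);
           tombstones.add("group-epoch-tombstone:" + groupId);
           return tombstones;
       }

       public static void main(String[] args) {
           System.out.println(tombstonesFor("g1", GroupType.GENERIC).size());  // 1
           System.out.println(tombstonesFor("g2", GroupType.CONSUMER).size()); // 3
       }
   }
   ```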



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -705,9 +737,15 @@ public CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
+        }
+
+        return runtime.scheduleWriteOperation(
+                "delete-offset",
+                topicPartitionFor(request.groupId()),
+                coordinator -> coordinator.deleteOffsets(context, request)

Review Comment:
   nit: Indentation. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -262,6 +267,29 @@ public HeartbeatResponseData genericGroupHeartbeat(
         );
     }
 
+    public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
+            RequestContext context,
+            List<String> groupIds

Review Comment:
   nit: Indentation. There are other cases in this PR. I won't mention them all.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -333,6 +348,98 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Handles an OffsetDelete request.
+     *
+     * @param context The request context.
+     * @param request The OffsetDelete request.
+     *
+     * @return A Result containing the OffsetDeleteResponseData response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
+            RequestContext context,
+            OffsetDeleteRequestData request
+    ) throws ApiException {
+        final List<Record> records = new ArrayList<>();
+        final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
+                new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+        OffsetDeleteResponseData response = new OffsetDeleteResponseData();
+        try {
+            Group group = validateOffsetDelete(request);
+
+            request.topics().forEach(topic -> {
+                final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
+                        new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+                final boolean subscribedToTopic = group.isSubscribedToTopic(topic.name());
+
+                topic.partitions().forEach(partition -> {
+                    records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+                            request.groupId(),
+                            topic.name(),
+                            partition.partitionIndex()
+                    ));

Review Comment:
   We should not write the record if subscribedToTopic is true because it will effectively delete the offset.
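
   A self-contained sketch of the corrected flow (simplified to strings; names are stand-ins, not the PR's code): a partition of a topic the group still subscribes to gets only the `GROUP_SUBSCRIBED_TO_TOPIC` error, and no tombstone is written, so the committed offset survives:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class SubscribedTopicGuardSketch {
       // Only emit the tombstone when the group is NOT subscribed to the topic;
       // otherwise report the error and keep the offset.
       static List<String> deleteOffsets(String groupId, String topic,
                                         List<Integer> partitions,
                                         boolean subscribedToTopic,
                                         List<String> responses) {
           List<String> records = new ArrayList<>();
           for (int partition : partitions) {
               if (subscribedToTopic) {
                   responses.add(topic + "-" + partition + ":GROUP_SUBSCRIBED_TO_TOPIC");
               } else {
                   records.add("offset-tombstone:" + groupId + "-" + topic + "-" + partition);
                   responses.add(topic + "-" + partition + ":NONE");
               }
           }
           return records;
       }

       public static void main(String[] args) {
           List<String> responses = new ArrayList<>();
           List<String> records = deleteOffsets("g1", "t", Arrays.asList(0), true, responses);
           System.out.println(records.isEmpty()); // true: the offset is kept
           System.out.println(responses); // [t-0:GROUP_SUBSCRIBED_TO_TOPIC]
       }
   }
   ```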



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
