AndrewJSchofield commented on code in PR #19431:
URL: https://github.com/apache/kafka/pull/19431#discussion_r2039571274


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1580,83 +1574,51 @@ public 
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
                 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
         }
 
-        Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
-        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new 
ArrayList<>(requestData.topics().size());
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList = new 
ArrayList<>(requestData.topics().size());
-
-        requestData.topics().forEach(topic -> {
-            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
-            if (topicId != null) {
-                requestTopicIdToNameMapping.put(topicId, topic.topicName());
-                deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
-                    .setTopicId(topicId)
-                    .setPartitions(
-                        topic.partitions().stream().map(
-                            partitionIndex -> new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
-                        ).toList()
-                    ));
-            } else {
-                deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                    .setTopicName(topic.topicName())
-                    .setPartitions(topic.partitions().stream().map(
-                        partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                            .setPartitionIndex(partition)
-                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
-                    ).toList()));
-            }
-        });
-
-        // If the request for the persister is empty, just complete the 
operation right away.
-        if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+        if (requestData.topics() == null || requestData.topics().isEmpty()) {
             return CompletableFuture.completedFuture(
                 new DeleteShareGroupOffsetsResponseData()
-                    .setResponses(deleteShareGroupOffsetsResponseTopicList));
+            );
         }
 
-        CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new 
CompletableFuture<>();
+        return runtime.scheduleReadOperation(
+            "share-group-delete-offsets-request",
+            topicPartitionFor(groupId),
+            (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+        )
+            .thenCompose(resultHolder -> {
+                if (resultHolder == null) {
+                    log.error("Failed to retrieve deleteState request 
parameters from group coordinator for the group {}", groupId);
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+                    );
+                }
+
+                if (resultHolder.topLevelError().code() != Errors.NONE.code()) 
{
+                    log.error("Failed to retrieve deleteState request 
parameters from group coordinator for the group {}", groupId);

Review Comment:
   I don't think you should log an error here. This is the usual error path for 
situations such as a non-existent group.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -290,6 +294,61 @@ public GroupCoordinatorShard build() {
         }
     }
 
+    public static class DeleteShareGroupOffsetsResultHolder {
+        private final Errors topLevelError;

Review Comment:
   I think you should separate `Errors` into `short topLevelErrorCode` and 
`String topLevelErrorMessage`. Otherwise, you'll lose the information about 
why the error occurred (for example, not a share group, or a non-existent group).
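
   As a rough illustration of the shape being suggested, here is a minimal, 
self-contained sketch. `ResultHolderSketch`, its factory methods, and the 
`List<String>` topic responses are hypothetical simplifications, not the PR's 
actual `DeleteShareGroupOffsetsResultHolder`; the constant `NONE` is assumed to 
mirror `Errors.NONE.code()`.

```java
import java.util.List;

// Hypothetical, simplified stand-in for DeleteShareGroupOffsetsResultHolder.
// It keeps the numeric error code and the human-readable message separately,
// so the reason for a failure is not lost.
final class ResultHolderSketch {
    // Assumption: mirrors Errors.NONE.code() in Kafka.
    static final short NONE = 0;

    private final short topLevelErrorCode;
    private final String topLevelErrorMessage;
    // Simplification: the real class holds response topic objects, not strings.
    private final List<String> errorTopicResponses;

    private ResultHolderSketch(short code, String message, List<String> errorTopicResponses) {
        this.topLevelErrorCode = code;
        this.topLevelErrorMessage = message;
        this.errorTopicResponses = List.copyOf(errorTopicResponses);
    }

    // No top-level error; per-topic errors may still be present.
    static ResultHolderSketch success(List<String> errorTopicResponses) {
        return new ResultHolderSketch(NONE, null, errorTopicResponses);
    }

    // Top-level failure carrying both the code and the specific message.
    static ResultHolderSketch failure(short code, String message) {
        return new ResultHolderSketch(code, message, List.of());
    }

    short topLevelErrorCode() {
        return topLevelErrorCode;
    }

    String topLevelErrorMessage() {
        return topLevelErrorMessage;
    }

    List<String> errorTopicResponses() {
        return errorTopicResponses;
    }

    boolean hasError() {
        return topLevelErrorCode != NONE;
    }
}
```

   With this shape, a caller can check `hasError()` and forward the code and 
message unchanged instead of reconstructing a generic message from the code.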



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -612,6 +671,56 @@ public CoordinatorResult<Map<String, 
Map.Entry<DeleteShareGroupStateParameters,
         return new CoordinatorResult<>(List.of(), responseMap);
     }
 
+    /**
+     * Does the following checks to make sure that a DeleteShareGroupOffsets 
request is valid and can be processed further
+     * 1. Checks whether the provided group is empty
+     * 2. Checks the requested topics are presented in the metadataImage
+     * 3. Checks the requested share partitions are initialized for the group
+     *
+     * @param groupId - The group ID
+     * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+     * @return {@link DeleteShareGroupOffsetsResultHolder} an object 
containing top level error code, list of topic responses
+     *                                               and persister deleteState 
request parameters
+     */
+    public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        try {
+            ShareGroup group = groupMetadataManager.shareGroup(groupId);
+            group.validateDeleteGroup();
+
+            
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+            List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData =
+                groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+                    groupId,
+                    requestData,
+                    errorTopicResponseList
+                );
+
+            if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+                return new DeleteShareGroupOffsetsResultHolder(Errors.NONE, 
errorTopicResponseList);
+            }
+
+            DeleteShareGroupStateRequestData deleteShareGroupStateRequestData 
= new DeleteShareGroupStateRequestData()
+                .setGroupId(requestData.groupId())
+                .setTopics(deleteShareGroupStateRequestTopicsData);
+
+            return new DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE,
+                errorTopicResponseList,
+                
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            );
+
+        } catch (GroupIdNotFoundException exception) {
+            log.error("groupId {} not found", groupId, exception);
+            return new 
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));

Review Comment:
   With the error code and message separated, `new 
DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), 
exception.getMessage());` would be better here, because it preserves the 
exception's message.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8178,6 +8181,51 @@ public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteR
         );
     }
 
+    /**
+     * Returns a list of delete share group state request topic objects to be 
used with the persister.
+     * @param groupId - group ID of the share group
+     * @param requestData - the request data for DeleteShareGroupOffsets 
request
+     * @return List of objects representing the share group state delete 
request for topics.

Review Comment:
   nit: `errorTopicResponseList` is missing from the `@param` list.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1580,83 +1574,51 @@ public 
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
                 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
         }
 
-        Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
-        List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new 
ArrayList<>(requestData.topics().size());
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
deleteShareGroupOffsetsResponseTopicList = new 
ArrayList<>(requestData.topics().size());
-
-        requestData.topics().forEach(topic -> {
-            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
-            if (topicId != null) {
-                requestTopicIdToNameMapping.put(topicId, topic.topicName());
-                deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
-                    .setTopicId(topicId)
-                    .setPartitions(
-                        topic.partitions().stream().map(
-                            partitionIndex -> new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
-                        ).toList()
-                    ));
-            } else {
-                deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                    .setTopicName(topic.topicName())
-                    .setPartitions(topic.partitions().stream().map(
-                        partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                            .setPartitionIndex(partition)
-                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
-                    ).toList()));
-            }
-        });
-
-        // If the request for the persister is empty, just complete the 
operation right away.
-        if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+        if (requestData.topics() == null || requestData.topics().isEmpty()) {
             return CompletableFuture.completedFuture(
                 new DeleteShareGroupOffsetsResponseData()
-                    .setResponses(deleteShareGroupOffsetsResponseTopicList));
+            );
         }
 
-        CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new 
CompletableFuture<>();
+        return runtime.scheduleReadOperation(
+            "share-group-delete-offsets-request",
+            topicPartitionFor(groupId),
+            (coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+        )
+            .thenCompose(resultHolder -> {
+                if (resultHolder == null) {
+                    log.error("Failed to retrieve deleteState request 
parameters from group coordinator for the group {}", groupId);
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+                    );
+                }
+
+                if (resultHolder.topLevelError().code() != Errors.NONE.code()) 
{
+                    log.error("Failed to retrieve deleteState request 
parameters from group coordinator for the group {}", groupId);
+                    return CompletableFuture.completedFuture(
+                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(resultHolder.topLevelError())

Review Comment:
   This is a case where the error message should be preserved so that it's 
available to the admin client.
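
   A minimal sketch of that idea, using only `java.util.concurrent`. The 
`Result` and `Response` classes and the `handle` method are hypothetical 
stand-ins for the result holder, the response data, and the `thenCompose` 
stage in the diff; the success branch is a placeholder for the persister call.

```java
import java.util.concurrent.CompletableFuture;

final class ErrorPropagationSketch {
    // Hypothetical stand-in for the result returned by the coordinator read operation.
    static final class Result {
        final short errorCode;
        final String errorMessage;

        Result(short errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Hypothetical stand-in for the response sent back to the admin client.
    static final class Response {
        final short errorCode;
        final String errorMessage;

        Response(short errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    static CompletableFuture<Response> handle(CompletableFuture<Result> readOperation) {
        return readOperation.thenCompose(result -> {
            if (result.errorCode != 0) {
                // Short-circuit on a top-level error, forwarding the code and
                // the specific message rather than a generic one.
                return CompletableFuture.completedFuture(
                    new Response(result.errorCode, result.errorMessage));
            }
            // Placeholder: the real code would continue to the persister here.
            return CompletableFuture.completedFuture(new Response((short) 0, null));
        });
    }
}
```

   The point of the sketch is only that the message travels with the code all 
the way to the response; how the real response object is built is up to the PR.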



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -612,6 +671,56 @@ public CoordinatorResult<Map<String, 
Map.Entry<DeleteShareGroupStateParameters,
         return new CoordinatorResult<>(List.of(), responseMap);
     }
 
+    /**
+     * Does the following checks to make sure that a DeleteShareGroupOffsets 
request is valid and can be processed further
+     * 1. Checks whether the provided group is empty
+     * 2. Checks the requested topics are presented in the metadataImage
+     * 3. Checks the requested share partitions are initialized for the group
+     *
+     * @param groupId - The group ID
+     * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+     * @return {@link DeleteShareGroupOffsetsResultHolder} an object 
containing top level error code, list of topic responses
+     *                                               and persister deleteState 
request parameters
+     */
+    public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+        String groupId,
+        DeleteShareGroupOffsetsRequestData requestData
+    ) {
+        try {
+            ShareGroup group = groupMetadataManager.shareGroup(groupId);
+            group.validateDeleteGroup();
+
+            
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList = new ArrayList<>();
+            List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData =
+                groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+                    groupId,
+                    requestData,
+                    errorTopicResponseList
+                );
+
+            if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+                return new DeleteShareGroupOffsetsResultHolder(Errors.NONE, 
errorTopicResponseList);
+            }
+
+            DeleteShareGroupStateRequestData deleteShareGroupStateRequestData 
= new DeleteShareGroupStateRequestData()
+                .setGroupId(requestData.groupId())
+                .setTopics(deleteShareGroupStateRequestTopicsData);
+
+            return new DeleteShareGroupOffsetsResultHolder(
+                Errors.NONE,
+                errorTopicResponseList,
+                
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+            );
+
+        } catch (GroupIdNotFoundException exception) {
+            log.error("groupId {} not found", groupId, exception);
+            return new 
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));
+        } catch (GroupNotEmptyException exception) {
+            log.error("Provided group {} is not empty", groupId);
+            return new 
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));

Review Comment:
   Likewise, use `new 
DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), 
exception.getMessage());` here.



