AndrewJSchofield commented on code in PR #19431: URL: https://github.com/apache/kafka/pull/19431#discussion_r2039571274
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1580,83 +1574,51 @@ public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID)); } - Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); - List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); - List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); - - requestData.topics().forEach(topic -> { - Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); - if (topicId != null) { - requestTopicIdToNameMapping.put(topicId, topic.topicName()); - deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() - .setTopicId(topicId) - .setPartitions( - topic.partitions().stream().map( - partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) - ).toList() - )); - } else { - deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() - .setTopicName(topic.topicName()) - .setPartitions(topic.partitions().stream().map( - partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() - .setPartitionIndex(partition) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) - ).toList())); - } - }); - - // If the request for the persister is empty, just complete the operation right away. - if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + if (requestData.topics() == null || requestData.topics().isEmpty()) { return CompletableFuture.completedFuture( new DeleteShareGroupOffsetsResponseData() - .setResponses(deleteShareGroupOffsetsResponseTopicList)); + ); } - CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>(); + return runtime.scheduleReadOperation( + "share-group-delete-offsets-request", + topicPartitionFor(groupId), + (coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData) + ) + .thenCompose(resultHolder -> { + if (resultHolder == null) { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId); + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR) + ); + } + + if (resultHolder.topLevelError().code() != Errors.NONE.code()) { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId); Review Comment: I don't think you should log an error here. This is the usual error path for situations such as a non-existent group. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -290,6 +294,61 @@ public GroupCoordinatorShard build() { } } + public static class DeleteShareGroupOffsetsResultHolder { + private final Errors topLevelError; Review Comment: I think you should separate Errors into `short topLevelErrorCode` and `String topLevelErrorMessage`. Otherwise, you'll be missing information about why an error occurs (not a share group, or non-existent group). ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -612,6 +671,56 @@ public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, return new CoordinatorResult<>(List.of(), responseMap); } + /** + * Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further + * 1. Checks whether the provided group is empty + * 2. Checks the requested topics are presented in the metadataImage + * 3. Checks the requested share partitions are initialized for the group + * + * @param groupId - The group ID + * @param requestData - The request data for DeleteShareGroupOffsetsRequest + * @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses + * and persister deleteState request parameters + */ + public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest( + String groupId, + DeleteShareGroupOffsetsRequestData requestData + ) { + try { + ShareGroup group = groupMetadataManager.shareGroup(groupId); + group.validateDeleteGroup(); + + List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>(); + List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = + groupMetadataManager.sharePartitionsEligibleForOffsetDeletion( + groupId, + requestData, + errorTopicResponseList + ); + + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return new DeleteShareGroupOffsetsResultHolder(Errors.NONE, errorTopicResponseList); + } + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData); + + return new DeleteShareGroupOffsetsResultHolder( + Errors.NONE, + errorTopicResponseList, + DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData) + ); + + } catch (GroupIdNotFoundException exception) { + log.error("groupId {} not found", groupId, exception); + return new DeleteShareGroupOffsetsResultHolder(Errors.forException(exception)); Review Comment: So, `new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());` is better. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -8178,6 +8181,51 @@ public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteR ); } + /** + * Returns a list of delete share group state request topic objects to be used with the persister. + * @param groupId - group ID of the share group + * @param requestData - the request data for DeleteShareGroupOffsets request + * @return List of objects representing the share group state delete request for topics. Review Comment: nit: Missing `errorTopicResponseList` from the parameters list. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1580,83 +1574,51 @@ public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID)); } - Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); - List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); - List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); - - requestData.topics().forEach(topic -> { - Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); - if (topicId != null) { - requestTopicIdToNameMapping.put(topicId, topic.topicName()); - deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() - .setTopicId(topicId) - .setPartitions( - topic.partitions().stream().map( - partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) - ).toList() - )); - } else { - deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() - .setTopicName(topic.topicName()) - .setPartitions(topic.partitions().stream().map( - partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() - .setPartitionIndex(partition) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) - .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) - ).toList())); - } - }); - - // If the request for the persister is empty, just complete the operation right away. - if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + if (requestData.topics() == null || requestData.topics().isEmpty()) { return CompletableFuture.completedFuture( new DeleteShareGroupOffsetsResponseData() - .setResponses(deleteShareGroupOffsetsResponseTopicList)); + ); } - CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>(); + return runtime.scheduleReadOperation( + "share-group-delete-offsets-request", + topicPartitionFor(groupId), + (coordinator, lastCommittedOffset) -> coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData) + ) + .thenCompose(resultHolder -> { + if (resultHolder == null) { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId); + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR) + ); + } + + if (resultHolder.topLevelError().code() != Errors.NONE.code()) { + log.error("Failed to retrieve deleteState request parameters from group coordinator for the group {}", groupId); + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(resultHolder.topLevelError()) Review Comment: And here's a case where you want to maintain the error message so it's available to the admin client. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -612,6 +671,56 @@ public CoordinatorResult<Map<String, Map.Entry<DeleteShareGroupStateParameters, return new CoordinatorResult<>(List.of(), responseMap); } + /** + * Does the following checks to make sure that a DeleteShareGroupOffsets request is valid and can be processed further + * 1. Checks whether the provided group is empty + * 2. Checks the requested topics are presented in the metadataImage + * 3. Checks the requested share partitions are initialized for the group + * + * @param groupId - The group ID + * @param requestData - The request data for DeleteShareGroupOffsetsRequest + * @return {@link DeleteShareGroupOffsetsResultHolder} an object containing top level error code, list of topic responses + * and persister deleteState request parameters + */ + public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest( + String groupId, + DeleteShareGroupOffsetsRequestData requestData + ) { + try { + ShareGroup group = groupMetadataManager.shareGroup(groupId); + group.validateDeleteGroup(); + + List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList = new ArrayList<>(); + List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = + groupMetadataManager.sharePartitionsEligibleForOffsetDeletion( + groupId, + requestData, + errorTopicResponseList + ); + + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return new DeleteShareGroupOffsetsResultHolder(Errors.NONE, errorTopicResponseList); + } + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData); + + return new DeleteShareGroupOffsetsResultHolder( + Errors.NONE, + errorTopicResponseList, + DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData) + ); + + } catch (GroupIdNotFoundException exception) { + log.error("groupId {} not found", groupId, exception); + return new DeleteShareGroupOffsetsResultHolder(Errors.forException(exception)); + } catch (GroupNotEmptyException exception) { + log.error("Provided group {} is not empty", groupId); + return new DeleteShareGroupOffsetsResultHolder(Errors.forException(exception)); Review Comment: `new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org