AndrewJSchofield commented on code in PR #18976: URL: https://github.com/apache/kafka/pull/18976#discussion_r1977599300
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1206,6 +1210,93 @@ public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGrou return future; } + /** + * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, DeleteShareGroupOffsetsRequestData)}. + */ + @Override + public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets( + RequestContext context, + DeleteShareGroupOffsetsRequestData requestData + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + if (metadataImage == null) { + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); + List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); + List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); + + requestData.topics().forEach(topic -> { + Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); + if (topicId != null) { + requestTopicIdToNameMapping.put(topicId, topic.topicName()); + deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions( + topic.partitions().stream().map( + partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) + ).toList() + )); + } else { + deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setPartitions(topic.partitions().stream().map( + partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ).toList())); + } + }); + + // If the request for the persister is empty, just complete the operation right away. + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return CompletableFuture.completedFuture( + new DeleteShareGroupOffsetsResponseData() + .setResponses(deleteShareGroupOffsetsResponseTopicList)); + } + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData); + CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>(); + persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)) + .whenComplete((result, error) -> { + if (error != null) { + log.error("Failed to delete share partitions"); Review Comment: I think you should replace "delete share partitions" with "delete share group state". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org