AndrewJSchofield commented on code in PR #19478:
URL: https://github.com/apache/kafka/pull/19478#discussion_r2056862969


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1258,39 +1258,104 @@ private CompletableFuture<Map<String, Errors>> 
persisterDeleteToGroupIdErrorMap(
         });
     }
 
-    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
persistDeleteShareGroupOffsets(
-        DeleteShareGroupStateParameters deleteStateRequestParameters,
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
completeDeleteShareGroupOffsets(
+        String groupId,
+        Map<Uuid, String> successTopics,
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponses
     ) {
-        return persister.deleteState(deleteStateRequestParameters)
-            .thenCompose(result -> {
-                if (result == null || result.topicsData() == null) {
-                    log.error("Result is null for the delete share group 
state");
-                    Exception exception = new IllegalStateException("Result is 
null for the delete share group state");
-                    return CompletableFuture.completedFuture(
-                        
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
-                    );
-                }
-                result.topicsData().forEach(topicData ->
-                    errorTopicResponseList.add(
-                        new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
-                            .setTopicId(topicData.topicId())
-                            
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
-                            .setPartitions(topicData.partitions().stream().map(
-                                partitionData -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
-                                    
.setPartitionIndex(partitionData.partition())
-                                    .setErrorMessage(partitionData.errorCode() 
== Errors.NONE.code() ? null : 
Errors.forCode(partitionData.errorCode()).message())
-                                    .setErrorCode(partitionData.errorCode())
-                            ).toList())
-                    )
-                );
+        return runtime.scheduleWriteOperation(
+            "complete-delete-share-group-offsets",
+            topicPartitionFor(groupId),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
+            coordinator -> 
coordinator.completeDeleteShareGroupOffsets(groupId, successTopics, 
errorTopicResponses)
+        ).exceptionally(exception -> handleOperationException(
+            "complete-delete-share-group-offsets",
+            groupId,
+            exception,
+            (error, __) -> 
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error),
+            log
+        ));
+    }
 
-                return CompletableFuture.completedFuture(
-                    new DeleteShareGroupOffsetsResponseData()
-                        .setResponses(errorTopicResponseList)
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> 
handleDeleteShareGroupStateResult(
+        String groupId,
+        DeleteShareGroupStateResult result,
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponses
+    ) {
+        if (result == null || result.topicsData() == null) {
+            log.error("Result is null for the delete share group state");
+            Exception exception = new IllegalStateException("Result is null 
for the delete share group state");
+            return CompletableFuture.completedFuture(
+                
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
+            );
+        }
+        Map<Uuid, String> successTopics = new HashMap<>();
+        result.topicsData().forEach(topicData -> {
+            Optional<PartitionErrorData> errItem = 
topicData.partitions().stream()
+                .filter(errData -> errData.errorCode() != Errors.NONE.code())
+                .findAny();
+
+            if (errItem.isPresent()) {
+                errorTopicResponses.add(
+                    new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+                        .setTopicId(topicData.topicId())
+                        
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                        
.setErrorMessage(Errors.forCode(errItem.get().errorCode()).message())
+                        .setErrorCode(errItem.get().errorCode())
                 );
-            }).exceptionally(throwable -> {
-                log.error("Failed to delete share group state");
+            } else {
+                successTopics.put(
+                    topicData.topicId(),
+                    
metadataImage.topics().topicIdToNameView().get(topicData.topicId())
+                );
+            }
+        });
+
+        // If there are no topics for which persister delete state request 
succeeded, then we can return directly from here
+        if (successTopics.isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new DeleteShareGroupOffsetsResponseData()
+                    .setResponses(errorTopicResponses)
+            );
+        }
+
+        return completeDeleteShareGroupOffsets(groupId, successTopics, 
errorTopicResponses);
+    }
+
+    private CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteState(

Review Comment:
   The sequence of steps in the `GroupCoordinatorService` is:
   * `deleteShareGroupOffsets`
   * `deleteState`
   * `handleDeleteShareGroupStateResult`
   * `completeDeleteShareGroupOffsets`
   
   I would prefer to see these methods arranged in the source code in the same 
order as they are executed just for ease of comprehension.
   
   I suggest a couple of method renamings too:
   * `deleteState` is too ambiguous, and I suggest 
`deleteShareGroupOffsetsState`
   * `handleDeleteShareGroupStateResult` sounds like it's deleting all of the 
state for the group, so perhaps `handleDeleteShareGroupOffsetStateResult`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8232,50 +8232,125 @@ public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteR
 
     /**
      * Returns a list of delete share group state request topic objects to be 
used with the persister.
-     * @param groupId - group ID of the share group
-     * @param requestData - the request data for DeleteShareGroupOffsets 
request
-     * @param errorTopicResponseList - the list of topics not found in the 
metadata image
+     * @param groupId                    group ID of the share group
+     * @param requestData                the request data for 
DeleteShareGroupOffsets request
+     * @param errorTopicResponseList     the list of topics not found in the 
metadata image
+     * @param records                    List of coordinator records to append 
to
+     *
      * @return List of objects representing the share group state delete 
request for topics.
      */
     public List<DeleteShareGroupStateRequestData.DeleteStateData> 
sharePartitionsEligibleForOffsetDeletion(
         String groupId,
         DeleteShareGroupOffsetsRequestData requestData,
-        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList
+        
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> 
errorTopicResponseList,
+        List<CoordinatorRecord> records
     ) {
         List<DeleteShareGroupStateRequestData.DeleteStateData> 
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+        Map<Uuid, Set<Integer>> initializedTopics = new HashMap<>();
+
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+
+        if (currentMap == null) {
+            return deleteShareGroupStateRequestTopicsData;
+        }
+
+        currentMap.initializedTopics().forEach((topicId, partitions) -> 
initializedTopics.put(topicId, new HashSet<>(partitions)));
+        Set<Uuid> deletingTopics = new HashSet<>(currentMap.deletingTopics());
 
-        Map<Uuid, Set<Integer>> initializedSharePartitions = 
initializedShareGroupPartitions(groupId);
         requestData.topics().forEach(topic -> {
-            Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
-            if (topicId != null) {
+            TopicImage topicImage = 
metadataImage.topics().getTopic(topic.topicName());
+            if (topicImage != null) {
+                Uuid topicId = topicImage.id();
                 // A deleteState request to persister should only be sent with 
those topic partitions for which corresponding
                 // share partitions are initialized for the group.
-                if (initializedSharePartitions.containsKey(topicId)) {
+                if (initializedTopics.containsKey(topicId)) {
                     List<DeleteShareGroupStateRequestData.PartitionData> 
partitions = new ArrayList<>();
-                    topic.partitions().forEach(partition -> {
-                        if 
(initializedSharePartitions.get(topicId).contains(partition)) {
-                            partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
-                        }
-                    });
-                    deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
-                        .setTopicId(topicId)
-                        .setPartitions(partitions));
+                    initializedTopics.get(topicId).forEach(partition ->
+                        partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
+                    deleteShareGroupStateRequestTopicsData.add(
+                        new DeleteShareGroupStateRequestData.DeleteStateData()
+                            .setTopicId(topicId)
+                            .setPartitions(partitions)
+                    );
+                    // Removing the topic from initializedTopics map.
+                    initializedTopics.remove(topicId);
+                    // Adding the topic to deletingTopics map.
+                    deletingTopics.add(topicId);
+                } else if (deletingTopics.contains(topicId)) {
+                    // If the topic for which delete share group offsets 
request is sent is already present in the deletingTopics set,
+                    // we will include that topic in the delete share group 
state request.
+                    List<DeleteShareGroupStateRequestData.PartitionData> 
partitions = new ArrayList<>();
+                    topicImage.partitions().keySet().forEach(partition ->
+                        partitions.add(new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
+                    deleteShareGroupStateRequestTopicsData.add(
+                        new DeleteShareGroupStateRequestData.DeleteStateData()
+                            .setTopicId(topicId)
+                            .setPartitions(partitions)
+                    );

Review Comment:
   I would add another else here like this:
   ```
                   } else {
                       errorTopicResponseList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                               .setTopicName(topic.topicName())
                               
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                               .setErrorMessage("There is no offset information 
to delete.")
                       );
   ```
   
   If the topic is not present in the set of initialized or deleting topics, 
the error code `UNKNOWN_TOPIC_OR_PARTITION` can be returned with a specific 
error message. Then this can show up in the admin client and command-line tools.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to