dajac commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1954063627


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(
+                "delete-share-groups",
+                topicPartition,
+                (coordinator, offset) -> coordinator.sharePartitions(groupList, offset)
+            )

Review Comment:
   nit: The indentation is off here. We usually use 4 spaces for the arguments.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {

Review Comment:
   nit: Let's put one argument per line to follow the format of the other 
methods in this file.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(

Review Comment:
   For consistency reasons, I suggest using a write operation here to ensure that you read the latest state. Otherwise, there is a chance that a share group has not been committed yet and you would not see it with a read.
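   
   As a sketch, mirroring the existing `delete-groups` write operation in this file (the exact shard callback and its return shape are assumptions, not the final API):
   
   ```
   // Hypothetical: schedule a write so we observe the latest, possibly
   // uncommitted, state. The shard method would have to return a
   // CoordinatorResult carrying the share partition map and no records.
   return runtime.scheduleWriteOperation(
       "delete-share-groups",
       topicPartition,
       Duration.ofMillis(config.offsetCommitTimeoutMs()),
       coordinator -> coordinator.sharePartitions(groupList)
   ).thenCompose(this::performShareGroupsDeletion);
   ```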



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(
+                "delete-share-groups",
+                topicPartition,
+                (coordinator, offset) -> coordinator.sharePartitions(groupList, offset)
+            )
+            .thenCompose(this::performShareGroupsDeletion)
+            .exceptionally(exception -> handleOperationException(
+                "delete-share-groups",
+                groupList,
+                exception,
+                (error, __) -> {
+                    Map<String, Errors> errors = new HashMap<>();
+                    groupList.forEach(group -> errors.put(group, error));
+                    return errors;
+                },
+                log
+            ));
+    }
+
+    private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+        Map<String, Map<Uuid, List<Integer>>> keys
+    ) {
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+        for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) {
+            List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+            for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) {
+                topicData.add(
+                    new TopicData<>(
+                        topicEntry.getKey(),
+                        topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+                    )
+                );
+            }

Review Comment:
   I wonder whether it would make sense to directly return the data structure 
that the persister needs from the group metadata manager. We could avoid all 
those conversions.
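   
   A rough sketch of what I mean, assuming the group metadata manager returns the persister-ready shape directly (the method name and return type are hypothetical):
   
   ```
   // Hypothetical: build List<TopicData<PartitionIdData>> per group inside the
   // group metadata manager, skipping the Map<Uuid, List<Integer>> step.
   public Map<String, List<TopicData<PartitionIdData>>> sharePartitionsForPersister(List<ShareGroup> shareGroups) {
       Map<String, List<TopicData<PartitionIdData>>> result = new HashMap<>();
       for (ShareGroup shareGroup : shareGroups) {
           List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
           for (String topic : shareGroup.subscribedTopicNames().keySet()) {
               TopicImage topicImage = metadataImage.topics().getTopic(topic);
               if (topicImage == null) continue; // the topic may have been deleted
               topicData.add(new TopicData<>(
                   topicImage.id(),
                   topicImage.partitions().keySet().stream()
                       .map(PartitionFactory::newPartitionIdData)
                       .toList()
               ));
           }
           result.put(shareGroup.groupId(), topicData);
       }
       return result;
   }
   ```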



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)

Review Comment:
   It would be great if you could put an comment explaining the main flow here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)
+                .thenCompose(groupErrMap -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    List<String> errGroupIds = new ArrayList<>();
+                    groupErrMap.forEach((groupId, error) -> {
+                        if (error.code() != Errors.NONE.code()) {
+                            log.error("Error deleting share group {} due to error {}", groupId, error);
+                            errGroupIds.add(groupId);
+                            collection.add(
+                                new DeleteGroupsResponseData.DeletableGroupResult()
+                                    .setGroupId(groupId)
+                                    .setErrorCode(error.code())
+                            );
+                        }
+                    });
+
+                    Set<String> groupSet = new HashSet<>(groupList);
+                    // Remove all share group ids which have errored out
+                    // when deleting with persister.
+                    errGroupIds.forEach(groupSet::remove);
+
+                    // If no non-share groupIds or non-error share group ids present
+                    // return.
+                    if (groupSet.isEmpty()) {
+                        return CompletableFuture.completedFuture(collection);
+                    }
+
+                    // Let us invoke the standard procedure of any non-share
+                    // groups or successfully deleted share groups remaining.
+                    List<String> retainedGroupIds = groupSet.stream().toList();
+                    return runtime.scheduleWriteOperation(

Review Comment:
   nit: It may be worth extracting this into a method to reduce the code in the 
`thenCompose`.
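   
   For example (the helper name and signature are just a sketch):
   
   ```
   // Hypothetical extraction so the thenCompose body stays short.
   private CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> deleteNonShareGroups(
       RequestContext context,
       TopicPartition topicPartition,
       List<String> retainedGroupIds
   ) {
       return runtime.scheduleWriteOperation(
           "delete-groups",
           topicPartition,
           Duration.ofMillis(config.offsetCommitTimeoutMs()),
           coordinator -> coordinator.deleteGroups(context, retainedGroupIds)
       );
   }
   ```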



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)

Review Comment:
   nit: `future`? `shareFuture` is not quite right because it is the result of the entire deletion.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+                result3.duplicate(),
+                result2.duplicate(),
+                result1.duplicate()
+            )
+        );
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        ))
+            .thenReturn(CompletableFuture.completedFuture(
+                    Map.of(
+                        "share-group-id-1",
+                        Map.of(
+                            shareGroupTopicId,
+                            List.of(0, 1)
+                        )
+                    )
+                )
+            )

Review Comment:
   This looks weird. The indentation is also incorrect. There are many such cases in this file; I'll let you go through them.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords(
         group.createGroupTombstoneRecords(records);
     }
 
+    /**
+     * Returns all share partitions keys as a map from the input list of share groups.
+     * @param shareGroups - A list representing share groups.
+     * @return Map representing the share partition keys for all the groups in the input.
+     */
+    public Map<String, Map<Uuid, List<Integer>>> sharePartitionKeysMap(List<ShareGroup> shareGroups) {
+        Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>();
+        if (metadataImage == null) {
+            return Map.of();
+        }
+        TopicsImage topicsImage = metadataImage.topics();
+        for (ShareGroup shareGroup : shareGroups) {
+            String groupId = shareGroup.groupId();
+            for (String topic : shareGroup.subscribedTopicNames().keySet()) {
+                TopicImage topicImage = topicsImage.getTopic(topic);

Review Comment:
   The topic may not exist any more. For my understanding, how do we handle 
this case? Does the share coordinator deletes offsets of deleted topics?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
         return new CoordinatorResult<>(records, resultCollection);
     }
 
+    /**
+     * Method returns all share partition keys corresponding to a list of groupIds.
+     * The groupIds are first filtered by type to restrict the list to share groups.
+     * @param groupIds - A list of groupIds as string
+     * @param committedOffset - The last committedOffset for the internal topic partition
+     * @return A map representing the share partition structure.
+     */
+    public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) {
+        List<ShareGroup> shareGroups = new ArrayList<>();
+        for (String groupId : groupIds) {

Review Comment:
   nit: With the current structure of the code, you iterate over the group ids twice. It may be better to directly get the share partitions of the share group when you have it in hand.
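   
   Something like this, as a sketch (the per-group helper is hypothetical):
   
   ```
   // Hypothetical single pass: build the map while iterating instead of
   // collecting ShareGroups first and iterating a second time.
   Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>();
   for (String groupId : groupIds) {
       try {
           Group group = groupMetadataManager.group(groupId);
           if (group instanceof ShareGroup shareGroup) {
               keyMap.put(groupId, groupMetadataManager.sharePartitionKeys(shareGroup));
           }
       } catch (ApiException exception) {
           // Missing groups are reported by the deleteGroups path.
       }
   }
   return keyMap;
   ```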



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
         return new CoordinatorResult<>(records, resultCollection);
     }
 
+    /**
+     * Method returns all share partition keys corresponding to a list of groupIds.
+     * The groupIds are first filtered by type to restrict the list to share groups.
+     * @param groupIds - A list of groupIds as string
+     * @param committedOffset - The last committedOffset for the internal topic partition
+     * @return A map representing the share partition structure.
+     */
+    public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) {
+        List<ShareGroup> shareGroups = new ArrayList<>();
+        for (String groupId : groupIds) {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                if (group instanceof ShareGroup) {
+                    shareGroups.add((ShareGroup) group);
+                }
+            } catch (ApiException exception) {
+                // We needn't do anything more than logging here as deleteGroups
+                // method is handling these cases.
+                // Even if some groups cannot be found, we
+                // must check the entire list.
+                log.error("Failed to find group {}", groupId, exception);

Review Comment:
   nit: I would remove this one because it may be spammy.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection
         return new CoordinatorResult<>(records, resultCollection);
     }
 
+    /**
+     * Method returns all share partition keys corresponding to a list of groupIds.
+     * The groupIds are first filtered by type to restrict the list to share groups.
+     * @param groupIds - A list of groupIds as string
+     * @param committedOffset - The last committedOffset for the internal topic partition
+     * @return A map representing the share partition structure.
+     */
+    public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) {
+        List<ShareGroup> shareGroups = new ArrayList<>();
+        for (String groupId : groupIds) {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                if (group instanceof ShareGroup) {
+                    shareGroups.add((ShareGroup) group);
+                }
+            } catch (ApiException exception) {

Review Comment:
   Could we be more specific about the exception that we handle here? I suppose we only care about the group id not being found.
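   
   i.e., a sketch, assuming `GroupIdNotFoundException` is the case we expect:
   
   ```
   // Hypothetical: only swallow the "group not found" case and let other
   // ApiExceptions propagate.
   try {
       Group group = groupMetadataManager.group(groupId);
       if (group instanceof ShareGroup) {
           shareGroups.add((ShareGroup) group);
       }
   } catch (GroupIdNotFoundException exception) {
       // Missing groups are reported by the deleteGroups path; keep scanning.
   }
   ```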



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(
+                "delete-share-groups",
+                topicPartition,
+                (coordinator, offset) -> coordinator.sharePartitions(groupList, offset)
+            )
+            .thenCompose(this::performShareGroupsDeletion)
+            .exceptionally(exception -> handleOperationException(
+                "delete-share-groups",
+                groupList,
+                exception,
+                (error, __) -> {
+                    Map<String, Errors> errors = new HashMap<>();
+                    groupList.forEach(group -> errors.put(group, error));
+                    return errors;
+                },
+                log
+            ));
+    }
+
+    private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+        Map<String, Map<Uuid, List<Integer>>> keys
+    ) {
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+        for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) {
+            List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+            for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) {
+                topicData.add(
+                    new TopicData<>(
+                        topicEntry.getKey(),
+                        topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+                    )
+                );
+            }
+
+            futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+        }
+
+        return persisterDeleteToGroupIdErrorMap(futures);
+    }
+
+    private CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup(
+        String groupId,
+        List<TopicData<PartitionIdData>> topicData
+    ) {
+        return persister.deleteState(
+                new DeleteShareGroupStateParameters.Builder()

Review Comment:
   nit: Indentation seems to be off here too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords(
         group.createGroupTombstoneRecords(records);
     }
 
+    /**
+     * Returns all share partitions keys as a map from the input list of share groups.
+     * @param shareGroups - A list representing share groups.
+     * @return Map representing the share partition keys for all the groups in the input.
+     */
+    public Map<String, Map<Uuid, List<Integer>>> sharePartitionKeysMap(List<ShareGroup> shareGroups) {
+        Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>();
+        if (metadataImage == null) {
+            return Map.of();
+        }

Review Comment:
   nit: The metadata image is never null.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)
+                .thenCompose(groupErrMap -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    List<String> errGroupIds = new ArrayList<>();
+                    groupErrMap.forEach((groupId, error) -> {
+                        if (error.code() != Errors.NONE.code()) {

Review Comment:
   What kind of errors can we get here? Are they all expected/allowed by the 
DeleteGroups API?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(
+                "delete-share-groups",
+                topicPartition,
+                (coordinator, offset) -> coordinator.sharePartitions(groupList, offset)
+            )
+            .thenCompose(this::performShareGroupsDeletion)
+            .exceptionally(exception -> handleOperationException(
+                "delete-share-groups",
+                groupList,
+                exception,
+                (error, __) -> {
+                    Map<String, Errors> errors = new HashMap<>();
+                    groupList.forEach(group -> errors.put(group, error));
+                    return errors;
+                },
+                log
+            ));
+    }
+
+    private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+        Map<String, Map<Uuid, List<Integer>>> keys
+    ) {
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+        for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) {
+            List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+            for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) {
+                topicData.add(
+                    new TopicData<>(
+                        topicEntry.getKey(),
+                        topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+                    )
+                );
+            }
+
+            futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+        }
+
+        return persisterDeleteToGroupIdErrorMap(futures);
+    }
+
+    private CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup(
+        String groupId,
+        List<TopicData<PartitionIdData>> topicData
+    ) {
+        return persister.deleteState(
+                new DeleteShareGroupStateParameters.Builder()
+                    .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                        .setGroupId(groupId)
+                        .setTopicsData(topicData)
+                        .build()
+                    )
+                    .build()
+            )
+            .thenCompose(result -> CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<>(groupId, result)))
+            .exceptionally(exception -> {
+                // In case the deleteState call fails,
+                // we should construct the appropriate response here
+                // so that the subsequent callbacks don't see runtime exceptions.
+                log.error("Unable to delete share group partition(s) - {}, {}", groupId, topicData);
+                List<TopicData<PartitionErrorData>> respTopicData = topicData.stream()
+                    .map(reqTopicData -> new TopicData<>(
+                            reqTopicData.topicId(),
+                            reqTopicData.partitions().stream()
+                                .map(reqPartData -> {
+                                    Errors err = Errors.forException(exception);
+                                    return PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(), err.message());
+                                })
+                                .toList()
+                        )
+                    )
+                    .toList();
+
+                return new AbstractMap.SimpleEntry<>(groupId, new DeleteShareGroupStateResult.Builder()
+                    .setTopicsData(respTopicData)
+                    .build()
+                );
+            });
+    }
+
+    private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap(
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures
+    ) {
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+            .thenCompose(v -> {
+                Map<String, Errors> groupIds = new HashMap<>();
+                for (CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> future : futures) {
+                    Map.Entry<String, DeleteShareGroupStateResult> entry = future.getNow(null);  // safe as within allOf
+                    groupIds.putIfAbsent(entry.getKey(), Errors.NONE);
+                    for (TopicData<PartitionErrorData> topicData : entry.getValue().topicsData()) {
+                        Optional<PartitionErrorData> errItem = topicData.partitions().stream()
+                            .filter(errData -> errData.errorCode() != Errors.NONE.code())
+                            .findAny();
+
+                        errItem.ifPresent(val -> {
+                                log.error("Received error while deleting share group {} - {}", entry.getKey(), val);
+                                groupIds.put(entry.getKey(), Errors.forCode(val.errorCode()));
+                            }
+                        );

Review Comment:
   nit: Format is incorrect here too.
   
   ```
   errItem.ifPresent(val -> {
    log.error("Received error while deleting share group {} - {}", entry.getKey(), val);
       groupIds.put(entry.getKey(), Errors.forCode(val.errorCode()));
   });
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)
+                .thenCompose(groupErrMap -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    List<String> errGroupIds = new ArrayList<>();
+                    groupErrMap.forEach((groupId, error) -> {
+                        if (error.code() != Errors.NONE.code()) {
+                            log.error("Error deleting share group {} due to error {}", groupId, error);
+                            errGroupIds.add(groupId);
+                            collection.add(
+                                new DeleteGroupsResponseData.DeletableGroupResult()
+                                    .setGroupId(groupId)
+                                    .setErrorCode(error.code())
+                            );
+                        }
+                    });
+
+                    Set<String> groupSet = new HashSet<>(groupList);
+                    // Remove all share group ids which have errored out
+                    // when deleting with persister.
+                    errGroupIds.forEach(groupSet::remove);

Review Comment:
   nit: `groupSet.removeAll(errGroupIds)`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)
+                .thenCompose(groupErrMap -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    List<String> errGroupIds = new ArrayList<>();
+                    groupErrMap.forEach((groupId, error) -> {
+                        if (error.code() != Errors.NONE.code()) {
+                            log.error("Error deleting share group {} due to error {}", groupId, error);
+                            errGroupIds.add(groupId);
+                            collection.add(
+                                new DeleteGroupsResponseData.DeletableGroupResult()
+                                    .setGroupId(groupId)
+                                    .setErrorCode(error.code())
+                            );
+                        }
+                    });
+
+                    Set<String> groupSet = new HashSet<>(groupList);
+                    // Remove all share group ids which have errored out
+                    // when deleting with persister.
+                    errGroupIds.forEach(groupSet::remove);
+
+                    // If no non-share groupIds or non-error share group ids present
+                    // return.
+                    if (groupSet.isEmpty()) {
+                        return CompletableFuture.completedFuture(collection);
+                    }
+
+                    // Let us invoke the standard procedure of any non-share
+                    // groups or successfully deleted share groups remaining.
+                    List<String> retainedGroupIds = groupSet.stream().toList();
+                    return runtime.scheduleWriteOperation(
+                            "delete-groups",
+                            topicPartition,
+                            Duration.ofMillis(config.offsetCommitTimeoutMs()),
+                            coordinator -> coordinator.deleteGroups(context, retainedGroupIds)
+                        ).thenApply(deletedCollection -> {
+                            deletedCollection.forEach(item -> collection.add(item.duplicate()));
+                            return collection;
+                        })
+                        .exceptionally(exception -> handleOperationException(
+                            "delete-groups",
+                            groupList,
+                            exception,
+                            (error, __) -> {
+                                DeleteGroupsRequest.getErrorResultCollection(retainedGroupIds, error).forEach(item -> collection.add(item.duplicate()));
+                                return collection;

Review Comment:
   I don't like the fact that we add to `collection` in two places (L873 and L881). This is error-prone. Could we handle it at the end for both?
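   
   A sketch of what I have in mind (names like `shareGroupResults` and `deleteNonShareGroupsFuture` are hypothetical):
   
   ```
   // Hypothetical: collect the share-group failures and the write-operation
   // results separately, then merge into the response in one place.
   return deleteNonShareGroupsFuture.thenApply(writeResults -> {
       DeleteGroupsResponseData.DeletableGroupResultCollection collection =
           new DeleteGroupsResponseData.DeletableGroupResultCollection();
       shareGroupResults.forEach(result -> collection.add(result.duplicate()));
       writeResults.forEach(result -> collection.add(result.duplicate()));
       return collection;
   });
   ```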



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
-
-            futures.add(future);
+            CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList)
+                .thenCompose(groupErrMap -> {
+                    DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                    List<String> errGroupIds = new ArrayList<>();
+                    groupErrMap.forEach((groupId, error) -> {
+                        if (error.code() != Errors.NONE.code()) {
+                            log.error("Error deleting share group {} due to error {}", groupId, error);
+                            errGroupIds.add(groupId);
+                            collection.add(
+                                new DeleteGroupsResponseData.DeletableGroupResult()
+                                    .setGroupId(groupId)
+                                    .setErrorCode(error.code())
+                            );
+                        }
+                    });
+
+                    Set<String> groupSet = new HashSet<>(groupList);
+                    // Remove all share group ids which have errored out
+                    // when deleting with persister.
+                    errGroupIds.forEach(groupSet::remove);
+
+                    // If no non-share groupIds or non-error share group ids present
+                    // return.
+                    if (groupSet.isEmpty()) {
+                        return CompletableFuture.completedFuture(collection);
+                    }
+
+                    // Let us invoke the standard procedure of any non-share
+                    // groups or successfully deleted share groups remaining.
+                    List<String> retainedGroupIds = groupSet.stream().toList();
+                    return runtime.scheduleWriteOperation(
+                            "delete-groups",

Review Comment:
   nit: Indentation is incorrect here too.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         );
     }
 
+    @Test
+    public void testSharePartitionKeyMap() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .build();
+
+        MetadataImage image = mock(MetadataImage.class);
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        TopicImage t1image = mock(TopicImage.class);
+        TopicImage t2image = mock(TopicImage.class);
+        when(topicsImage.getTopic(anyString()))
+            .thenReturn(t1image)
+            .thenReturn(t2image);
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        when(shareGroup.subscribedTopicNames())
+            .thenReturn(Map.of(
+                    "t1", mock(SubscriptionCount.class),
+                    "t2", mock(SubscriptionCount.class)
+                )
+            );

Review Comment:
   nit: I would format it as follows:
   
   ```
   when(shareGroup.subscribedTopicNames()).thenReturn(Map.of(
       "t1", mock(SubscriptionCount.class),
       "t2", mock(SubscriptionCount.class)
   ));
   
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+                result3.duplicate(),
+                result2.duplicate(),
+                result1.duplicate()
+            )
+        );

Review Comment:
   nit:
   
   ```
   expectedResultCollection.addAll(List.of(
       result3.duplicate(),
       result2.duplicate(),
       result1.duplicate()
   ));
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         );
     }
 
+    @Test
+    public void testSharePartitionKeyMap() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .build();
+
+        MetadataImage image = mock(MetadataImage.class);
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        TopicImage t1image = mock(TopicImage.class);
+        TopicImage t2image = mock(TopicImage.class);
+        when(topicsImage.getTopic(anyString()))
+            .thenReturn(t1image)
+            .thenReturn(t2image);

Review Comment:
   It may be easier to just build the MetadataImage that you need rather than 
mocking it. We have `MetadataImageBuilder` which is pretty handy for it.
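   
   For instance (a sketch; the `addTopic` signature is an assumption based on how other tests in this class build images):
   
   ```
   Uuid t1Uuid = Uuid.randomUuid();
   Uuid t2Uuid = Uuid.randomUuid();
   MetadataImage image = new MetadataImageBuilder()
       .addTopic(t1Uuid, "t1", 2)
       .addTopic(t2Uuid, "t2", 2)
       .build();
   context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
   ```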



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate())));
     }
 
+    private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) {
+        // topicPartition refers to internal topic __consumer_offsets
+        return runtime.scheduleReadOperation(
+                "delete-share-groups",
+                topicPartition,
+                (coordinator, offset) -> coordinator.sharePartitions(groupList, offset)
+            )
+            .thenCompose(this::performShareGroupsDeletion)
+            .exceptionally(exception -> handleOperationException(
+                "delete-share-groups",
+                groupList,
+                exception,
+                (error, __) -> {
+                    Map<String, Errors> errors = new HashMap<>();
+                    groupList.forEach(group -> errors.put(group, error));
+                    return errors;
+                },
+                log
+            ));
+    }
+
+    private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion(
+        Map<String, Map<Uuid, List<Integer>>> keys
+    ) {
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>();
+        for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) {
+            List<TopicData<PartitionIdData>> topicData = new ArrayList<>();
+            for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) {
+                topicData.add(
+                    new TopicData<>(
+                        topicEntry.getKey(),
+                        topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList()
+                    )
+                );
+            }
+
+            futures.add(deleteShareGroup(groupEntry.getKey(), topicData));
+        }
+
+        return persisterDeleteToGroupIdErrorMap(futures);
+    }
+
+    private CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup(
+        String groupId,
+        List<TopicData<PartitionIdData>> topicData
+    ) {
+        return persister.deleteState(
+                new DeleteShareGroupStateParameters.Builder()
+                    .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
+                        .setGroupId(groupId)
+                        .setTopicsData(topicData)
+                        .build()
+                    )
+                    .build()
+            )
+            .thenCompose(result -> CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<>(groupId, result)))
+            .exceptionally(exception -> {
+                // In case the deleteState call fails,
+                // we should construct the appropriate response here
+                // so that the subsequent callbacks don't see runtime exceptions.
+                log.error("Unable to delete share group partition(s) - {}, {}", groupId, topicData);
+                List<TopicData<PartitionErrorData>> respTopicData = topicData.stream()
+                    .map(reqTopicData -> new TopicData<>(
+                            reqTopicData.topicId(),
+                            reqTopicData.partitions().stream()
+                                .map(reqPartData -> {
+                                    Errors err = Errors.forException(exception);
+                                    return PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(), err.message());
+                                })
+                                .toList()
+                        )
+                    )
+                    .toList();
+
+                return new AbstractMap.SimpleEntry<>(groupId, new DeleteShareGroupStateResult.Builder()
+                    .setTopicsData(respTopicData)
+                    .build()
+                );
+            });
+    }
+
+    private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap(
+        List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures
+    ) {
+        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+            .thenCompose(v -> {

Review Comment:
   nit: We would usually format it as follows:
   
   ```
   return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).thenCompose(v -> {
       ...
   });
   ```


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
         );
     }
 
+    @Test
+    public void testSharePartitionKeyMap() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .build();
+
+        MetadataImage image = mock(MetadataImage.class);
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        TopicImage t1image = mock(TopicImage.class);
+        TopicImage t2image = mock(TopicImage.class);
+        when(topicsImage.getTopic(anyString()))
+            .thenReturn(t1image)
+            .thenReturn(t2image);
+
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        when(shareGroup.subscribedTopicNames())
+            .thenReturn(Map.of(
+                    "t1", mock(SubscriptionCount.class),
+                    "t2", mock(SubscriptionCount.class)
+                )
+            );
+
+        when(shareGroup.groupId())
+            .thenReturn("share-group");
+        when(image.topics())
+            .thenReturn(topicsImage);
+        when(image.provenance())
+            .thenReturn(new MetadataProvenance(-1, -1, -1, true));
+
+        when(t1image.partitions())
+            .thenReturn(
+                Map.of(
+                    0, mock(PartitionRegistration.class),
+                    1, mock(PartitionRegistration.class)
+                )
+            );
+        Uuid t1Uuid = Uuid.randomUuid();
+        when(t1image.id()).thenReturn(t1Uuid);
+
+        when(t2image.partitions())
+            .thenReturn(
+                Map.of(
+                    0, mock(PartitionRegistration.class),
+                    1, mock(PartitionRegistration.class)
+                )
+            );
+        Uuid t2Uuid = Uuid.randomUuid();
+        when(t2image.id()).thenReturn(t2Uuid);
+
+        context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
+        Map<String, Map<Uuid, List<Integer>>> keyMap = context.groupMetadataManager.sharePartitionKeysMap(List.of(shareGroup));
+        assertEquals(1, keyMap.size());
+        assertEquals(2, keyMap.get("share-group").size());
+        for (Uuid topic : List.of(t1Uuid, t2Uuid)) {
+            assertEquals(2, keyMap.get("share-group").get(topic).size());
+            assertTrue(keyMap.get("share-group").get(topic).contains(0));
+            assertTrue(keyMap.get("share-group").get(topic).contains(1));
+        }

Review Comment:
   We usually prefer to build the expected map and use `assertEquals(expectedMap, actualMap)`.
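   
   e.g., a sketch (assumes the returned partition lists have a deterministic order):
   
   ```
   Map<String, Map<Uuid, List<Integer>>> expectedKeyMap = Map.of(
       "share-group", Map.of(
           t1Uuid, List.of(0, 1),
           t2Uuid, List.of(0, 1)
       )
   );
   assertEquals(expectedKeyMap, keyMap);
   ```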



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
