dajac commented on code in PR #18848: URL: https://github.com/apache/kafka/pull/18848#discussion_r1954063627
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( + "delete-share-groups", + topicPartition, + (coordinator, offset) -> coordinator.sharePartitions(groupList, offset) + ) Review Comment: nit: The indentation is off here. We usually use 4 spaces for the arguments. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { Review Comment: nit: Let's put one argument per line to follow the format of the other methods in this file. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( Review Comment: For consistency reason, I suggest to use a write operation to ensure that you read the last state. 
Otherwise, there is a chance that you have a share group that is not committed yet and you would not see it with a read. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( + "delete-share-groups", + topicPartition, + (coordinator, offset) -> coordinator.sharePartitions(groupList, offset) + ) + .thenCompose(this::performShareGroupsDeletion) + .exceptionally(exception -> handleOperationException( + "delete-share-groups", + groupList, + exception, + (error, __) -> { + Map<String, Errors> errors = new HashMap<>(); + groupList.forEach(group -> errors.put(group, error)); + return errors; + }, + log + )); + } + + private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion( + Map<String, Map<Uuid, List<Integer>>> keys + ) { + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>(); + for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) { + List<TopicData<PartitionIdData>> topicData = new ArrayList<>(); + for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) { + topicData.add( + new TopicData<>( + topicEntry.getKey(), + topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList() + ) + ); + } Review Comment: I wonder whether it would make sense to directly return the data structure that the persister needs from the group metadata manager. We could avoid all those conversions. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) Review Comment: It would be great if you could put a comment explaining the main flow here. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) + .thenCompose(groupErrMap -> { + DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<String> errGroupIds = new ArrayList<>(); + groupErrMap.forEach((groupId, error) -> { + if (error.code() != Errors.NONE.code()) { + log.error("Error deleting share group {} due to error {}", groupId, error); + errGroupIds.add(groupId); + collection.add( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + .setErrorCode(error.code()) + ); + } + }); + + Set<String> groupSet = new HashSet<>(groupList); + // Remove all share group ids which have errored out + // when deleting with persister. + errGroupIds.forEach(groupSet::remove); + + // If no non-share groupIds or non-error share group ids present + // return. + if (groupSet.isEmpty()) { + return CompletableFuture.completedFuture(collection); + } + + // Let us invoke the standard procedure of any non-share + // groups or successfully deleted share groups remaining. 
+ List<String> retainedGroupIds = groupSet.stream().toList(); + return runtime.scheduleWriteOperation( Review Comment: nit: It may be worth extracting this into a method to reduce the code in the `thenCompose`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) Review Comment: nit: future? shareFuture is not quite right because it is the result of the entire deletion. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception { assertEquals(expectedResultCollection, future.get()); } + @Test + public void testDeleteWithShareGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(Persister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setMetrics(mock(GroupCoordinatorMetrics.class)) + .setPersister(persister) + .build(); + service.startup(() -> 3); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + // share group + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("share-group-id-1"); + resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + // non-share group + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + // null + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(List.of( + result3.duplicate(), + result2.duplicate(), + result1.duplicate() + ) + ); + + Uuid shareGroupTopicId = Uuid.randomUuid(); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("delete-share-groups"), + 
ArgumentMatchers.any(), + ArgumentMatchers.any() + )) + .thenReturn(CompletableFuture.completedFuture( + Map.of( + "share-group-id-1", + Map.of( + shareGroupTopicId, + List.of(0, 1) + ) + ) + ) + ) Review Comment: This looks weird. The indentation is also incorrect. There are many such cases in this file. I let you go through them. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords( group.createGroupTombstoneRecords(records); } + /** + * Returns all share partitions keys as a map from the input list of share groups. + * @param shareGroups - A list representing share groups. + * @return Map representing the share partition keys for all the groups in the input. + */ + public Map<String, Map<Uuid, List<Integer>>> sharePartitionKeysMap(List<ShareGroup> shareGroups) { + Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>(); + if (metadataImage == null) { + return Map.of(); + } + TopicsImage topicsImage = metadataImage.topics(); + for (ShareGroup shareGroup : shareGroups) { + String groupId = shareGroup.groupId(); + for (String topic : shareGroup.subscribedTopicNames().keySet()) { + TopicImage topicImage = topicsImage.getTopic(topic); Review Comment: The topic may not exist any more. For my understanding, how do we handle this case? Does the share coordinator deletes offsets of deleted topics? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection return new CoordinatorResult<>(records, resultCollection); } + /** + * Method returns all share partition keys corresponding to a list of groupIds. + * The groupIds are first filtered by type to restrict the list to share groups. 
+ * @param groupIds - A list of groupIds as string + * @param committedOffset - The last committedOffset for the internal topic partition + * @return A map representing the share partition structure. + */ + public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) { + List<ShareGroup> shareGroups = new ArrayList<>(); + for (String groupId : groupIds) { Review Comment: nit: With the current structure of the code, you iterate on the group ids twice. It may be better to directly get the share partitions of the share group when you have it in hand. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection return new CoordinatorResult<>(records, resultCollection); } + /** + * Method returns all share partition keys corresponding to a list of groupIds. + * The groupIds are first filtered by type to restrict the list to share groups. + * @param groupIds - A list of groupIds as string + * @param committedOffset - The last committedOffset for the internal topic partition + * @return A map representing the share partition structure. + */ + public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) { + List<ShareGroup> shareGroups = new ArrayList<>(); + for (String groupId : groupIds) { + try { + Group group = groupMetadataManager.group(groupId); + if (group instanceof ShareGroup) { + shareGroups.add((ShareGroup) group); + } + } catch (ApiException exception) { + // We needn't do anything more than logging here as deleteGroups + // method is handling these cases. + // Even if some groups cannot be found, we + // must check the entire list. + log.error("Failed to find group {}", groupId, exception); Review Comment: nit: I would remove this one because it may be spammy. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -476,6 +479,33 @@ public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection return new CoordinatorResult<>(records, resultCollection); } + /** + * Method returns all share partition keys corresponding to a list of groupIds. + * The groupIds are first filtered by type to restrict the list to share groups. + * @param groupIds - A list of groupIds as string + * @param committedOffset - The last committedOffset for the internal topic partition + * @return A map representing the share partition structure. + */ + public Map<String, Map<Uuid, List<Integer>>> sharePartitions(List<String> groupIds, long committedOffset) { + List<ShareGroup> shareGroups = new ArrayList<>(); + for (String groupId : groupIds) { + try { + Group group = groupMetadataManager.group(groupId); + if (group instanceof ShareGroup) { + shareGroups.add((ShareGroup) group); + } + } catch (ApiException exception) { Review Comment: Could we be specific about the exception that we handle here? I suppose that we care about the group id not found. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( + "delete-share-groups", + topicPartition, + (coordinator, offset) -> coordinator.sharePartitions(groupList, offset) + ) + .thenCompose(this::performShareGroupsDeletion) + .exceptionally(exception -> handleOperationException( + "delete-share-groups", + groupList, + exception, + (error, __) -> { + Map<String, Errors> errors = new HashMap<>(); + groupList.forEach(group -> errors.put(group, error)); + return errors; + }, + log + )); + } + + private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion( + Map<String, Map<Uuid, List<Integer>>> keys + ) { + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>(); + for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) { + List<TopicData<PartitionIdData>> topicData = new ArrayList<>(); + for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) { + topicData.add( + new TopicData<>( + topicEntry.getKey(), + topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList() + ) + ); + } + + futures.add(deleteShareGroup(groupEntry.getKey(), topicData)); + } + + return persisterDeleteToGroupIdErrorMap(futures); + } + + private CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup( + String groupId, + List<TopicData<PartitionIdData>> topicData + ) { + return persister.deleteState( + new 
DeleteShareGroupStateParameters.Builder() Review Comment: nit: Indentation seems to be off here too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -6280,6 +6281,29 @@ public void createGroupTombstoneRecords( group.createGroupTombstoneRecords(records); } + /** + * Returns all share partitions keys as a map from the input list of share groups. + * @param shareGroups - A list representing share groups. + * @return Map representing the share partition keys for all the groups in the input. + */ + public Map<String, Map<Uuid, List<Integer>>> sharePartitionKeysMap(List<ShareGroup> shareGroups) { + Map<String, Map<Uuid, List<Integer>>> keyMap = new HashMap<>(); + if (metadataImage == null) { + return Map.of(); + } Review Comment: nit: metadata image is never null. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) + .thenCompose(groupErrMap -> { + DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<String> errGroupIds = new ArrayList<>(); + 
groupErrMap.forEach((groupId, error) -> { + if (error.code() != Errors.NONE.code()) { Review Comment: What kind of errors can we get here? Are they all expected/allowed by the DeleteGroups API? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( + "delete-share-groups", + topicPartition, + (coordinator, offset) -> coordinator.sharePartitions(groupList, offset) + ) + .thenCompose(this::performShareGroupsDeletion) + .exceptionally(exception -> handleOperationException( + "delete-share-groups", + groupList, + exception, + (error, __) -> { + Map<String, Errors> errors = new HashMap<>(); + groupList.forEach(group -> errors.put(group, error)); + return errors; + }, + log + )); + } + + private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion( + Map<String, Map<Uuid, List<Integer>>> keys + ) { + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>(); + for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) { + List<TopicData<PartitionIdData>> topicData = new ArrayList<>(); + for (Map.Entry<Uuid, List<Integer>> topicEntry : groupEntry.getValue().entrySet()) { + topicData.add( + new TopicData<>( + topicEntry.getKey(), + topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList() + ) + ); + } + + futures.add(deleteShareGroup(groupEntry.getKey(), topicData)); + } + + return persisterDeleteToGroupIdErrorMap(futures); + } + + private 
CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup( + String groupId, + List<TopicData<PartitionIdData>> topicData + ) { + return persister.deleteState( + new DeleteShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>() + .setGroupId(groupId) + .setTopicsData(topicData) + .build() + ) + .build() + ) + .thenCompose(result -> CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<>(groupId, result))) + .exceptionally(exception -> { + // In case the deleteState call fails, + // we should construct the appropriate response here + // so that the subsequent callbacks don't see runtime exceptions. + log.error("Unable to delete share group partition(s) - {}, {}", groupId, topicData); + List<TopicData<PartitionErrorData>> respTopicData = topicData.stream() + .map(reqTopicData -> new TopicData<>( + reqTopicData.topicId(), + reqTopicData.partitions().stream() + .map(reqPartData -> { + Errors err = Errors.forException(exception); + return PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(), err.message()); + }) + .toList() + ) + ) + .toList(); + + return new AbstractMap.SimpleEntry<>(groupId, new DeleteShareGroupStateResult.Builder() + .setTopicsData(respTopicData) + .build() + ); + }); + } + + private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap( + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures + ) { + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})) + .thenCompose(v -> { + Map<String, Errors> groupIds = new HashMap<>(); + for (CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> future : futures) { + Map.Entry<String, DeleteShareGroupStateResult> entry = future.getNow(null); // safe as within allOff + groupIds.putIfAbsent(entry.getKey(), Errors.NONE); + for (TopicData<PartitionErrorData> topicData : 
entry.getValue().topicsData()) { + Optional<PartitionErrorData> errItem = topicData.partitions().stream() + .filter(errData -> errData.errorCode() != Errors.NONE.code()) + .findAny(); + + errItem.ifPresent(val -> { + log.error("Received error while deleting share group {} - {}", entry.getKey(), val); + groupIds.put(entry.getKey(), Errors.forCode(val.errorCode())); + } + ); Review Comment: nit: Format is incorrect here too. ``` errItem.ifPresent(val -> { log.error("Received error while deleting share group {} - {}", entry.getKey(), val); groupIds.put(entry.getKey(), Errors.forCode(val.errorCode())); }); ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) + .thenCompose(groupErrMap -> { + DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<String> errGroupIds = new ArrayList<>(); + groupErrMap.forEach((groupId, error) -> { + if (error.code() != Errors.NONE.code()) { + log.error("Error deleting share group {} due to error {}", groupId, error); + errGroupIds.add(groupId); + collection.add( + new 
DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + .setErrorCode(error.code()) + ); + } + }); + + Set<String> groupSet = new HashSet<>(groupList); + // Remove all share group ids which have errored out + // when deleting with persister. + errGroupIds.forEach(groupSet::remove); Review Comment: nit: removeAll? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) + .thenCompose(groupErrMap -> { + DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<String> errGroupIds = new ArrayList<>(); + groupErrMap.forEach((groupId, error) -> { + if (error.code() != Errors.NONE.code()) { + log.error("Error deleting share group {} due to error {}", groupId, error); + errGroupIds.add(groupId); + collection.add( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + .setErrorCode(error.code()) + ); + } + }); + + Set<String> groupSet = new HashSet<>(groupList); + // Remove all share group ids which have errored out + // when deleting with persister. 
+ errGroupIds.forEach(groupSet::remove); + + // If no non-share groupIds or non-error share group ids present + // return. + if (groupSet.isEmpty()) { + return CompletableFuture.completedFuture(collection); + } + + // Let us invoke the standard procedure of any non-share + // groups or successfully deleted share groups remaining. + List<String> retainedGroupIds = groupSet.stream().toList(); + return runtime.scheduleWriteOperation( + "delete-groups", + topicPartition, + Duration.ofMillis(config.offsetCommitTimeoutMs()), + coordinator -> coordinator.deleteGroups(context, retainedGroupIds) + ).thenApply(deletedCollection -> { + deletedCollection.forEach(item -> collection.add(item.duplicate())); + return collection; + }) + .exceptionally(exception -> handleOperationException( + "delete-groups", + groupList, + exception, + (error, __) -> { + DeleteGroupsRequest.getErrorResultCollection(retainedGroupIds, error).forEach(item -> collection.add(item.duplicate())); + return collection; Review Comment: I don't like the fact that we add to collection in two places (L873 and L881). This is error prone. Could we handle it at the end for both? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -822,21 +834,60 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection }); groupsByTopicPartition.forEach((topicPartition, groupList) -> { - CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = - runtime.scheduleWriteOperation( - "delete-groups", - topicPartition, - Duration.ofMillis(config.offsetCommitTimeoutMs()), - coordinator -> coordinator.deleteGroups(context, groupList) - ).exceptionally(exception -> handleOperationException( - "delete-groups", - groupList, - exception, - (error, __) -> DeleteGroupsRequest.getErrorResultCollection(groupList, error), - log - )); - - futures.add(future); + CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> shareFuture = deleteShareGroups(topicPartition, groupList) + .thenCompose(groupErrMap -> { + DeleteGroupsResponseData.DeletableGroupResultCollection collection = new DeleteGroupsResponseData.DeletableGroupResultCollection(); + List<String> errGroupIds = new ArrayList<>(); + groupErrMap.forEach((groupId, error) -> { + if (error.code() != Errors.NONE.code()) { + log.error("Error deleting share group {} due to error {}", groupId, error); + errGroupIds.add(groupId); + collection.add( + new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(groupId) + .setErrorCode(error.code()) + ); + } + }); + + Set<String> groupSet = new HashSet<>(groupList); + // Remove all share group ids which have errored out + // when deleting with persister. + errGroupIds.forEach(groupSet::remove); + + // If no non-share groupIds or non-error share group ids present + // return. + if (groupSet.isEmpty()) { + return CompletableFuture.completedFuture(collection); + } + + // Let us invoke the standard procedure of any non-share + // groups or successfully deleted share groups remaining. 
+ List<String> retainedGroupIds = groupSet.stream().toList(); + return runtime.scheduleWriteOperation( + "delete-groups", Review Comment: nit: Indentation is incorrect here too. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) ); } + @Test + public void testSharePartitionKeyMap() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + MetadataImage image = mock(MetadataImage.class); + TopicsImage topicsImage = mock(TopicsImage.class); + TopicImage t1image = mock(TopicImage.class); + TopicImage t2image = mock(TopicImage.class); + when(topicsImage.getTopic(anyString())) + .thenReturn(t1image) + .thenReturn(t2image); + + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.subscribedTopicNames()) + .thenReturn(Map.of( + "t1", mock(SubscriptionCount.class), + "t2", mock(SubscriptionCount.class) + ) + ); Review Comment: nit: I would format it as follow: ``` when(shareGroup.subscribedTopicNames()).thenReturn(Map.of( "t1", mock(SubscriptionCount.class), "t2", mock(SubscriptionCount.class) )); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1627,6 +1637,374 @@ public void testDeleteGroups() throws Exception { assertEquals(expectedResultCollection, future.get()); } + @Test + public void testDeleteWithShareGroups() throws Exception { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(Persister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + 
.setConfig(createConfig()) + .setRuntime(runtime) + .setMetrics(mock(GroupCoordinatorMetrics.class)) + .setPersister(persister) + .build(); + service.startup(() -> 3); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + // share group + DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("share-group-id-1"); + resultCollection1.add(result1); + + DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + // non-share group + DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId("group-id-2"); + resultCollection2.add(result2); + + // null + DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + + DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = + new DeleteGroupsResponseData.DeletableGroupResultCollection(); + expectedResultCollection.addAll(List.of( + result3.duplicate(), + result2.duplicate(), + result1.duplicate() + ) + ); Review Comment: nit: ``` expectedResultCollection.addAll(List.of( result3.duplicate(), result2.duplicate(), result1.duplicate() )); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) ); } + @Test + public void testSharePartitionKeyMap() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + 
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + MetadataImage image = mock(MetadataImage.class); + TopicsImage topicsImage = mock(TopicsImage.class); + TopicImage t1image = mock(TopicImage.class); + TopicImage t2image = mock(TopicImage.class); + when(topicsImage.getTopic(anyString())) + .thenReturn(t1image) + .thenReturn(t2image); Review Comment: It may be easier to just build the MetadataImage that you need rather than mocking it. We have `MetadataImageBuilder` which is pretty handy for it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -845,6 +896,113 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection (accumulator, newResults) -> newResults.forEach(result -> accumulator.add(result.duplicate()))); } + private CompletableFuture<Map<String, Errors>> deleteShareGroups(TopicPartition topicPartition, List<String> groupList) { + // topicPartition refers to internal topic __consumer_offsets + return runtime.scheduleReadOperation( + "delete-share-groups", + topicPartition, + (coordinator, offset) -> coordinator.sharePartitions(groupList, offset) + ) + .thenCompose(this::performShareGroupsDeletion) + .exceptionally(exception -> handleOperationException( + "delete-share-groups", + groupList, + exception, + (error, __) -> { + Map<String, Errors> errors = new HashMap<>(); + groupList.forEach(group -> errors.put(group, error)); + return errors; + }, + log + )); + } + + private CompletableFuture<Map<String, Errors>> performShareGroupsDeletion( + Map<String, Map<Uuid, List<Integer>>> keys + ) { + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures = new ArrayList<>(); + for (Map.Entry<String, Map<Uuid, List<Integer>>> groupEntry : keys.entrySet()) { + List<TopicData<PartitionIdData>> topicData = new ArrayList<>(); + for (Map.Entry<Uuid, List<Integer>> topicEntry : 
groupEntry.getValue().entrySet()) { + topicData.add( + new TopicData<>( + topicEntry.getKey(), + topicEntry.getValue().stream().map(PartitionFactory::newPartitionIdData).toList() + ) + ); + } + + futures.add(deleteShareGroup(groupEntry.getKey(), topicData)); + } + + return persisterDeleteToGroupIdErrorMap(futures); + } + + private CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>> deleteShareGroup( + String groupId, + List<TopicData<PartitionIdData>> topicData + ) { + return persister.deleteState( + new DeleteShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>() + .setGroupId(groupId) + .setTopicsData(topicData) + .build() + ) + .build() + ) + .thenCompose(result -> CompletableFuture.completedFuture(new AbstractMap.SimpleEntry<>(groupId, result))) + .exceptionally(exception -> { + // In case the deleteState call fails, + // we should construct the appropriate response here + // so that the subsequent callbacks don't see runtime exceptions. 
+ log.error("Unable to delete share group partition(s) - {}, {}", groupId, topicData); + List<TopicData<PartitionErrorData>> respTopicData = topicData.stream() + .map(reqTopicData -> new TopicData<>( + reqTopicData.topicId(), + reqTopicData.partitions().stream() + .map(reqPartData -> { + Errors err = Errors.forException(exception); + return PartitionFactory.newPartitionErrorData(reqPartData.partition(), err.code(), err.message()); + }) + .toList() + ) + ) + .toList(); + + return new AbstractMap.SimpleEntry<>(groupId, new DeleteShareGroupStateResult.Builder() + .setTopicsData(respTopicData) + .build() + ); + }); + } + + private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap( + List<CompletableFuture<AbstractMap.SimpleEntry<String, DeleteShareGroupStateResult>>> futures + ) { + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})) + .thenCompose(v -> { Review Comment: nit: We would usually format it as follows: ``` return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).thenCompose(v -> { ... 
}); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16294,6 +16299,76 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) ); } + @Test + public void testSharePartitionKeyMap() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .build(); + + MetadataImage image = mock(MetadataImage.class); + TopicsImage topicsImage = mock(TopicsImage.class); + TopicImage t1image = mock(TopicImage.class); + TopicImage t2image = mock(TopicImage.class); + when(topicsImage.getTopic(anyString())) + .thenReturn(t1image) + .thenReturn(t2image); + + ShareGroup shareGroup = mock(ShareGroup.class); + when(shareGroup.subscribedTopicNames()) + .thenReturn(Map.of( + "t1", mock(SubscriptionCount.class), + "t2", mock(SubscriptionCount.class) + ) + ); + + when(shareGroup.groupId()) + .thenReturn("share-group"); + when(image.topics()) + .thenReturn(topicsImage); + when(image.provenance()) + .thenReturn(new MetadataProvenance(-1, -1, -1, true)); + + when(t1image.partitions()) + .thenReturn( + Map.of( + 0, mock(PartitionRegistration.class), + 1, mock(PartitionRegistration.class) + ) + ); + Uuid t1Uuid = Uuid.randomUuid(); + when(t1image.id()).thenReturn(t1Uuid); + + when(t2image.partitions()) + .thenReturn( + Map.of( + 0, mock(PartitionRegistration.class), + 1, mock(PartitionRegistration.class) + ) + ); + Uuid t2Uuid = Uuid.randomUuid(); + when(t2image.id()).thenReturn(t2Uuid); + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + Map<String, Map<Uuid, List<Integer>>> keyMap = context.groupMetadataManager.sharePartitionKeysMap(List.of(shareGroup)); + assertEquals(1, keyMap.size()); + assertEquals(2, 
keyMap.get("share-group").size()); + for (Uuid topic : List.of(t1Uuid, t2Uuid)) { + assertEquals(2, keyMap.get("share-group").get(topic).size()); + assertTrue(keyMap.get("share-group").get(topic).contains(0)); + assertTrue(keyMap.get("share-group").get(topic).contains(1)); + } Review Comment: We usually prefer to build the expected map and to use assertEquals(expectedMap, actualMap). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org