AndrewJSchofield commented on code in PR #19026: URL: https://github.com/apache/kafka/pull/19026#discussion_r1977260665
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( )); } + // Visibility for testing + CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + ShareGroupHeartbeatResponseData defaultResponse + ) { + return persister.initializeState(request) + .thenCompose( + response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) + ).exceptionally(exception -> { + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); + return new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.forException(exception).code()); + }); + } + + private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse( + String groupId, + InitializeShareGroupStateResult persisterInitializeResult, + ShareGroupHeartbeatResponseData defaultResponse + ) { + + short persisterErrorCode = Errors.NONE.code(); + for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) { + Optional<PartitionErrorData> errData = topicData.partitions().stream().filter(partition -> partition.errorCode() != Errors.NONE.code()).findAny(); + if (errData.isPresent()) { + persisterErrorCode = errData.get().errorCode(); + break; + } + } + + if (persisterErrorCode == Errors.NONE.code()) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) { + topicPartitionMap.put( + topicData.topicId(), + topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) + ); + } + if (topicPartitionMap.isEmpty()) { + return 
CompletableFuture.completedFuture(defaultResponse); + } + return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse); + } else { + log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterErrorCode); + return CompletableFuture.completedFuture( + new ShareGroupHeartbeatResponseData() + .setErrorCode(persisterErrorCode) + ); Review Comment: Error message? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( )); } + // Visibility for testing + CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + ShareGroupHeartbeatResponseData defaultResponse + ) { + return persister.initializeState(request) + .thenCompose( + response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) + ).exceptionally(exception -> { + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); + return new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.forException(exception).code()); + }); + } + + private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse( + String groupId, + InitializeShareGroupStateResult persisterInitializeResult, + ShareGroupHeartbeatResponseData defaultResponse + ) { + Review Comment: nit: Unnecessary blank line ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2322,11 +2345,107 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh // (subscribedTopicNames) to detect a full request as those must be set in a full request. // 2. 
The member's assignment has been updated. boolean isFullRequest = subscribedTopicNames != null; + List<String> initializeCandidateTopics = List.of(); if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createShareGroupResponseAssignment(updatedMember)); + initializeCandidateTopics = (subscribedTopicNames == null || subscribedTopicNames.isEmpty()) ? + group.subscribedTopicNames().keySet().stream().toList() : subscribedTopicNames; } - return new CoordinatorResult<>(records, response); + return new CoordinatorResult<>( + records, + Map.entry( + response, + maybeCreateInitializeShareGroupStateRequest(group, initializeCandidateTopics) + ) + ); + } + + private Map<Uuid, Map.Entry<String, List<Integer>>> subscribedTopicsChangeMap(ShareGroup group, List<String> subscribedTopicNames) { Review Comment: javadoc method comment please ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( )); } + // Visibility for testing + CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + ShareGroupHeartbeatResponseData defaultResponse + ) { + return persister.initializeState(request) + .thenCompose( + response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) + ).exceptionally(exception -> { + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); + return new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.forException(exception).code()); + }); + } + + private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse( + String groupId, + InitializeShareGroupStateResult 
persisterInitializeResult, + ShareGroupHeartbeatResponseData defaultResponse + ) { + + short persisterErrorCode = Errors.NONE.code(); + for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) { + Optional<PartitionErrorData> errData = topicData.partitions().stream().filter(partition -> partition.errorCode() != Errors.NONE.code()).findAny(); + if (errData.isPresent()) { + persisterErrorCode = errData.get().errorCode(); + break; + } + } + + if (persisterErrorCode == Errors.NONE.code()) { + Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); + for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) { + topicPartitionMap.put( + topicData.topicId(), + topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) + ); + } + if (topicPartitionMap.isEmpty()) { + return CompletableFuture.completedFuture(defaultResponse); + } + return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse); + } else { + log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterErrorCode); + return CompletableFuture.completedFuture( + new ShareGroupHeartbeatResponseData() + .setErrorCode(persisterErrorCode) + ); + } + } + + private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStateMetadataInitialize( + String groupId, + Map<Uuid, Set<Integer>> topicPartitionMap, + ShareGroupHeartbeatResponseData defaultResponse + ) { + return runtime.scheduleWriteOperation( + "initialize-share-group-state", + topicPartitionFor(groupId), + Duration.ofMillis(config.offsetCommitTimeoutMs()), + coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap) + ).thenApply( + __ -> defaultResponse + ).exceptionally(exception -> { + log.error("Unable to initialize share group state partition metadata for {}.", groupId, exception); + return new ShareGroupHeartbeatResponseData() + 
.setErrorCode(Errors.forException(exception).code()); + }); Review Comment: And error message here too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( )); } + // Visibility for testing + CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize( + InitializeShareGroupStateParameters request, + ShareGroupHeartbeatResponseData defaultResponse + ) { + return persister.initializeState(request) + .thenCompose( + response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) + ).exceptionally(exception -> { + GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); + log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); + return new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.forException(exception).code()); + }); Review Comment: Error message too? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -443,7 +448,14 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat( topicPartitionFor(request.groupId()), Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.shareGroupHeartbeat(context, request) - ).exceptionally(exception -> handleOperationException( + ).thenCompose(result -> { + // This ensures that the previous group write has completed successfully + // before we start the persister initialize phase. 
+ if (result.getValue().isPresent()) { + return persisterInitialize(result.getValue().get(), result.getKey()); Review Comment: My thought when writing the KIP was that, if `ShareGroupHeartbeat` needed to ask the persister to initialize one or more topic-partitions, it would schedule this work with the persister and simply 😄 omit these pending topic-partitions from the response to `ShareGroupHeartbeat`. The response should not be delayed until the persister has completed. The call to the persister involves one or more inter-broker RPCs, which could potentially need to be retried due to a coordinator restart or load delay, so I don't think it's appropriate to do the initialization directly as part of the heartbeat processing. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2322,11 +2345,107 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh // (subscribedTopicNames) to detect a full request as those must be set in a full request. // 2. The member's assignment has been updated. boolean isFullRequest = subscribedTopicNames != null; + List<String> initializeCandidateTopics = List.of(); if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createShareGroupResponseAssignment(updatedMember)); + initializeCandidateTopics = (subscribedTopicNames == null || subscribedTopicNames.isEmpty()) ? 
+ group.subscribedTopicNames().keySet().stream().toList() : subscribedTopicNames; } - return new CoordinatorResult<>(records, response); + return new CoordinatorResult<>( + records, + Map.entry( + response, + maybeCreateInitializeShareGroupStateRequest(group, initializeCandidateTopics) + ) + ); + } + + private Map<Uuid, Map.Entry<String, List<Integer>>> subscribedTopicsChangeMap(ShareGroup group, List<String> subscribedTopicNames) { + Map<Uuid, Map.Entry<String, List<Integer>>> topicPartitionChangeMap = new HashMap<>(); + Set<TopicImage> newImages = new HashSet<>(); + + TopicsImage topicsImage = metadataImage.topics(); + if (topicsImage == null || topicsImage.isEmpty()) { + return Map.of(); + } + + for (String topicName : subscribedTopicNames) { + if (topicsImage.getTopic(topicName) == null) { + // error? + continue; + } + newImages.add(topicsImage.getTopic(topicName)); + } + + if (shareGroupPartitionMetadata.containsKey(group.groupId())) { + Map<Uuid, Set<Integer>> alreadyInitialized = shareGroupPartitionMetadata.get(group.groupId()).initializedTopics(); + for (TopicImage newImage : newImages) { + int newImageNumPartitions = newImage.partitions().size(); + + if (alreadyInitialized.containsKey(newImage.id())) { + // Check partition change + int existingNumPartitions = alreadyInitialized.get(newImage.id()).size(); + + // Partitions have increased (will only increase as kafka does not allow reduction). 
+ if (newImageNumPartitions != existingNumPartitions) { + topicPartitionChangeMap.put( + newImage.id(), + Map.entry( + newImage.name(), + IntStream.range(existingNumPartitions, newImageNumPartitions).boxed().toList() + ) + ); + } + } else { + topicPartitionChangeMap.put( + newImage.id(), + Map.entry( + newImage.name(), + IntStream.range(0, newImageNumPartitions).boxed().toList() + ) + ); + } + } + } else { + for (TopicImage newImage : newImages) { + topicPartitionChangeMap.put( + newImage.id(), + Map.entry( + newImage.name(), + IntStream.range(0, newImage.partitions().size()).boxed().toList() + ) + ); + } + } + return topicPartitionChangeMap; + } + + private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest( Review Comment: javadoc method comment please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org