AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1977260665


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> 
shareGroupHeartbeat(
         ));
     }
 
+    // Visibility for testing
+    CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
+        InitializeShareGroupStateParameters request,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+        return persister.initializeState(request)
+            .thenCompose(
+                response -> 
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
response, defaultResponse)
+            ).exceptionally(exception -> {
+                GroupTopicPartitionData<PartitionStateData> gtp = 
request.groupTopicPartitionData();
+                log.error("Unable to initialize share group state {}, {}", 
gtp.groupId(), gtp.topicsData(), exception);
+                return new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            });
+    }
+
+    private CompletableFuture<ShareGroupHeartbeatResponseData> 
handlePersisterInitializeResponse(
+        String groupId,
+        InitializeShareGroupStateResult persisterInitializeResult,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+
+        short persisterErrorCode = Errors.NONE.code();
+        for (TopicData<PartitionErrorData> topicData : 
persisterInitializeResult.topicsData()) {
+            Optional<PartitionErrorData> errData = 
topicData.partitions().stream().filter(partition -> partition.errorCode() != 
Errors.NONE.code()).findAny();
+            if (errData.isPresent()) {
+                persisterErrorCode = errData.get().errorCode();
+                break;
+            }
+        }
+
+        if (persisterErrorCode == Errors.NONE.code()) {
+            Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+            for (TopicData<PartitionErrorData> topicData : 
persisterInitializeResult.topicsData()) {
+                topicPartitionMap.put(
+                    topicData.topicId(),
+                    
topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet())
+                );
+            }
+            if (topicPartitionMap.isEmpty()) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            return performShareGroupStateMetadataInitialize(groupId, 
topicPartitionMap, defaultResponse);
+        } else {
+            log.error("Received error while calling initialize state for {} on 
persister {}.", groupId, persisterErrorCode);
+            return CompletableFuture.completedFuture(
+                new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(persisterErrorCode)
+            );

Review Comment:
   Should this response also set an error message, not just the error code?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> 
shareGroupHeartbeat(
         ));
     }
 
+    // Visibility for testing
+    CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
+        InitializeShareGroupStateParameters request,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+        return persister.initializeState(request)
+            .thenCompose(
+                response -> 
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
response, defaultResponse)
+            ).exceptionally(exception -> {
+                GroupTopicPartitionData<PartitionStateData> gtp = 
request.groupTopicPartitionData();
+                log.error("Unable to initialize share group state {}, {}", 
gtp.groupId(), gtp.topicsData(), exception);
+                return new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            });
+    }
+
+    private CompletableFuture<ShareGroupHeartbeatResponseData> 
handlePersisterInitializeResponse(
+        String groupId,
+        InitializeShareGroupStateResult persisterInitializeResult,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+

Review Comment:
   nit: Unnecessary blank line



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2322,11 +2345,107 @@ private 
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
         //    (subscribedTopicNames) to detect a full request as those must be 
set in a full request.
         // 2. The member's assignment has been updated.
         boolean isFullRequest = subscribedTopicNames != null;
+        List<String> initializeCandidateTopics = List.of();
         if (memberEpoch == 0 || isFullRequest || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+            initializeCandidateTopics = (subscribedTopicNames == null || 
subscribedTopicNames.isEmpty()) ?
+                group.subscribedTopicNames().keySet().stream().toList() : 
subscribedTopicNames;
         }
 
-        return new CoordinatorResult<>(records, response);
+        return new CoordinatorResult<>(
+            records,
+            Map.entry(
+                response,
+                maybeCreateInitializeShareGroupStateRequest(group, 
initializeCandidateTopics)
+            )
+        );
+    }
+
+    private Map<Uuid, Map.Entry<String, List<Integer>>> 
subscribedTopicsChangeMap(ShareGroup group, List<String> subscribedTopicNames) {

Review Comment:
   javadoc method comment please



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> 
shareGroupHeartbeat(
         ));
     }
 
+    // Visibility for testing
+    CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
+        InitializeShareGroupStateParameters request,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+        return persister.initializeState(request)
+            .thenCompose(
+                response -> 
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
response, defaultResponse)
+            ).exceptionally(exception -> {
+                GroupTopicPartitionData<PartitionStateData> gtp = 
request.groupTopicPartitionData();
+                log.error("Unable to initialize share group state {}, {}", 
gtp.groupId(), gtp.topicsData(), exception);
+                return new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            });
+    }
+
+    private CompletableFuture<ShareGroupHeartbeatResponseData> 
handlePersisterInitializeResponse(
+        String groupId,
+        InitializeShareGroupStateResult persisterInitializeResult,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+
+        short persisterErrorCode = Errors.NONE.code();
+        for (TopicData<PartitionErrorData> topicData : 
persisterInitializeResult.topicsData()) {
+            Optional<PartitionErrorData> errData = 
topicData.partitions().stream().filter(partition -> partition.errorCode() != 
Errors.NONE.code()).findAny();
+            if (errData.isPresent()) {
+                persisterErrorCode = errData.get().errorCode();
+                break;
+            }
+        }
+
+        if (persisterErrorCode == Errors.NONE.code()) {
+            Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+            for (TopicData<PartitionErrorData> topicData : 
persisterInitializeResult.topicsData()) {
+                topicPartitionMap.put(
+                    topicData.topicId(),
+                    
topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet())
+                );
+            }
+            if (topicPartitionMap.isEmpty()) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            return performShareGroupStateMetadataInitialize(groupId, 
topicPartitionMap, defaultResponse);
+        } else {
+            log.error("Received error while calling initialize state for {} on 
persister {}.", groupId, persisterErrorCode);
+            return CompletableFuture.completedFuture(
+                new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(persisterErrorCode)
+            );
+        }
+    }
+
+    private CompletableFuture<ShareGroupHeartbeatResponseData> 
performShareGroupStateMetadataInitialize(
+        String groupId,
+        Map<Uuid, Set<Integer>> topicPartitionMap,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+        return runtime.scheduleWriteOperation(
+            "initialize-share-group-state",
+            topicPartitionFor(groupId),
+            Duration.ofMillis(config.offsetCommitTimeoutMs()),
+            coordinator -> coordinator.initializeShareGroupState(groupId, 
topicPartitionMap)
+        ).thenApply(
+            __ -> defaultResponse
+        ).exceptionally(exception -> {
+            log.error("Unable to initialize share group state partition 
metadata for {}.", groupId, exception);
+            return new ShareGroupHeartbeatResponseData()
+                .setErrorCode(Errors.forException(exception).code());
+        });

Review Comment:
   Please set an error message on this response too, not just the error code.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -454,6 +466,77 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> 
shareGroupHeartbeat(
         ));
     }
 
+    // Visibility for testing
+    CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
+        InitializeShareGroupStateParameters request,
+        ShareGroupHeartbeatResponseData defaultResponse
+    ) {
+        return persister.initializeState(request)
+            .thenCompose(
+                response -> 
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), 
response, defaultResponse)
+            ).exceptionally(exception -> {
+                GroupTopicPartitionData<PartitionStateData> gtp = 
request.groupTopicPartitionData();
+                log.error("Unable to initialize share group state {}, {}", 
gtp.groupId(), gtp.topicsData(), exception);
+                return new ShareGroupHeartbeatResponseData()
+                    .setErrorCode(Errors.forException(exception).code());
+            });

Review Comment:
   Should this response set an error message as well as the error code?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -443,7 +448,14 @@ public CompletableFuture<ShareGroupHeartbeatResponseData> 
shareGroupHeartbeat(
             topicPartitionFor(request.groupId()),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.shareGroupHeartbeat(context, request)
-        ).exceptionally(exception -> handleOperationException(
+        ).thenCompose(result -> {
+            // This ensures that the previous group write has completed 
successfully
+            // before we start the persister initialize phase.
+            if (result.getValue().isPresent()) {
+                return persisterInitialize(result.getValue().get(), 
result.getKey());

Review Comment:
   My thought when writing the KIP was that, if `ShareGroupHeartbeat` needed to 
ask the persister to initialize one or more topic-partitions, it would schedule 
this work with the persister and simply 😄 omit these pending topic-partitions 
from the response to `ShareGroupHeartbeat`. The response should not be delayed 
until the persister has completed. The call to the persister involves one or 
more inter-broker RPCs, which could potentially need to be retried due to a 
coordinator restart or load delay, so I don't think it's appropriate to do the 
initialization directly as part of the heartbeat processing.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2322,11 +2345,107 @@ private 
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
         //    (subscribedTopicNames) to detect a full request as those must be 
set in a full request.
         // 2. The member's assignment has been updated.
         boolean isFullRequest = subscribedTopicNames != null;
+        List<String> initializeCandidateTopics = List.of();
         if (memberEpoch == 0 || isFullRequest || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+            initializeCandidateTopics = (subscribedTopicNames == null || 
subscribedTopicNames.isEmpty()) ?
+                group.subscribedTopicNames().keySet().stream().toList() : 
subscribedTopicNames;
         }
 
-        return new CoordinatorResult<>(records, response);
+        return new CoordinatorResult<>(
+            records,
+            Map.entry(
+                response,
+                maybeCreateInitializeShareGroupStateRequest(group, 
initializeCandidateTopics)
+            )
+        );
+    }
+
+    private Map<Uuid, Map.Entry<String, List<Integer>>> 
subscribedTopicsChangeMap(ShareGroup group, List<String> subscribedTopicNames) {
+        Map<Uuid, Map.Entry<String, List<Integer>>> topicPartitionChangeMap = 
new HashMap<>();
+        Set<TopicImage> newImages = new HashSet<>();
+
+        TopicsImage topicsImage = metadataImage.topics();
+        if (topicsImage == null || topicsImage.isEmpty()) {
+            return Map.of();
+        }
+
+        for (String topicName : subscribedTopicNames) {
+            if (topicsImage.getTopic(topicName) == null) {
+                // error?
+                continue;
+            }
+            newImages.add(topicsImage.getTopic(topicName));
+        }
+
+        if (shareGroupPartitionMetadata.containsKey(group.groupId())) {
+            Map<Uuid, Set<Integer>> alreadyInitialized = 
shareGroupPartitionMetadata.get(group.groupId()).initializedTopics();
+            for (TopicImage newImage : newImages) {
+                int newImageNumPartitions = newImage.partitions().size();
+
+                if (alreadyInitialized.containsKey(newImage.id())) {
+                    // Check partition change
+                    int existingNumPartitions = 
alreadyInitialized.get(newImage.id()).size();
+
+                    // Partitions have increased (will only increase as kafka 
does not allow reduction).
+                    if (newImageNumPartitions != existingNumPartitions) {
+                        topicPartitionChangeMap.put(
+                            newImage.id(),
+                            Map.entry(
+                                newImage.name(),
+                                IntStream.range(existingNumPartitions, 
newImageNumPartitions).boxed().toList()
+                            )
+                        );
+                    }
+                } else {
+                    topicPartitionChangeMap.put(
+                        newImage.id(),
+                        Map.entry(
+                            newImage.name(),
+                            IntStream.range(0, 
newImageNumPartitions).boxed().toList()
+                        )
+                    );
+                }
+            }
+        } else {
+            for (TopicImage newImage : newImages) {
+                topicPartitionChangeMap.put(
+                    newImage.id(),
+                    Map.entry(
+                        newImage.name(),
+                        IntStream.range(0, 
newImage.partitions().size()).boxed().toList()
+                    )
+                );
+            }
+        }
+        return topicPartitionChangeMap;
+    }
+
+    private Optional<InitializeShareGroupStateParameters> 
maybeCreateInitializeShareGroupStateRequest(

Review Comment:
   javadoc method comment please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to