AndrewJSchofield commented on code in PR #18968: URL: https://github.com/apache/kafka/pull/18968#discussion_r1965580105
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ########## @@ -317,16 +319,12 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr CoordinatorRecord record = generateShareStateRecord(partitionData, key); // build successful response if record is correctly created - WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData() - .setResults( - Collections.singletonList( - WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(), - Collections.singletonList( - WriteShareGroupStateResponse.toResponsePartitionResult( - key.partition() - )) - )) - ); + WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults( Review Comment: I think the previous style of ``` new WriteShareGroupStateResponse() .setResults( ``` was more in line with the rest of the code you've written. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -808,6 +811,106 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC }); } + @Override + public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) { + // Send an empty response if the coordinator is not active. + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorInitStateResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + String groupId = request.groupId(); + // Send an empty response if groupId is invalid. + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // Send an empty response if topic data is empty. 
+ if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>(); + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass + // onto the shard method. + + for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) { + Uuid topicId = topicData.topicId(); + for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) { + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + + InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List.of(partitionData)))); + + CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation( + "initialize-share-group-state", + topicPartitionFor(coordinatorKey), + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + coordinator -> coordinator.initializeState(requestForCurrentPartition) + ).exceptionally(deleteException -> Review Comment: nit: initializeException? 
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -808,6 +811,106 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC }); } + @Override + public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) { + // Send an empty response if the coordinator is not active. + if (!isActive.get()) { + return CompletableFuture.completedFuture( + generateErrorInitStateResponse( + request, + Errors.COORDINATOR_NOT_AVAILABLE, + "Share coordinator is not available." + ) + ); + } + + String groupId = request.groupId(); + // Send an empty response if groupId is invalid. + if (isGroupIdEmpty(groupId)) { + log.error("Group id must be specified and non-empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // Send an empty response if topic data is empty. + if (isEmpty(request.topics())) { + log.error("Topic Data is empty: {}", request); + return CompletableFuture.completedFuture( + new InitializeShareGroupStateResponseData() + ); + } + + // A map to store the futures for each topicId and partition. + Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>(); + + // The request received here could have multiple keys of structure group:topic:partition. However, + // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will + // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass + // onto the shard method. 
+ + for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) { + Uuid topicId = topicData.topicId(); + for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) { + SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition()); + + InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData() + .setGroupId(groupId) + .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData() + .setTopicId(topicId) + .setPartitions(List.of(partitionData)))); + + CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation( + "initialize-share-group-state", + topicPartitionFor(coordinatorKey), + Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()), + coordinator -> coordinator.initializeState(requestForCurrentPartition) + ).exceptionally(deleteException -> + handleOperationException( + "initialize-share-group-state", + request, + deleteException, + (error, message) -> InitializeShareGroupStateResponse.toErrorResponseData( + topicData.topicId(), + partitionData.partition(), + error, + "Unable to initialize share group state: " + deleteException.getMessage() + ), + log + )); + + futureMap.computeIfAbsent(topicId, k -> new HashMap<>()) + .put(partitionData.partition(), initializeFuture); + } + } + + // Combine all futures into a single CompletableFuture<Void>. + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream() + .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into CompletableFuture<DeleteShareGroupStateResponseData>. 
+ return combinedFuture.thenApply(v -> { + List<InitializeShareGroupStateResponseData.InitializeStateResult> initializeStateResult = new ArrayList<>(futureMap.size()); + futureMap.forEach( + (topicId, topicEntry) -> { + List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size()); + topicEntry.forEach( + (partitionId, responseFuture) -> { + // ResponseFut would already be completed by now since we have used Review Comment: The variable name was different in the code you copied here :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org