AndrewJSchofield commented on code in PR #18968:
URL: https://github.com/apache/kafka/pull/18968#discussion_r1965580105


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -317,16 +319,12 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr
 
         CoordinatorRecord record = generateShareStateRecord(partitionData, key);
         // build successful response if record is correctly created
-        WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
-            .setResults(
-                Collections.singletonList(
-                    WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
-                        Collections.singletonList(
-                            WriteShareGroupStateResponse.toResponsePartitionResult(
-                                key.partition()
-                            ))
-                    ))
-            );
+        WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults(

Review Comment:
   I think the previous style of
   
   ```
   new WriteShareGroupStateResponseData()
       .setResults(
   ```
   
   was more in line with the rest of the code you've written.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -808,6 +811,106 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
         });
     }
 
+    @Override
+    public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
+        // Send an empty response if the coordinator is not active.
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                generateErrorInitStateResponse(
+                    request,
+                    Errors.COORDINATOR_NOT_AVAILABLE,
+                    "Share coordinator is not available."
+                )
+            );
+        }
+
+        String groupId = request.groupId();
+        // Send an empty response if groupId is invalid.
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // Send an empty response if topic data is empty.
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>();
+
+        // The request received here could have multiple keys of structure group:topic:partition. However,
+        // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
+        // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
+            Uuid topicId = topicData.topicId();
+            for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
+                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
+
+                InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+                    .setGroupId(groupId)
+                    .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(partitionData))));
+
+                CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation(
+                    "initialize-share-group-state",
+                    topicPartitionFor(coordinatorKey),
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    coordinator -> coordinator.initializeState(requestForCurrentPartition)
+                ).exceptionally(deleteException ->

Review Comment:
   nit: Should this lambda parameter be `initializeException` rather than `deleteException`?
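
A minimal sketch of the rename suggested above, using plain `java.util.concurrent` types rather than the Kafka classes. `ExceptionallySketch`, `PartitionResult`, `initialize`, and the `fail` flag are hypothetical stand-ins invented for illustration; only the `exceptionally` callback and the error-message prefix mirror the diff.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {
    // Hypothetical stand-in for a per-partition result; not a Kafka class.
    record PartitionResult(int partition, String errorMessage) {}

    // Simulates one per-partition write that either succeeds or fails.
    static CompletableFuture<PartitionResult> initialize(int partition, boolean fail) {
        CompletableFuture<PartitionResult> write = fail
            ? CompletableFuture.failedFuture(new IllegalStateException("write timed out"))
            : CompletableFuture.completedFuture(new PartitionResult(partition, null));

        // The lambda parameter is named after the operation it belongs to,
        // so the error path reads correctly on its own.
        return write.exceptionally(initializeException ->
            new PartitionResult(
                partition,
                "Unable to initialize share group state: " + initializeException.getMessage()));
    }

    public static void main(String[] args) {
        System.out.println(initialize(0, false).join());
        System.out.println(initialize(1, true).join());
    }
}
```

Naming the parameter after the operation helps when several near-identical handlers (delete, initialize, write) sit in the same class and are easy to copy between.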



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -808,6 +811,106 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
         });
     }
 
+    @Override
+    public CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request) {
+        // Send an empty response if the coordinator is not active.
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                generateErrorInitStateResponse(
+                    request,
+                    Errors.COORDINATOR_NOT_AVAILABLE,
+                    "Share coordinator is not available."
+                )
+            );
+        }
+
+        String groupId = request.groupId();
+        // Send an empty response if groupId is invalid.
+        if (isGroupIdEmpty(groupId)) {
+            log.error("Group id must be specified and non-empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // Send an empty response if topic data is empty.
+        if (isEmpty(request.topics())) {
+            log.error("Topic Data is empty: {}", request);
+            return CompletableFuture.completedFuture(
+                new InitializeShareGroupStateResponseData()
+            );
+        }
+
+        // A map to store the futures for each topicId and partition.
+        Map<Uuid, Map<Integer, CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new HashMap<>();
+
+        // The request received here could have multiple keys of structure group:topic:partition. However,
+        // the initializeState method in ShareCoordinatorShard expects a single key in the request. Hence, we will
+        // be looping over the keys below and constructing new InitializeShareGroupStateRequestData objects to pass
+        // onto the shard method.
+
+        for (InitializeShareGroupStateRequestData.InitializeStateData topicData : request.topics()) {
+            Uuid topicId = topicData.topicId();
+            for (InitializeShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
+                SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
+
+                InitializeShareGroupStateRequestData requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+                    .setGroupId(groupId)
+                    .setTopics(List.of(new InitializeShareGroupStateRequestData.InitializeStateData()
+                        .setTopicId(topicId)
+                        .setPartitions(List.of(partitionData))));
+
+                CompletableFuture<InitializeShareGroupStateResponseData> initializeFuture = runtime.scheduleWriteOperation(
+                    "initialize-share-group-state",
+                    topicPartitionFor(coordinatorKey),
+                    Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+                    coordinator -> coordinator.initializeState(requestForCurrentPartition)
+                ).exceptionally(deleteException ->
+                    handleOperationException(
+                        "initialize-share-group-state",
+                        request,
+                        deleteException,
+                        (error, message) -> InitializeShareGroupStateResponse.toErrorResponseData(
+                            topicData.topicId(),
+                            partitionData.partition(),
+                            error,
+                            "Unable to initialize share group state: " + deleteException.getMessage()
+                        ),
+                        log
+                    ));
+
+                futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+                    .put(partitionData.partition(), initializeFuture);
+            }
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>.
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
+            .flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into CompletableFuture<DeleteShareGroupStateResponseData>.
+        return combinedFuture.thenApply(v -> {
+            List<InitializeShareGroupStateResponseData.InitializeStateResult> initializeStateResult = new ArrayList<>(futureMap.size());
+            futureMap.forEach(
+                (topicId, topicEntry) -> {
+                    List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
+                    topicEntry.forEach(
+                        (partitionId, responseFuture) -> {
+                            // ResponseFut would already be completed by now since we have used

Review Comment:
   The comment says `ResponseFut`, but the lambda parameter here is `responseFuture`. The variable name was different in the code you copied this from :)
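
For context, the code comment under discussion relies on the fact that once `CompletableFuture.allOf(...)` has completed normally, every constituent future is already done, so calling `join()` on each one inside `thenApply` does not block. A minimal sketch, assuming plain `String` results in place of the response data classes; `AllOfSketch` and `collect` are hypothetical names, not part of the PR:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AllOfSketch {
    // Gathers the results of per-partition futures once all of them have completed.
    static List<String> collect(Map<Integer, CompletableFuture<String>> futures) {
        CompletableFuture<Void> combined =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));

        return combined.thenApply(v -> {
            List<String> results = new ArrayList<>(futures.size());
            // allOf has completed, so every responseFuture is done
            // and join() returns its value immediately.
            futures.forEach((partitionId, responseFuture) ->
                results.add(partitionId + "=" + responseFuture.join()));
            return results;
        }).join();
    }

    public static void main(String[] args) {
        Map<Integer, CompletableFuture<String>> futures = new LinkedHashMap<>();
        futures.put(0, CompletableFuture.completedFuture("ok"));
        futures.put(1, CompletableFuture.supplyAsync(() -> "ok"));
        System.out.println(collect(futures));
    }
}
```

In the diff, each per-partition future has `exceptionally` applied before it is stored in `futureMap`, so it should complete normally in every case. If an input future failed without such a handler, `allOf` would complete exceptionally and the `thenApply` callback would not run.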



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.