AndrewJSchofield commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1838138773


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -478,6 +495,74 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte
         });
     }
 
+    /**
+     * Util method to create and make a write state request,
+     * if leader epoch update is needed. Otw, return a completed
+     * future.
+     * @param coordinatorKey - The share partition key object
+     * @param partitionData - Share partition information we from read request
+     * @return Completable future of ReadShareGroupStateResponseData
+     */
+    private CompletableFuture<ReadShareGroupStateResponseData> maybeUpdateLeaderEpoch(
+        SharePartitionKey coordinatorKey,
+        ReadShareGroupStateRequestData.PartitionData partitionData
+    ) {
+        if (partitionData.leaderEpoch() != -1) {   // no need to issue write as leaderEpoch is -1 (no change)
+            WriteShareGroupStateRequestData writeStateLeaderEpoch = new WriteShareGroupStateRequestData()
+                .setGroupId(coordinatorKey.groupId())
+                .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
+                    .setTopicId(coordinatorKey.topicId())
+                    .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
+                        .setPartition(partitionData.partition())
+                        .setLeaderEpoch(partitionData.leaderEpoch())
+                        // force below attributes to be noop
+                        .setStateEpoch(-1)
+                        .setStateBatches(Collections.emptyList())
+                        .setStartOffset(-1))))
+                );
+
+

Review Comment:
   nit: Lots of blank lines here; please trim them.
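
   For illustration only, a minimal sketch of the control flow the Javadoc describes: issue a write when a leader epoch is supplied, otherwise return an already-completed future. The class name, the `NO_LEADER_EPOCH` constant, the `writer` callback and the `String` result type are all hypothetical stand-ins, not the PR's actual code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical sketch; none of these names come from the PR.
final class MaybeUpdateLeaderEpochSketch {
    // Assumption: -1 is the sentinel meaning "no leader epoch change requested".
    static final int NO_LEADER_EPOCH = -1;

    static CompletableFuture<String> maybeUpdate(
        int leaderEpoch,
        IntFunction<CompletableFuture<String>> writer
    ) {
        if (leaderEpoch == NO_LEADER_EPOCH) {
            // No change requested: skip the write and complete immediately.
            return CompletableFuture.completedFuture("skipped");
        }
        // A leader epoch was supplied: delegate to the (hypothetical) write path.
        return writer.apply(leaderEpoch);
    }
}
```

   An early return like this would also keep the "-1 means no change" comment next to the branch it describes.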



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -289,6 +289,56 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr
         return new CoordinatorResult<>(Collections.singletonList(record), responseData);
     }
 
+    /**
+     * This is a special case method when only the leaderEpoch needs to be updated.
+     * It can happen if a read state call for a share partition has the highest leaderEpoch

Review Comment:
   Suggested wording: "It can happen if a read state call for a share partition has a higher leaderEpoch value than seen so far."
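
   To make the "higher than seen so far" condition concrete, here is a minimal sketch. The class and method names are hypothetical, and the strict `>` comparison is an assumption based on the wording above rather than on the PR's code.

```java
import java.util.Optional;

// Hypothetical sketch; names and the strict comparison are assumptions.
final class LeaderEpochBumpSketch {
    /**
     * Returns the epoch to persist when the requested value is strictly
     * higher than the one seen so far; otherwise returns empty, meaning
     * no record needs to be generated.
     */
    static Optional<Integer> epochToPersist(int seenSoFar, int requested) {
        return requested > seenSoFar ? Optional.of(requested) : Optional.empty();
    }
}
```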



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -289,6 +289,56 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr
         return new CoordinatorResult<>(Collections.singletonList(record), responseData);
     }
 
+    /**
+     * This is a special case method when only the leaderEpoch needs to be updated.
+     * It can happen if a read state call for a share partition has the highest leaderEpoch
+     * value seen so far.
+     * In case the update is not required, no record will be generated along with a success response.
+     * @param request - represents WriteShareGroupStateRequestData
+     * @return CoordinatorResult object
+     */
+    public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeLeaderEpoch(
+        WriteShareGroupStateRequestData request
+    ) {
+        // only one key will be there in the request by design
+        Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetWriteStateError(request);
+        if (error.isPresent()) {
+            return error.get();
+        }
+
+        WriteShareGroupStateRequestData.WriteStateData topicData = request.topics().get(0);
+        WriteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
+
+        int leaderEpoch = request.topics().get(0).partitions().get(0).leaderEpoch();

Review Comment:
   This can be `partitionData.leaderEpoch()` to save some lookups.
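
   A minimal sketch of why the two forms are interchangeable, using hypothetical record types as stand-ins for the generated request classes (the real classes are not reproduced here). Both methods read the same field; the second reuses the local that the hunk already resolves.

```java
import java.util.List;

// Hypothetical stand-ins for the generated request classes.
final class LeaderEpochLookupSketch {
    record PartitionData(int partition, int leaderEpoch) { }
    record WriteStateData(String topicId, List<PartitionData> partitions) { }
    record Request(String groupId, List<WriteStateData> topics) { }

    // Form used in the hunk: walks the request from the top again.
    static int leaderEpochViaRequest(Request request) {
        return request.topics().get(0).partitions().get(0).leaderEpoch();
    }

    // Suggested form: reuse the partitionData local that is already in scope.
    static int leaderEpochViaLocal(Request request) {
        WriteStateData topicData = request.topics().get(0);
        PartitionData partitionData = topicData.partitions().get(0);
        return partitionData.leaderEpoch();
    }
}
```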



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.