AndrewJSchofield commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1838138773
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -478,6 +495,74 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte }); } + /** + * Util method to create and make a write state request, + * if leader epoch update is needed. Otw, return a completed + * future. + * @param coordinatorKey - The share partition key object + * @param partitionData - Share partition information we from read request + * @return Completable future of ReadShareGroupStateResponseData + */ + private CompletableFuture<ReadShareGroupStateResponseData> maybeUpdateLeaderEpoch( + SharePartitionKey coordinatorKey, + ReadShareGroupStateRequestData.PartitionData partitionData + ) { + if (partitionData.leaderEpoch() != -1) { // no need to issue write as leaderEpoch is -1 (no change) + WriteShareGroupStateRequestData writeStateLeaderEpoch = new WriteShareGroupStateRequestData() + .setGroupId(coordinatorKey.groupId()) + .setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(coordinatorKey.topicId()) + .setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(partitionData.partition()) + .setLeaderEpoch(partitionData.leaderEpoch()) + // force below attributes to be noop + .setStateEpoch(-1) + .setStateBatches(Collections.emptyList()) + .setStartOffset(-1)))) + ); + + Review Comment: nit: Lots of blank lines here. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ########## @@ -289,6 +289,56 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr return new CoordinatorResult<>(Collections.singletonList(record), responseData); } + /** + * This is a special case method when only the leaderEpoch needs to be updated. 
+ * It can happen if a read state call for a share partition has the highest leaderEpoch Review Comment: It can happen if a read state call for a share partition has a higher leaderEpoch value than seen so far. ########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ########## @@ -289,6 +289,56 @@ public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr return new CoordinatorResult<>(Collections.singletonList(record), responseData); } + /** + * This is a special case method when only the leaderEpoch needs to be updated. + * It can happen if a read state call for a share partition has the highest leaderEpoch + * value seen so far. + * In case the update is not required, no record will be generated along with a success response. + * @param request - represents WriteShareGroupStateRequestData + * @return CoordinatorResult object + */ + public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeLeaderEpoch( + WriteShareGroupStateRequestData request + ) { + // only one key will be there in the request by design + Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetWriteStateError(request); + if (error.isPresent()) { + return error.get(); + } + + WriteShareGroupStateRequestData.WriteStateData topicData = request.topics().get(0); + WriteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0); + SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition()); + + int leaderEpoch = request.topics().get(0).partitions().get(0).leaderEpoch(); Review Comment: This can be `partitionData.leaderEpoch()` to save some lookups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org