apoorvmittal10 commented on code in PR #17283: URL: https://github.com/apache/kafka/pull/17283#discussion_r1793821651
########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -46,59 +47,59 @@ public class ShareFetchUtils { // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data // from share partitions. - static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse( + static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse( ShareFetchData shareFetchData, Map<TopicIdPartition, FetchPartitionData> responseData, Map<SharePartitionKey, SharePartition> partitionCacheMap, ReplicaManager replicaManager ) { - Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>(); + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new HashMap<>(); responseData.forEach((topicIdPartition, fetchPartitionData) -> { SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey( shareFetchData.groupId(), topicIdPartition)); - futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData) - .handle((acquiredRecords, throwable) -> { - log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", - topicIdPartition, shareFetchData, acquiredRecords); - ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() - .setPartitionIndex(topicIdPartition.partition()); + ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()) + .setAcknowledgeErrorCode(Errors.NONE.code()); - if (throwable != null) { - partitionData.setErrorCode(Errors.forException(throwable).code()); - return partitionData; - } + if (fetchPartitionData.error.code() != Errors.NONE.code()) { + partitionData + .setRecords(null) + .setErrorCode(fetchPartitionData.error.code()) + .setAcquiredRecords(Collections.emptyList()); - if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { - // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset. - // So, we would update the start and end offset of the share partition and still return an empty - // response and let the client retry the fetch. This way we do not lose out on the data that - // would be returned for other share partitions in the fetch request. - sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); - partitionData.setPartitionIndex(topicIdPartition.partition()) - .setRecords(null) - .setErrorCode(Errors.NONE.code()) - .setAcquiredRecords(Collections.emptyList()) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - } - - // Maybe, in the future, check if no records are acquired, and we want to retry - // replica manager fetch. Depends on the share partition manager implementation, - // if we want parallel requests for the same share partition or not. - partitionData.setPartitionIndex(topicIdPartition.partition()) - .setRecords(fetchPartitionData.records) - .setErrorCode(fetchPartitionData.error.code()) - .setAcquiredRecords(acquiredRecords) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - })); - }); - return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> { - Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>(); - futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join())); - return processedResult; + // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset. + // So, we would update the start and end offset of the share partition and still return an empty + // response and let the client retry the fetch. This way we do not lose out on the data that + // would be returned for other share partitions in the fetch request. + if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { + sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); + // We set the error code to NONE, as we have updated the start offset of the share partition + // and the client can retry the fetch. + partitionData.setErrorCode(Errors.NONE.code()); + } + } else { + List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData); + log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", + topicIdPartition, shareFetchData, acquiredRecords); + // Maybe, in the future, check if no records are acquired, and we want to retry + // replica manager fetch. Depends on the share partition manager implementation, + // if we want parallel requests for the same share partition or not. + if (acquiredRecords.isEmpty()) { + partitionData + .setRecords(null) + .setErrorCode(Errors.NONE.code()) Review Comment: Done. ########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -46,59 +47,59 @@ public class ShareFetchUtils { // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data // from share partitions. - static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse( + static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse( Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org