AndrewJSchofield commented on code in PR #17283: URL: https://github.com/apache/kafka/pull/17283#discussion_r1778270660
########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -46,59 +47,52 @@ public class ShareFetchUtils { // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data // from share partitions. - static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse( + static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse( ShareFetchData shareFetchData, Map<TopicIdPartition, FetchPartitionData> responseData, Map<SharePartitionKey, SharePartition> partitionCacheMap, ReplicaManager replicaManager ) { - Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>(); + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new HashMap<>(); responseData.forEach((topicIdPartition, fetchPartitionData) -> { SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey( shareFetchData.groupId(), topicIdPartition)); - futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData) - .handle((acquiredRecords, throwable) -> { - log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", - topicIdPartition, shareFetchData, acquiredRecords); - ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() - .setPartitionIndex(topicIdPartition.partition()); + ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()) + .setAcknowledgeErrorCode(Errors.NONE.code()); - if (throwable != null) { - partitionData.setErrorCode(Errors.forException(throwable).code()); - return partitionData; - } + if (fetchPartitionData.error.code() != Errors.NONE.code()) { + partitionData + .setRecords(null) + .setErrorCode(fetchPartitionData.error.code()) + 
.setAcquiredRecords(Collections.emptyList()); - if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { - // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset. - // So, we would update the start and end offset of the share partition and still return an empty - // response and let the client retry the fetch. This way we do not lose out on the data that - // would be returned for other share partitions in the fetch request. - sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); - partitionData.setPartitionIndex(topicIdPartition.partition()) - .setRecords(null) - .setErrorCode(Errors.NONE.code()) - .setAcquiredRecords(Collections.emptyList()) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - } - - // Maybe, in the future, check if no records are acquired, and we want to retry - // replica manager fetch. Depends on the share partition manager implementation, - // if we want parallel requests for the same share partition or not. - partitionData.setPartitionIndex(topicIdPartition.partition()) - .setRecords(fetchPartitionData.records) - .setErrorCode(fetchPartitionData.error.code()) - .setAcquiredRecords(acquiredRecords) - .setAcknowledgeErrorCode(Errors.NONE.code()); - return partitionData; - })); - }); - return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> { - Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>(); - futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join())); - return processedResult; + // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset. + // So, we would update the start and end offset of the share partition and still return an empty + // response and let the client retry the fetch. 
This way we do not lose out on the data that + // would be returned for other share partitions in the fetch request. + if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { + sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); + // We set the error code to NONE, as we have updated the start offset of the share partition + // and the client can retry the fetch. + partitionData.setErrorCode(Errors.NONE.code()); + } + } else { + List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData); + log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", + topicIdPartition, shareFetchData, acquiredRecords); + // Maybe, in the future, check if no records are acquired, and we want to retry Review Comment: If no records were acquired, we should probably not be returning a bunch of records of which none were acquired. ########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -119,18 +120,11 @@ public void onComplete() { }); log.trace("Data successfully retrieved by replica manager: {}", responseData); - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, partitionCacheMap, replicaManager) - .whenComplete((result, throwable) -> { - if (throwable != null) { - log.error("Error processing fetch response for share partitions", throwable); - shareFetchData.future().completeExceptionally(throwable); - } else { - shareFetchData.future().complete(result); - } - // Releasing the lock to move ahead with the next request in queue. - releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet()); - }); - + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = Review Comment: I think the semantics of the exception handling are different now. If processFetchResponse throws, you'll skip the release of the locks. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org