junrao commented on code in PR #17283:
URL: https://github.com/apache/kafka/pull/17283#discussion_r1793880036


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -44,61 +45,61 @@
 public class ShareFetchUtils {
     private static final Logger log = LoggerFactory.getLogger(ShareFetchUtils.class);
 
-    // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
-    // from share partitions.
-    static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
+    /**
+     * Process the replica manager fetch response to create share fetch response. The response is created
+     * by acquiring records from the share partition.
+     */
+    static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
             ShareFetchData shareFetchData,
             Map<TopicIdPartition, FetchPartitionData> responseData,
             Map<SharePartitionKey, SharePartition> partitionCacheMap,
             ReplicaManager replicaManager
     ) {
-        Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new HashMap<>();
         responseData.forEach((topicIdPartition, fetchPartitionData) -> {
 
             SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey(
                 shareFetchData.groupId(), topicIdPartition));
-            futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
-                    .handle((acquiredRecords, throwable) -> {
-                        log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                                topicIdPartition, shareFetchData, acquiredRecords);
-                        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
-                                .setPartitionIndex(topicIdPartition.partition());
+            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(topicIdPartition.partition());
 
-                        if (throwable != null) {
-                            partitionData.setErrorCode(Errors.forException(throwable).code());
-                            return partitionData;
-                        }
+            if (fetchPartitionData.error.code() != Errors.NONE.code()) {
+                partitionData
+                    .setRecords(null)
+                    .setErrorCode(fetchPartitionData.error.code())

Review Comment:
   Should we set error message too?
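   A hedged sketch of what that could look like at this spot, assuming `ShareFetchResponseData.PartitionData` exposes a `setErrorMessage` setter (if the partition-level schema has no error message field, a schema change would be needed first):
   
   ```java
   // Sketch only: setErrorMessage is an assumption about the response schema.
   if (fetchPartitionData.error.code() != Errors.NONE.code()) {
       partitionData
           .setRecords(null)
           .setErrorCode(fetchPartitionData.error.code())
           // Propagate the human-readable message alongside the code.
           .setErrorMessage(fetchPartitionData.error.message())
           .setAcquiredRecords(Collections.emptyList());
       // ... OFFSET_OUT_OF_RANGE handling as in the diff above ...
   }
   ```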



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -44,61 +45,61 @@
 public class ShareFetchUtils {
     private static final Logger log = LoggerFactory.getLogger(ShareFetchUtils.class);
 
-    // Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
-    // from share partitions.
-    static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
+    /**
+     * Process the replica manager fetch response to create share fetch response. The response is created
+     * by acquiring records from the share partition.
+     */
+    static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
             ShareFetchData shareFetchData,
             Map<TopicIdPartition, FetchPartitionData> responseData,
             Map<SharePartitionKey, SharePartition> partitionCacheMap,
             ReplicaManager replicaManager
     ) {
-        Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = new HashMap<>();
         responseData.forEach((topicIdPartition, fetchPartitionData) -> {
 
             SharePartition sharePartition = partitionCacheMap.get(new SharePartitionKey(
                 shareFetchData.groupId(), topicIdPartition));
-            futures.put(topicIdPartition, sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
-                    .handle((acquiredRecords, throwable) -> {
-                        log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
-                                topicIdPartition, shareFetchData, acquiredRecords);
-                        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
-                                .setPartitionIndex(topicIdPartition.partition());
+            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(topicIdPartition.partition());
 
-                        if (throwable != null) {
-                            partitionData.setErrorCode(Errors.forException(throwable).code());
-                            return partitionData;
-                        }
+            if (fetchPartitionData.error.code() != Errors.NONE.code()) {
+                partitionData
+                    .setRecords(null)
+                    .setErrorCode(fetchPartitionData.error.code())
+                    .setAcquiredRecords(Collections.emptyList());
 
-                        if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                            // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
-                            // So, we would update the start and end offset of the share partition and still return an empty
-                            // response and let the client retry the fetch. This way we do not lose out on the data that
-                            // would be returned for other share partitions in the fetch request.
-                            sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
-                            partitionData.setPartitionIndex(topicIdPartition.partition())
-                                    .setRecords(null)
-                                    .setErrorCode(Errors.NONE.code())
-                                    .setAcquiredRecords(Collections.emptyList())
-                                    .setAcknowledgeErrorCode(Errors.NONE.code());
-                            return partitionData;
-                        }
-
-                        // Maybe, in the future, check if no records are acquired, and we want to retry
-                        // replica manager fetch. Depends on the share partition manager implementation,
-                        // if we want parallel requests for the same share partition or not.
-                        partitionData.setPartitionIndex(topicIdPartition.partition())
-                                .setRecords(fetchPartitionData.records)
-                                .setErrorCode(fetchPartitionData.error.code())
-                                .setAcquiredRecords(acquiredRecords)
-                                .setAcknowledgeErrorCode(Errors.NONE.code());
-                        return partitionData;
-                    }));
-        });
-        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> {
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResult = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join()));
-            return processedResult;
+                // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
+                // So, we would update the start and end offset of the share partition and still return an empty
+                // response and let the client retry the fetch. This way we do not lose out on the data that
+                // would be returned for other share partitions in the fetch request.
+                if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) {
+                    sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager));
+                    // We set the error code to NONE, as we have updated the start offset of the share partition
+                    // and the client can retry the fetch.
+                    partitionData.setErrorCode(Errors.NONE.code());
+                }
+            } else {
+                List<AcquiredRecords> acquiredRecords = sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+                log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}",
+                    topicIdPartition, shareFetchData, acquiredRecords);
+                // Maybe, in the future, check if no records are acquired, and we want to retry
+                // replica manager fetch. Depends on the share partition manager implementation,
+                // if we want parallel requests for the same share partition or not.
+                if (acquiredRecords.isEmpty()) {
+                    partitionData
+                        .setRecords(null)
+                        .setAcquiredRecords(Collections.emptyList());
+                } else {
+                    partitionData
+                        .setRecords(fetchPartitionData.records)
+                        .setErrorCode(fetchPartitionData.error.code())

Review Comment:
   No need to set error code here since we don't set it in the `if` path?
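   For reference, a minimal sketch of that simplification, assuming the generated default for `errorCode` stays `NONE` (this `else` branch only runs when `fetchPartitionData.error` is `NONE`):
   
   ```java
   // Error code already defaults to NONE on this path (the enclosing else runs
   // only when fetchPartitionData.error is NONE), so the setter can be dropped.
   partitionData
       .setRecords(fetchPartitionData.records)
       .setAcquiredRecords(acquiredRecords);
   ```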


