apoorvmittal10 commented on code in PR #17283:
URL: https://github.com/apache/kafka/pull/17283#discussion_r1778526687


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -46,59 +47,52 @@ public class ShareFetchUtils {
 
     // Process the replica manager fetch response to update share partitions 
and futures. We acquire the fetched data
     // from share partitions.
-    static CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> processFetchResponse(
+    static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
processFetchResponse(
             ShareFetchData shareFetchData,
             Map<TopicIdPartition, FetchPartitionData> responseData,
             Map<SharePartitionKey, SharePartition> partitionCacheMap,
             ReplicaManager replicaManager
     ) {
-        Map<TopicIdPartition, 
CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new 
HashMap<>();
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = 
new HashMap<>();
         responseData.forEach((topicIdPartition, fetchPartitionData) -> {
 
             SharePartition sharePartition = partitionCacheMap.get(new 
SharePartitionKey(
                 shareFetchData.groupId(), topicIdPartition));
-            futures.put(topicIdPartition, 
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData)
-                    .handle((acquiredRecords, throwable) -> {
-                        log.trace("Acquired records for topicIdPartition: {} 
with share fetch data: {}, records: {}",
-                                topicIdPartition, shareFetchData, 
acquiredRecords);
-                        ShareFetchResponseData.PartitionData partitionData = 
new ShareFetchResponseData.PartitionData()
-                                
.setPartitionIndex(topicIdPartition.partition());
+            ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(topicIdPartition.partition())
+                .setAcknowledgeErrorCode(Errors.NONE.code());
 
-                        if (throwable != null) {
-                            
partitionData.setErrorCode(Errors.forException(throwable).code());
-                            return partitionData;
-                        }
+            if (fetchPartitionData.error.code() != Errors.NONE.code()) {
+                partitionData
+                    .setRecords(null)
+                    .setErrorCode(fetchPartitionData.error.code())
+                    .setAcquiredRecords(Collections.emptyList());
 
-                        if (fetchPartitionData.error.code() == 
Errors.OFFSET_OUT_OF_RANGE.code()) {
-                            // In case we get OFFSET_OUT_OF_RANGE error, 
that's because the Log Start Offset is later than the fetch offset.
-                            // So, we would update the start and end offset of 
the share partition and still return an empty
-                            // response and let the client retry the fetch. 
This way we do not lose out on the data that
-                            // would be returned for other share partitions in 
the fetch request.
-                            
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
 replicaManager));
-                            
partitionData.setPartitionIndex(topicIdPartition.partition())
-                                    .setRecords(null)
-                                    .setErrorCode(Errors.NONE.code())
-                                    
.setAcquiredRecords(Collections.emptyList())
-                                    
.setAcknowledgeErrorCode(Errors.NONE.code());
-                            return partitionData;
-                        }
-
-                        // Maybe, in the future, check if no records are 
acquired, and we want to retry
-                        // replica manager fetch. Depends on the share 
partition manager implementation,
-                        // if we want parallel requests for the same share 
partition or not.
-                        
partitionData.setPartitionIndex(topicIdPartition.partition())
-                                .setRecords(fetchPartitionData.records)
-                                .setErrorCode(fetchPartitionData.error.code())
-                                .setAcquiredRecords(acquiredRecords)
-                                .setAcknowledgeErrorCode(Errors.NONE.code());
-                        return partitionData;
-                    }));
-        });
-        return CompletableFuture.allOf(futures.values().toArray(new 
CompletableFuture[0])).thenApply(v -> {
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
processedResult = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
processedResult.put(topicIdPartition, future.join()));
-            return processedResult;
+                // In case we get OFFSET_OUT_OF_RANGE error, that's because 
the Log Start Offset is later than the fetch offset.
+                // So, we would update the start and end offset of the share 
partition and still return an empty
+                // response and let the client retry the fetch. This way we do 
not lose out on the data that
+                // would be returned for other share partitions in the fetch 
request.
+                if (fetchPartitionData.error.code() == 
Errors.OFFSET_OUT_OF_RANGE.code()) {
+                    
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
 replicaManager));
+                    // We set the error code to NONE, as we have updated the 
start offset of the share partition
+                    // and the client can retry the fetch.
+                    partitionData.setErrorCode(Errors.NONE.code());
+                }
+            } else {
+                List<AcquiredRecords> acquiredRecords = 
sharePartition.acquire(shareFetchData.memberId(), fetchPartitionData);
+                log.trace("Acquired records for topicIdPartition: {} with 
share fetch data: {}, records: {}",
+                    topicIdPartition, shareFetchData, acquiredRecords);
+                // Maybe, in the future, check if no records are acquired, and 
we want to retry

Review Comment:
   Makes sense. I have added the code for that, along with a test case to 
verify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to