AndrewJSchofield commented on code in PR #16377:
URL: https://github.com/apache/kafka/pull/16377#discussion_r1644000033


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
+        try {
+            assert shareFetchPartitionData != null;
+            shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> {
+                SharePartitionKey sharePartitionKey = sharePartitionKey(
+                    shareFetchPartitionData.groupId,
+                    topicIdPartition
+                );
+                SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey,
+                    k -> new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
+                        recordLockDurationMs, timer, time));
+                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
+                // Add the share partition to the list of partitions to be fetched only if we can
+                // acquire the fetch lock on it.
+                if (sharePartition.maybeAcquireFetchLock()) {
+                    // Fetching over a topic should be able to proceed if any one of the following 2 conditions are met:
+                    // 1. The fetch is to happen somewhere in between the record states cached in the share partition.
+                    //    This is because in this case we don't need to check for the partition limit for in flight messages
+                    // 2. If condition 1 is not true, then that means we will be fetching new records which haven't been cached before.
+                    //    In this case it is necessary to check if the partition limit for in flight messages has been reached.
+                    if (sharePartition.nextFetchOffset() != (sharePartition.endOffset() + 1) || sharePartition.canFetchRecords()) {

Review Comment:
   I think this logic would be better encapsulated within the SharePartition class.
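
   Something along these lines inside SharePartition would keep the caller simple (a sketch only; the method name `canAcquireRecords` is illustrative and relies on the accessors this PR already uses):

   ```java
   /**
    * Sketch only: hypothetical SharePartition method built from the accessors
    * this PR already calls (nextFetchOffset, endOffset, canFetchRecords).
    * A fetch inside the cached record states is always allowed; only a fetch
    * of new records is bounded by the in-flight record limit.
    */
   boolean canAcquireRecords() {
       return nextFetchOffset() != (endOffset() + 1) || canFetchRecords();
   }
   ```

   The condition in maybeProcessFetchQueue() would then collapse to `sharePartition.canAcquireRecords()`, with the existing release/logging branch unchanged.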



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();

Review Comment:
   I'm not convinced that this will always return an entry. Adding an entry and then calling this method are not synchronized (which is fine), but I expect there is a situation in which the added entry has already been processed before this method is called.
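
   A null check (or a small drain loop) would cover that race instead of the assert; a rough sketch, reusing releaseProcessFetchQueueLock() from this PR:

   ```java
   // Sketch: if another invocation already drained the entry, release the
   // processing flag and return instead of asserting a non-null poll().
   ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
   if (shareFetchPartitionData == null) {
       releaseProcessFetchQueueLock();
       return;
   }
   ```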



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -317,6 +364,197 @@ public void close() throws Exception {
         // TODO: Provide Implementation
     }
 
+    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
+        return new ShareSessionKey(groupId, memberId);
+    }
+
+    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
+        return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled());
+    }
+
+    /**
+     * Recursive function to process all the fetch requests present inside the fetch queue
+     */
+    private void maybeProcessFetchQueue() {
+        if (!processFetchQueueLock.compareAndSet(false, true)) {
+            // The queue is already being processed hence avoid re-triggering.
+            return;
+        }
+
+        // Initialize the topic partitions for which the fetch should be attempted.
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll();
+        try {
+            assert shareFetchPartitionData != null;
+            shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> {
+                SharePartitionKey sharePartitionKey = sharePartitionKey(
+                    shareFetchPartitionData.groupId,
+                    topicIdPartition
+                );
+                SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey,
+                    k -> new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount,
+                        recordLockDurationMs, timer, time));
+                int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
+                // Add the share partition to the list of partitions to be fetched only if we can
+                // acquire the fetch lock on it.
+                if (sharePartition.maybeAcquireFetchLock()) {
+                    // Fetching over a topic should be able to proceed if any one of the following 2 conditions are met:
+                    // 1. The fetch is to happen somewhere in between the record states cached in the share partition.
+                    //    This is because in this case we don't need to check for the partition limit for in flight messages
+                    // 2. If condition 1 is not true, then that means we will be fetching new records which haven't been cached before.
+                    //    In this case it is necessary to check if the partition limit for in flight messages has been reached.
+                    if (sharePartition.nextFetchOffset() != (sharePartition.endOffset() + 1) || sharePartition.canFetchRecords()) {
+                        topicPartitionData.put(
+                            topicIdPartition,
+                            new FetchRequest.PartitionData(
+                                topicIdPartition.topicId(),
+                                sharePartition.nextFetchOffset(),
+                                0,
+                                partitionMaxBytes,
+                                Optional.empty()
+                            )
+                        );
+                    } else {
+                        sharePartition.releaseFetchLock();
+                        log.info("Record lock partition limit exceeded for SharePartition with key {}, " +
+                            "cannot acquire more records", sharePartitionKey);
+                    }
+                }
+            });
+
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request and
+                // will re-fetch for the client in next poll.
+                shareFetchPartitionData.future.complete(Collections.emptyMap());
+                // Though if no partitions can be locked then there must be some other request which
+                // is in-flight and should release the lock. But it's safe to release the lock as
+                // the lock on share partition already exists which facilitates correct behaviour
+                // with multiple requests from queue being processed.
+                releaseProcessFetchQueueLock();
+                return;
+            }
+
+            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+                topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams);
+
+            replicaManager.fetchMessages(
+                shareFetchPartitionData.fetchParams,
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                responsePartitionData -> {
+                    log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData);
+                    List<Tuple2<TopicIdPartition, FetchPartitionData>> responseData = CollectionConverters.asJava(
+                        responsePartitionData);
+                    processFetchResponse(shareFetchPartitionData, responseData).whenComplete(
+                        (result, throwable) -> {
+                            if (throwable != null) {
+                                log.error("Error processing fetch response for share partitions", throwable);
+                                shareFetchPartitionData.future.completeExceptionally(throwable);
+                            } else {
+                                shareFetchPartitionData.future.complete(result);
+                            }
+                            // Releasing the lock to move ahead with the next request in queue.
+                            releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet());

Review Comment:
   Hmm. Doesn't this mean only a single fetch at a time is possible for the entire SharePartitionManager? The fetch queue lock is only released once the replica manager has returned the records. This strikes me as a bad thing.
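
   For comparison, releasing the queue-processing flag as soon as the fetch has been handed to the replica manager, and leaving only the future completion and per-partition lock release in the callback, would let the next queued request proceed concurrently. A rough sketch (releasePartitionLocks is a hypothetical split of releaseFetchQueueAndPartitionsLock):

   ```java
   // Sketch of the callback: complete the future and release only the
   // share-partition fetch locks acquired for this request.
   processFetchResponse(shareFetchPartitionData, responseData).whenComplete(
       (result, throwable) -> {
           if (throwable != null) {
               shareFetchPartitionData.future.completeExceptionally(throwable);
           } else {
               shareFetchPartitionData.future.complete(result);
           }
           // Hypothetical helper covering only the per-partition locks.
           releasePartitionLocks(shareFetchPartitionData.groupId, topicPartitionData.keySet());
       });
   ```

   Back in maybeProcessFetchQueue(), the call to releaseProcessFetchQueueLock() (followed by another maybeProcessFetchQueue() pass to pick up anything queued in the meantime) could then happen right after replicaManager.fetchMessages() returns, instead of waiting for the records.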



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
