junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1873883681


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer 
fetchRequestPreparer,
         }
     }
 
+
+    /**
+     * Create fetch requests based on the configured {@link TempFetchMode}.
+     */
+    @Override
+    protected Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests() {
+        switch (fetchConfig.tempFetchMode) {
+            case SKIP_BUFFERED:
+                return super.prepareFetchRequests();
+
+            case SKIP_FETCH:
+                return prepareFetchRequests_option2();
+
+            case INCLUDE_NOMINAL:
+                return prepareFetchRequests_option3();
+
+            case SKIP_NODE:
+                return prepareFetchRequests_option4();
+
+            default:
+                throw new IllegalArgumentException("Invalid " + 
TempFetchMode.class.getSimpleName() + " option value: " + 
fetchConfig.tempFetchMode);
+        }
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> 
prepareFetchRequests_option1() {
+        // 
-------------------------------------------------------------------------------------------------------------
+        //
+        //  #######  ########  ######## ####  #######  ##    ##          ##
+        // ##     ## ##     ##    ##     ##  ##     ## ###   ##        ####
+        // ##     ## ##     ##    ##     ##  ##     ## ####  ##          ##
+        // ##     ## ########     ##     ##  ##     ## ## ## ##          ##
+        // ##     ## ##           ##     ##  ##     ## ##  ####          ##
+        // ##     ## ##           ##     ##  ##     ## ##   ###          ##
+        //  #######  ##           ##    ####  #######  ##    ##        ######
+        //
+        // 
-------------------------------------------------------------------------------------------------------------
+        // Option 1 is the existing behavior
+        // 
-------------------------------------------------------------------------------------------------------------
+        return super.prepareFetchRequests();

Review Comment:
   Just want to understand the existing behavior a bit more. My understanding 
is the following. 
   
   For ClassicKafkaConsumer, it will only send a fetch request to a broker if 
(1) there is no pending fetch request and (2) there is at least one partition 
for the node that doesn't have buffered data.
   
   (2) is done through the call to  `fetchablePartitions()` in 
`AbstractFetch.prepareFetchRequests()` and (1) is done through the following 
code in `AbstractFetch.prepareFetchRequests()` .
   `            } else if (nodesWithPendingFetchRequests.contains(node.id())) `
   
   It seems that the existing ClassicKafkaConsumer could still lead to 
unnecessary evictions in the fetch session. For example, two partitions (p1 and 
p2) are fetched from the same broker and are buffered in the consumer. The 
application polls p1's data and calls `ClassicKafkaConsumer.sendFetches`. This 
will generate a fetch request including p1, but with p2 being removed, causing 
it to be evicted on the server side.
   
   For AsyncKafkaConsumer, the above issue also exists. It seems to have an 
additional issue that it allows more than one pending fetch request. 
`FetchRequestManager.pollInternal()` sets `pendingFetchRequestFuture` to null 
as soon as an `UnsentRequest` is generated, but not completed. This allows a 
second fetch request to be generated before the first one completes.
   
   Is my understanding correct?
   
   If so, to address the issue in `ClassicKafkaConsumer`, we could only send a 
fetch request if all partitions for a broker have been drained in the fetch 
buffer and there is no pending fetch request. To address the issue in 
`AsyncKafkaConsumer`, we could track pending fetch requests and make sure there 
is only one at a time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to