junrao commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1873883681
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##########
@@ -172,6 +186,387 @@ private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
         }
     }
 
+
+    /**
+     * Create fetch requests based on the configured {@link TempFetchMode}.
+     */
+    @Override
+    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
+        switch (fetchConfig.tempFetchMode) {
+            case SKIP_BUFFERED:
+                return super.prepareFetchRequests();
+
+            case SKIP_FETCH:
+                return prepareFetchRequests_option2();
+
+            case INCLUDE_NOMINAL:
+                return prepareFetchRequests_option3();
+
+            case SKIP_NODE:
+                return prepareFetchRequests_option4();
+
+            default:
+                throw new IllegalArgumentException("Invalid " + TempFetchMode.class.getSimpleName() + " option value: " + fetchConfig.tempFetchMode);
+        }
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests_option1() {
+        // -------------------------------------------------------------------------------------------------------------
+        //
+        // (ASCII-art banner reading "OPTION 1")
+        //
+        // -------------------------------------------------------------------------------------------------------------
+        // Option 1 is the existing behavior
+        // -------------------------------------------------------------------------------------------------------------
+        return super.prepareFetchRequests();

Review Comment:
   Just want to understand the existing behavior a bit more. My understanding is the following.

   For `ClassicKafkaConsumer`, a fetch request is sent to a broker only if (1) there is no pending fetch request to that broker and (2) there is at least one partition for the node that doesn't have buffered data. (2) is done through the call to `fetchablePartitions()` in `AbstractFetch.prepareFetchRequests()`, and (1) is done through the following code in `AbstractFetch.prepareFetchRequests()`:

   `} else if (nodesWithPendingFetchRequests.contains(node.id()))`

   It seems that the existing `ClassicKafkaConsumer` could still lead to unnecessary evictions in the fetch session. For example, suppose two partitions (p1 and p2) are fetched from the same broker and both are buffered in the consumer. The application polls p1's data and calls `ClassicKafkaConsumer.sendFetches()`. This generates a fetch request that includes p1 but omits p2, causing p2 to be evicted from the fetch session on the server side.

   For `AsyncKafkaConsumer`, the above issue also exists, and it appears to have an additional one: it allows more than one pending fetch request. `FetchRequestManager.pollInternal()` sets `pendingFetchRequestFuture` to null as soon as an `UnsentRequest` is generated, but before it completes. This allows a second fetch request to be generated before the first one completes.

   Is my understanding correct? If so, to address the issue in `ClassicKafkaConsumer`, we could send a fetch request to a broker only if all of that broker's partitions have been drained from the fetch buffer and there is no pending fetch request. To address the issue in `AsyncKafkaConsumer`, we could track pending fetch requests and ensure there is at most one at a time.
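   To make the two suggested fixes concrete, here are two self-contained sketches. Neither is the actual Kafka code; the class names and fields (`FetchGateSketch`, `partitionsByNode`, `SinglePendingFetchSketch`, etc.) are invented for illustration, and the real change would live in `AbstractFetch` / `FetchRequestManager` state.

   First, the `ClassicKafkaConsumer` idea: gate the fetch for a node on both (1) no pending fetch request and (2) *all* of that node's partitions being drained from the fetch buffer, so a request never silently drops a still-buffered partition and never triggers a server-side session eviction:

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch, not the actual AbstractFetch code.
   final class FetchGateSketch {
       // node id -> partitions currently assigned to that node (hypothetical state)
       private final Map<Integer, Set<String>> partitionsByNode = new HashMap<>();
       // partitions that still have undrained data in the fetch buffer
       private final Set<String> bufferedPartitions = new HashSet<>();
       // node ids with an in-flight fetch request
       private final Set<Integer> nodesWithPendingFetch = new HashSet<>();

       boolean canSendFetch(int nodeId) {
           if (nodesWithPendingFetch.contains(nodeId))
               return false; // condition (1): no pending fetch for this node
           // condition (2), strengthened as proposed above: *every* partition
           // for the node must be drained, not just one of them
           for (String tp : partitionsByNode.getOrDefault(nodeId, Set.of())) {
               if (bufferedPartitions.contains(tp))
                   return false;
           }
           return true;
       }
   }
   ```

   Second, the `AsyncKafkaConsumer` idea: keep the pending-fetch marker set until the request's future actually completes, rather than clearing it as soon as the `UnsentRequest` is generated, so at most one fetch request is in flight at a time:

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch, not the actual FetchRequestManager code.
   final class SinglePendingFetchSketch {
       private CompletableFuture<Void> pendingFetch; // null until the first fetch

       synchronized boolean maybeSendFetch() {
           if (pendingFetch != null && !pendingFetch.isDone())
               return false; // a fetch is still in flight; don't create a second one
           pendingFetch = new CompletableFuture<>();
           // ... build and enqueue the UnsentRequest here, and complete
           // `pendingFetch` only from the response (or error/timeout) handler ...
           return true;
       }
   }
   ```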
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org