kirktrue commented on code in PR #15186:
URL: https://github.com/apache/kafka/pull/15186#discussion_r1459293494

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -376,25 +376,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessi
         final Cluster cluster = metadata.fetch();
         Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
 
-        try {
-            sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-                // set the session handler to notify close. This will set the next metadata request to send close message.
-                sessionHandler.notifyClose();
+        sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {

Review Comment:
   @philipnee, it doesn't look like there's actually an issue with stale partition data in the `FetchSessionHandler`. That data is reset on every call to [`prepareFetchRequests()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L406).
   
   The `FetchSessionHandler` uses an inner `Builder` class to keep track of the partitions the user is requesting. For each distinct fetch cycle, [a new `Builder` is created](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L443). Each requested partition is [added to the newly created `Builder` instance](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L452). Once all the [fetchable partitions](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L324) have been added, [`Builder.build()` is called](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L459), which creates the `FetchRequestData`.
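
   To illustrate the per-cycle pattern described above, here is a minimal, self-contained sketch. It is not the Kafka code: `FetchCycleSketch`, `SessionHandler`, and the `String`/`Long` stand-ins for partitions and fetch offsets are all hypothetical, and the sketch deliberately ignores anything else the real `FetchSessionHandler` does. It only shows that a builder created fresh for each cycle cannot carry partitions over from the previous one.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class FetchCycleSketch {

    /** Hypothetical stand-in for FetchSessionHandler; NOT the Kafka class. */
    static class SessionHandler {

        /** Per-cycle accumulator, loosely analogous to FetchSessionHandler.Builder. */
        static class Builder {
            private final Map<String, Long> next = new LinkedHashMap<>();

            /** Record one partition (hypothetical String id) and its fetch offset. */
            void add(String partition, long fetchOffset) {
                next.put(partition, fetchOffset);
            }

            /** Snapshot what was added in this cycle only. */
            Map<String, Long> build() {
                return Collections.unmodifiableMap(new LinkedHashMap<>(next));
            }
        }

        /** Every call returns a brand-new, empty builder. */
        Builder newBuilder() {
            return new Builder();
        }
    }

    /** One fetch cycle: new builder, add each fetchable partition, then build. */
    static Map<String, Long> prepareFetchRequest(SessionHandler handler,
                                                 Map<String, Long> fetchable) {
        SessionHandler.Builder builder = handler.newBuilder();
        fetchable.forEach(builder::add);
        return builder.build();
    }

    public static void main(String[] args) {
        SessionHandler handler = new SessionHandler();

        // Cycle 1: two partitions are fetchable.
        Map<String, Long> first =
                prepareFetchRequest(handler, Map.of("topic-0", 10L, "topic-1", 20L));

        // Cycle 2: only one partition is fetchable; nothing from cycle 1 leaks in.
        Map<String, Long> second =
                prepareFetchRequest(handler, Map.of("topic-1", 25L));

        System.out.println("cycle 1 partitions: " + first.size());
        System.out.println("cycle 2 contains topic-0: " + second.containsKey("topic-0"));
    }
}
```

   The second cycle reuses the same handler but sees only what was added to its own builder, which is the property the argument above relies on.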
   
   So the data kept in the `FetchSessionHandler` is ephemeral. Any partition leadership changes would happen somewhere upstream and be reflected in the `SubscriptionState` well before we get to fetch request generation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.