divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r977489099


##########
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##########
@@ -590,6 +595,14 @@ public boolean handleResponse(FetchResponse response, 
short version) {
         }
     }
 
+    /**
+     * The client will initiate the session close on next fetch request.
+     */
+    public void notifyClose() {
+        log.info("Set the metadata for next fetch request to close the 
existing session ID={} ", nextMetadata.sessionId());
+        nextMetadata = nextMetadata.nextCloseExisting();

Review Comment:
   `SessionHandler` should not be reused here after close because:
   1. We drain all completed fetches before calling close of sessions. Hence, 
no completed fetches will use session.
   2. `Fetcher` is only called from the `Consumer`. `Consumer` has a single 
threaded access i.e. while it is processing the `close`, we don't expect it to 
poll or call `Fetcher.sendFetches`, session handler will not be used.
   3. `SessionHandler` map will be cleared after the close request is sent in 
the `Fetcher.close()`
   4. We have ensured that no other thread (e.g. FetchResponse future) can use 
`Fetcher` while it is being closed by acquiring a lock on `Fetcher` (at 
`synchronized (Fetcher.this)`) before close starts. This ensures that 
sessionHandler is not called by anyone before close is complete (which should 
clear the sessionHandler map).
   
   Is my understanding correct here?
   
   Regarding the test, what kind validation/assertion would you like to see 
from it? I can't think of a test that might be useful for us here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1941,79 @@ private Map<String, String> 
topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new 
ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : 
sessionHandlers.entrySet()) {

Review Comment:
   Fixed in latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to