divijvaidya commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r964541075


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1798,15 +1802,27 @@ private void consumerCloseTest(final long closeTimeoutMs,
                 // Expected exception
             }
 
-            // Ensure close has started and queued at least one more request after commitAsync
+            // Ensure close has started and queued at least one more request after commitAsync.
+            //
+            // Close enqueues two requests, but second is enqueued only after first has succeeded. First is
+            // LEAVE_GROUP as part of coordinator close and second is FETCH with epoch=FINAL_EPOCH. At this stage
+            // we expect only the first one to have been requested. Hence, waiting for total 2 requests, one for
+            // commit and another for LEAVE_GROUP.
             client.waitForRequests(2, 1000);
 
             // In graceful mode, commit response results in close() completing immediately without a timeout
             // In non-graceful mode, close() times out without an exception even though commit response is pending
+            int nonCloseRequests = 1;
             for (int i = 0; i < responses.size(); i++) {
                 client.waitForRequests(1, 1000);
-                client.respondFrom(responses.get(i), coordinator);
-                if (i != responses.size() - 1) {
+                if (i == responses.size() - 1 && responses.get(i) instanceof FetchResponse) {
+                    // last request is the close session request which is sent to the leader of the partition.
+                    client.respondFrom(responses.get(i), node);
+                } else {
+                    client.respondFrom(responses.get(i), coordinator);
+                }
+                if (i < nonCloseRequests) {
+                    // the close request should not complete until non-close requests (commit requests) have completed.

Review Comment:
   FYI for reviewers:
   
   This change is required because the close sequence now includes a new Fetch
   request. For a graceful close, the test therefore needs to mimic a server
   response to that Fetch request using the test utility `client.respondFrom`.
   The response does not necessarily come from the coordinator; it comes from
   the `node` associated with the fetch session, which may be a node other than
   the coordinator (e.g. a read replica).
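
The routing rule described above can be illustrated with a small, self-contained sketch. It does not use Kafka's test classes: `CloseResponseRouting`, `ResponseKind`, and `responderFor` are hypothetical names made up for illustration, the brokers are represented by plain strings, and the response order (commit, then LEAVE_GROUP, then FETCH) is an assumption based on the comments in the diff.

```java
import java.util.List;

// Hypothetical stand-in for the routing decision in the test; not Kafka code.
final class CloseResponseRouting {

    // Simplified kinds of responses the test feeds back while close() runs.
    enum ResponseKind { OFFSET_COMMIT, LEAVE_GROUP, FETCH }

    // Returns the broker that should answer the i-th queued response.
    // Only a FETCH in the last position (the session-close request) is answered
    // by the node that owns the fetch session; all other responses come from
    // the coordinator.
    static String responderFor(List<ResponseKind> responses, int i,
                               String coordinator, String fetchNode) {
        boolean isLast = i == responses.size() - 1;
        if (isLast && responses.get(i) == ResponseKind.FETCH) {
            return fetchNode;
        }
        return coordinator;
    }

    public static void main(String[] args) {
        // Assumed graceful-close order: commit, LEAVE_GROUP, then the final FETCH.
        List<ResponseKind> graceful = List.of(
                ResponseKind.OFFSET_COMMIT, ResponseKind.LEAVE_GROUP, ResponseKind.FETCH);
        for (int i = 0; i < graceful.size(); i++) {
            System.out.println(graceful.get(i) + " answered by "
                    + responderFor(graceful, i, "coordinator", "node"));
        }
    }
}
```

Checking both the position and the response type mirrors the condition in the diff: a response list that does not end with a Fetch response keeps the previous behaviour of answering everything from the coordinator.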



