divijvaidya commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r964541075
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1798,15 +1802,27 @@ private void consumerCloseTest(final long closeTimeoutMs,
             // Expected exception
         }
 
-        // Ensure close has started and queued at least one more request after commitAsync
+        // Ensure close has started and queued at least one more request after commitAsync.
+        //
+        // Close enqueues two requests, but second is enqueued only after first has succeeded. First is
+        // LEAVE_GROUP as part of coordinator close and second is FETCH with epoch=FINAL_EPOCH. At this stage
+        // we expect only the first one to have been requested. Hence, waiting for total 2 requests, one for
+        // commit and another for LEAVE_GROUP.
         client.waitForRequests(2, 1000);
 
         // In graceful mode, commit response results in close() completing immediately without a timeout
         // In non-graceful mode, close() times out without an exception even though commit response is pending
+        int nonCloseRequests = 1;
         for (int i = 0; i < responses.size(); i++) {
             client.waitForRequests(1, 1000);
-            client.respondFrom(responses.get(i), coordinator);
-            if (i != responses.size() - 1) {
+            if (i == responses.size() - 1 && responses.get(i) instanceof FetchResponse) {
+                // last request is the close session request which is sent to the leader of the partition.
+                client.respondFrom(responses.get(i), node);
+            } else {
+                client.respondFrom(responses.get(i), coordinator);
+            }
+            if (i < nonCloseRequests) {
+                // the close request should not complete until non-close requests (commit requests) have completed.

Review Comment:
   FYI reviewer: this change is required because the close sequence now includes a new Fetch request. For a graceful close, the test therefore has to mimic a server response to that Fetch request using the test utility `client.respondFrom`.
   The response does not necessarily come from the coordinator. It comes from the `node` associated with the fetch session, which may be a node other than the coordinator (for example, a read replica).
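
   To make the routing rule concrete, here is a minimal, self-contained sketch of the decision the loop makes. It is not the actual test code: the `Response` marker types, the `sourceFor` and `route` helpers, and the use of plain strings for brokers are hypothetical stand-ins, and the three-response sequence in `main` is only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseResponseRouting {
    // Hypothetical marker types standing in for the real Kafka response classes.
    interface Response {}
    static final class CommitResponse implements Response {}
    static final class LeaveGroupResponse implements Response {}
    static final class FetchResponse implements Response {}

    // Decide which broker should answer the i-th queued response.
    // Only a trailing FetchResponse (the close-session request) is answered
    // by the fetch-session node; everything else comes from the coordinator.
    static String sourceFor(List<Response> responses, int i, String coordinator, String node) {
        boolean isLast = i == responses.size() - 1;
        if (isLast && responses.get(i) instanceof FetchResponse) {
            return node;
        }
        return coordinator;
    }

    // Apply the rule to every response, in order.
    static List<String> route(List<Response> responses, String coordinator, String node) {
        List<String> sources = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            sources.add(sourceFor(responses, i, coordinator, node));
        }
        return sources;
    }

    public static void main(String[] args) {
        // Illustrative sequence: commit, then LEAVE_GROUP, then the final FETCH.
        List<Response> responses = List.of(
                new CommitResponse(), new LeaveGroupResponse(), new FetchResponse());
        System.out.println(route(responses, "coordinator", "node"));
        // prints [coordinator, coordinator, node]
    }
}
```

   In the real test the same check selects between `client.respondFrom(responses.get(i), node)` and `client.respondFrom(responses.get(i), coordinator)`.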