philipnee commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1054670620
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // set the session handler to notify close. This will set the next metadata request to send close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);

Review Comment:
   do we really need to log all of the successful closes? I think logging only the failed ones could be sufficient.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,40 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = (time == null) ? new SystemTime().timer(Math.min(timeout.toMillis(), requestTimeoutMs)) : time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+        // Close objects with a timeout. The timeout is required because fetcher makes request to the server in the
+        // process of closing which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                coordinator.close(closeTimer);

Review Comment:
   note: I think it also tries a blocking offset commit during the close.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -446,6 +429,33 @@ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builde
         return client.send(node, request);
     }
 
+    /**
+     * Send Fetch Request to Kafka cluster asynchronously.
+     *
+     * This method is visible for testing.
+     *
+     * @return A future that indicates result of sent Fetch request
+     */
+    RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData,

Review Comment:
   can this be private?
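On the first comment above (dropping the success log): a minimal sketch of what a failure-only listener could look like, reusing `responseFuture`, `sessionId`, `fetchTarget` and `log` from the quoted diff. This is only illustrative, not the PR's actual change.

```
responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
    @Override
    public void onSuccess(ClientResponse value) {
        // nothing to do: a successful session close needs no log line
    }

    @Override
    public void onFailure(RuntimeException e) {
        // only failures are worth surfacing; they may leave stale fetch sessions on the broker
        log.debug("Unable to send a close message for fetch session: {} to node: {}. "
            + "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
    }
});
```

Dropping the success branch keeps close() quiet in the common case while still leaving a breadcrumb when a session cannot be closed.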
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // set the session handler to notify close. This will set the next metadata request to send close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);

Review Comment:
   could really just be `cluster.nodeById(entry.getKey())`.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -150,6 +150,10 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+/**
+ * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of the
+ * test.

Review Comment:
   this can be combined with the previous line since we don't have a hard limit on the number of characters per line :)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because coordinator & fetcher send requests to the
+        // server in the process of closing which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                // This is a blocking call bound by the time remaining in closeTimer
+                coordinator.close(closeTimer);
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close consumer coordinator", t);
+            }
         }
-        Utils.closeQuietly(fetcher, "fetcher", firstException);
+
+        if (fetcher != null) {

Review Comment:
   would it make sense to add a
   ```
   public static void closeQuietlySync(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException, Timer timer) {
   ```
   to handle the blocking close of both the coordinator and the fetcher?
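A rough sketch of how that helper could look. The `TimedCloseable` functional interface is an assumption here (plain `AutoCloseable.close()` does not take a `Timer`), it is written as an instance method so it can reuse the consumer's `log` field, and it is not the signature the PR actually adds.

```
// Hypothetical functional interface so callers can pass e.g. coordinator::close
@FunctionalInterface
interface TimedCloseable {
    void close(Timer timer) throws Throwable;
}

// Hypothetical helper, mirroring Utils.closeQuietly but bounded by a Timer
private void closeQuietlySync(TimedCloseable closeable, String name,
                              AtomicReference<Throwable> firstException, Timer timer) {
    if (closeable == null)
        return;
    try {
        // blocking close, bounded by the time remaining on the timer
        closeable.close(timer);
    } catch (Throwable t) {
        // remember only the first failure so close() can rethrow it at the end
        firstException.compareAndSet(null, t);
        log.error("Failed to close {}", name, t);
    }
}
```

With something like that, the coordinator branch could collapse to `if (coordinator != null) closeQuietlySync(coordinator::close, "consumer coordinator", firstException, closeTimer);`, and the fetcher could go through the same path once it exposes a timed close.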
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // set the session handler to notify close. This will set the next metadata request to send close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    log.debug("Unable to a close message for fetch session: {} to node: {}. " + "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
+        }
+
+        // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
+        // all requests have received a response.
+        do {
+            client.poll(timer, null, true);
+        } while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone));

Review Comment:
   is it possible to implement these conditions in the pollCondition? maybe something like
   ```
   client.poll(timer, () -> requestFutures.stream().allMatch(RequestFuture::isDone), true)
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because coordinator & fetcher send requests to the
+        // server in the process of closing which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                // This is a blocking call bound by the time remaining in closeTimer
+                coordinator.close(closeTimer);
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close consumer coordinator", t);
+            }
         }
-        Utils.closeQuietly(fetcher, "fetcher", firstException);
+
+        if (fetcher != null) {
+            // the timeout for the session close is at-most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {

Review Comment:
   can it just be this?
   ```
   long remainingDurationInTimeout = Math.min(requestTimeoutMs, Math.max(0, timeout.toMillis() - closeTimer.elapsedMs()));
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;

Review Comment:
   mind elaborating on when the time object can be null? maybe when we inject the time object?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1941,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {

Review Comment:
   maybe
   ```
   requestFutures = sessionHandlers.entrySet().stream().filter(e -> closeable(e)).map(// toRespFuture()).collect(Collectors.toList())
   ```

##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -150,6 +150,10 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+/**
+ * Note to future authors in this class. If you close the consumer, close with DURATION.ZERO to reduce the duration of the
+ * test.

Review Comment:
   we are doing a lot of close(Duration.ZERO) here, so maybe it makes sense to implement a wrapper class around the consumer with a close() method in this test, so that we could keep using the try-with-resources pattern. I was wondering if we could leverage AutoCloseable and avoid explicitly calling close(Duration.ZERO) in every test. Alternatively, we could close in `@AfterEach`:
   ```
   if (autoClosing && consumer != null) {
       consumer.close(Duration.ZERO);
   }
   ```
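One possible shape for such a wrapper, as a test-only sketch; the class name and generics are illustrative and not part of the PR.

```
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical test-only wrapper so try-with-resources always closes with Duration.ZERO
final class QuickCloseConsumer<K, V> implements AutoCloseable {
    final KafkaConsumer<K, V> consumer;

    QuickCloseConsumer(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void close() {
        // bound the close so the test never waits on network timeouts
        consumer.close(Duration.ZERO);
    }
}
```

A test could then do `try (QuickCloseConsumer<String, String> c = new QuickCloseConsumer<>(consumer)) { ... }` instead of calling close(Duration.ZERO) by hand in every test.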