cadonna commented on code in PR #14532: URL: https://github.com/apache/kafka/pull/14532#discussion_r1356928191
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -290,7 +290,6 @@ private void onFailure(final long currentTimeMs, private void retry(final long currentTimeMs) { onFailedAttempt(currentTimeMs); - onSendAttempt(currentTimeMs); Review Comment: Why is this call removed? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ########## @@ -254,28 +250,38 @@ public String toString() { } } - public static class FutureCompletionHandler implements RequestCompletionHandler { + public static class FutureCompletionHandler extends CompletableFuture<ClientResponse> implements RequestCompletionHandler { - private final CompletableFuture<ClientResponse> future; + /** + * The time when the response is completed. This is used when the response is completed exceptionally because + * ClientResponse already contains received time which is injected by the network client. + */ Review Comment: I do not understand this comment. Variable `responseCompletionTimeMs` is also set when the request is completed succesfully. I actually think, we do not need this comment. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -184,13 +184,11 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), coordinatorRequestManager.coordinator()); - request.future().whenComplete((response, exception) -> { + request.handler().whenComplete((response, exception) -> { if (response != null) { onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), response.receivedTimeMs()); } else { - // TODO: Currently, we lack a good way to propage the response time from the network client to the - // request handler. We will need to store the response time in the handler to make it accessible. - onFailure(exception, time.milliseconds()); + onFailure(exception, request.handler().completionTimeMs()); Review Comment: I do not completely understand how this should work. If the response is `null` then I assume also `onComplete()` of `NetworkClientDelegate` is not called because that would lead to a `NullPointerException`. If `onComplete()` of `NetworkClientDelegate` is not called the `completionTimeMs` field in `NetworkClientDelegate` is not set. Thus, `request.handler().completionTimeMs()` will not return the completion time. Am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org