dajac commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1048223644
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // thus it is not safe to reassign the sequence.
                 failBatch(batch, response, batch.attempts() < this.retries);
             }
-            if (error.exception() instanceof InvalidMetadataException) {
+            if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) {

Review Comment:
   I wonder if this is necessary. It seems that the NetworkClient calls `metadataUpdater.handleServerDisconnect` when the node is disconnected, and a metadata refresh is already requested there. Could that be the case here?

##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -346,8 +346,16 @@ private void cancelInFlightRequests(String nodeId, long now, Collection<ClientRe
             }
 
             if (!request.isInternalRequest) {
-                if (responses != null)
-                    responses.add(request.disconnected(now, null));
+                if (responses != null) {
+                    ClientResponse disconnected;
+
+                    if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)

Review Comment:
   I wonder if we could pass a boolean to the method instead of re-checking `timeElapsedSinceSendMs` here. Have you tried or considered this? (A possible shape is sketched below, after the last comment on this file.)

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
                 requestHeader, response.destination());
-            for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION, String.format("Disconnected from node %s", response.destination())),
+            for (ProducerBatch batch : batches.values()) {
+                Errors e = response.wasTimedOut() ? Errors.REQUEST_TIMED_OUT : Errors.NETWORK_EXCEPTION;

Review Comment:
   I wonder if propagating the TimeoutException to user space has any backward-compatibility implications. I think that we already use TimeoutException when a batch expires, so users should already handle timeouts. That seems to be ok.

##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1284,6 +1292,11 @@ public ClientResponse disconnected(long timeMs, AuthenticationException authenti
                 true, null, authenticationException, null);
         }
 
+        public ClientResponse timedOut(long timeMs, AuthenticationException authenticationException) {
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
+                true, true, null, authenticationException, null);

Review Comment:
   My understanding is that a timed-out request is still considered disconnected as well. Am I correct? I think that this is important in order not to break compatibility, since the NetworkClient is used in many other places.

##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -346,8 +346,16 @@ private void cancelInFlightRequests(String nodeId, long now, Collection<ClientRe
             }
 
             if (!request.isInternalRequest) {
-                if (responses != null)
-                    responses.add(request.disconnected(now, null));
+                if (responses != null) {
+                    ClientResponse disconnected;

Review Comment:
   nit: The variable name is not accurate anymore.
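To make the two `cancelInFlightRequests` comments above concrete, here is a rough sketch of the relevant branch, assuming the caller computes the timeout check once and passes it in as a boolean. This is illustrative only: the parameter name `timedOut` and the variable name `cancelled` are assumptions, not part of the actual patch.

```java
// Sketch only: assumes cancelInFlightRequests() gains a boolean parameter
// (here called `timedOut`) computed by the caller, instead of re-checking
// request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs at this point.
if (!request.isInternalRequest) {
    if (responses != null) {
        // Renamed from `disconnected`: the response may now represent a
        // request timeout rather than a plain disconnection.
        ClientResponse cancelled = timedOut
            ? request.timedOut(now, null)        // factory method added by this PR
            : request.disconnected(now, null);
        responses.add(cancelled);
    }
}
```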
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
                 requestHeader, response.destination());
-            for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION, String.format("Disconnected from node %s", response.destination())),
+            for (ProducerBatch batch : batches.values()) {
+                Errors e = response.wasTimedOut() ? Errors.REQUEST_TIMED_OUT : Errors.NETWORK_EXCEPTION;
+                completeBatch(batch, new ProduceResponse.PartitionResponse(e, String.format("Disconnected from node %s", response.destination())),

Review Comment:
   nit: Should we also change the message here when the request has timed out? I also wonder if we should update the trace log at L553. Perhaps we should just add an `if (response.wasTimedOut())` before the `if (response.wasDisconnected())` to fully customise it; a rough sketch of that shape is at the end of this message.

##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2519,7 +2519,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
-        client.respond(new ProduceResponse(responseMap));
+        client.respond(new ProduceResponse(responseMap), true, true);

Review Comment:
   Changing this test does not seem right because it exercises the delivery timeout of the batch, not the request timeout. Could we add a separate test for the request-timeout case instead?
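For the `handleProduceResponse` comment above (customising the message and trace log when the request timed out), here is a rough sketch of handling the timed-out case in its own branch before the disconnect branch. Everything here is illustrative: the log and error messages are made up, and the trailing `completeBatch` arguments are assumed to match the existing call.

```java
// Sketch only: customise both the error code and the messages for a request
// timeout, and keep the existing behaviour for a plain disconnection.
if (response.wasTimedOut()) {
    log.trace("Cancelled request with header {} because the request to node {} timed out",
        requestHeader, response.destination());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT,
            String.format("Request to node %s timed out", response.destination())),
            correlationId, now); // trailing arguments assumed unchanged
} else if (response.wasDisconnected()) {
    log.trace("Cancelled request with header {} due to node {} being disconnected",
        requestHeader, response.destination());
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION,
            String.format("Disconnected from node %s", response.destination())),
            correlationId, now); // trailing arguments assumed unchanged
}
```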