dajac commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1048223644


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
                 // thus it is not safe to reassign the sequence.
                 failBatch(batch, response, batch.attempts() < this.retries);
             }
-            if (error.exception() instanceof InvalidMetadataException) {
+            if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) {

Review Comment:
   I wonder if this is necessary. It seems that the NetworkClient calls 
`metadataUpdater.handleServerDisconnect` when the node is disconnected, and a 
metadata refresh is already requested there. Could that be the case?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -346,8 +346,16 @@ private void cancelInFlightRequests(String nodeId, long 
now, Collection<ClientRe
             }
 
             if (!request.isInternalRequest) {
-                if (responses != null)
-                    responses.add(request.disconnected(now, null));
+                if (responses != null) {
+                    ClientResponse disconnected;
+
+                    if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)

Review Comment:
   I wonder if we could pass a boolean to the method instead of re-checking 
`timeElapsedSinceSendMs` here. Have you tried/considered this?
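
   A minimal sketch of the idea, using hypothetical names (`CancelSketch`, 
`cancelInFlight`, `Response`) rather than the actual NetworkClient code. It 
assumes the caller already knows whether the cancellation is due to a timeout 
and passes that in as a flag:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not Kafka's NetworkClient classes.
final class CancelSketch {
    static final class Response {
        final String requestId;
        final boolean disconnected;
        final boolean timedOut;

        Response(String requestId, boolean disconnected, boolean timedOut) {
            this.requestId = requestId;
            this.disconnected = disconnected;
            this.timedOut = timedOut;
        }
    }

    // The caller states why it is cancelling, so this method does not need
    // to re-derive the reason from the elapsed time of each request.
    static List<Response> cancelInFlight(List<String> requestIds, boolean timedOut) {
        List<Response> responses = new ArrayList<>();
        for (String id : requestIds) {
            responses.add(new Response(id, true, timedOut));
        }
        return responses;
    }

    public static void main(String[] args) {
        for (Response r : cancelInFlight(List.of("r1", "r2"), true)) {
            System.out.println(r.requestId + " timedOut=" + r.timedOut);
        }
    }
}
```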



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse 
response, Map<TopicPartition,
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
                 requestHeader, response.destination());
-            for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION, String.format("Disconnected from node %s", response.destination())),
+            for (ProducerBatch batch : batches.values()) {
+                Errors e = response.wasTimedOut() ? Errors.REQUEST_TIMED_OUT : Errors.NETWORK_EXCEPTION;

Review Comment:
   I wonder if propagating the TimeoutException to the user space has any 
backward compatibility implications. I think that we already use 
TimeoutException when the batch expires so the user should already handle 
timeouts. That seems to be ok.



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1284,6 +1292,11 @@ public ClientResponse disconnected(long timeMs, 
AuthenticationException authenti
                     true, null, authenticationException, null);
         }
 
+        public ClientResponse timedOut(long timeMs, AuthenticationException authenticationException) {
+            return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
+                    true, true, null, authenticationException, null);

Review Comment:
   My understanding is that a timed-out request is still considered 
disconnected. Am I correct? I think that this is important in order to 
not break compatibility. The NetworkClient is used in many other places.
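
   To illustrate the invariant I have in mind, here is a small sketch with 
hypothetical names (`ResponseSketch`, `forDisconnect`, `forTimeout`); it is not 
the real `ClientResponse`. It assumes a timeout sets both flags, so existing 
callers that only check `wasDisconnected()` behave as before:

```java
// Hypothetical stand-in for a client response; not Kafka's ClientResponse.
final class ResponseSketch {
    private final boolean disconnected;
    private final boolean timedOut;

    private ResponseSketch(boolean disconnected, boolean timedOut) {
        this.disconnected = disconnected;
        this.timedOut = timedOut;
    }

    // Plain disconnect: only the disconnected flag is set.
    static ResponseSketch forDisconnect() {
        return new ResponseSketch(true, false);
    }

    // Timeout: both flags are set, so a timed-out response is always
    // reported as disconnected too.
    static ResponseSketch forTimeout() {
        return new ResponseSketch(true, true);
    }

    boolean wasDisconnected() {
        return disconnected;
    }

    boolean wasTimedOut() {
        return timedOut;
    }

    public static void main(String[] args) {
        ResponseSketch r = ResponseSketch.forTimeout();
        System.out.println("disconnected=" + r.wasDisconnected() + " timedOut=" + r.wasTimedOut());
    }
}
```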



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -346,8 +346,16 @@ private void cancelInFlightRequests(String nodeId, long 
now, Collection<ClientRe
             }
 
             if (!request.isInternalRequest) {
-                if (responses != null)
-                    responses.add(request.disconnected(now, null));
+                if (responses != null) {
+                    ClientResponse disconnected;

Review Comment:
   nit: The variable name is not accurate anymore.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse 
response, Map<TopicPartition,
         if (response.wasDisconnected()) {
             log.trace("Cancelled request with header {} due to node {} being disconnected",
                 requestHeader, response.destination());
-            for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION, String.format("Disconnected from node %s", response.destination())),
+            for (ProducerBatch batch : batches.values()) {
+                Errors e = response.wasTimedOut() ? Errors.REQUEST_TIMED_OUT : Errors.NETWORK_EXCEPTION;
+                completeBatch(batch, new ProduceResponse.PartitionResponse(e, String.format("Disconnected from node %s", response.destination())),

Review Comment:
   nit: Should we also change the message here when the request has timed out? 
I also wonder if we should update the trace log at L553 as well. Perhaps we 
should just add an `if (response.wasTimedOut())` before `if 
(response.wasDisconnected())` to fully customise it.
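
   Roughly what I mean by the branch order, as a standalone sketch. The class 
`ProduceFailureSketch`, the method `describe`, and the timeout message text are 
hypothetical, and the sketch assumes a timed-out response also reports itself 
as disconnected:

```java
// Hypothetical helper for illustration; not part of Kafka's Sender.
final class ProduceFailureSketch {
    static String describe(boolean wasDisconnected, boolean wasTimedOut, String node) {
        // Check the more specific condition first. If the disconnected check
        // came first, the timeout branch would never be reached.
        if (wasTimedOut) {
            return String.format("Request to node %s timed out", node);
        }
        if (wasDisconnected) {
            return String.format("Disconnected from node %s", node);
        }
        return "No failure";
    }

    public static void main(String[] args) {
        System.out.println(describe(true, true, "1"));
        System.out.println(describe(true, false, "1"));
    }
}
```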



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2519,7 +2519,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() 
throws InterruptedExcep
 
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
-        client.respond(new ProduceResponse(responseMap));
+        client.respond(new ProduceResponse(responseMap), true, true);

Review Comment:
   Changing this test does not seem right because it tests the delivery timeout 
of the batch, not the request timeout. Could we add a separate test for the 
request timeout instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
