hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533024276



##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._
 
 trait BrokerToControllerChannelManager {
   def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                  callback: RequestCompletionHandler): Unit
+                  callback: BrokerToControllerRequestCompletionHandler,
+                  requestTimeout: Long): Unit

Review comment:
       Perhaps we could use a name like `retryTimeout` to distinguish this from
the request timeout, which applies only to individual requests. Alternatively,
we could let the caller provide the retry deadline explicitly, which would
avoid the extra `time.milliseconds` call.

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {

Review comment:
       nit: `hasTimedOut`?

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           requestTimeout: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout))

Review comment:
       Won't this overflow when `requestTimeout` is set to `Long.MaxValue`? Do
we have any test cases that cover this?

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
     }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {
+    time.milliseconds() > request.deadlineMs

Review comment:
       Maybe we can avoid this call to `time.milliseconds` and use 
`ClientResponse.receivedTimeMs`?

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -165,7 +176,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient,
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+    if (isTimedOut(request)) {

Review comment:
       We check for timeouts only after receiving a response. I guess this
means that, in the worst case, the total timeout could be as long as
`request.timeout * 2`. This is probably not a big deal, but it may be worth
documenting in a comment somewhere.

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -44,26 +46,34 @@ class ForwardingManager(channelManager: BrokerToControllerChannelManager) extend
       request.context.clientAddress.getAddress
     )
 
-    def onClientResponse(clientResponse: ClientResponse): Unit = {
-      val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
-      val envelopeError = envelopeResponse.error()
-      val requestBody = request.body[AbstractRequest]
+    class ForwardingResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(clientResponse: ClientResponse): Unit = {
+        val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
+        val envelopeError = envelopeResponse.error()
+        val requestBody = request.body[AbstractRequest]
 
-      val response = if (envelopeError != Errors.NONE) {
-        // An envelope error indicates broker misconfiguration (e.g. the principal serde
-        // might not be defined on the receiving broker). In this case, we do not return
-        // the error directly to the client since it would not be expected. Instead we
-        // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-        // on the broker.
-        debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-        requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
-      } else {
-        parseResponse(envelopeResponse.responseData, requestBody, request.header)
+        val response = if (envelopeError != Errors.NONE) {
+          // An envelope error indicates broker misconfiguration (e.g. the principal serde
+          // might not be defined on the receiving broker). In this case, we do not return
+          // the error directly to the client since it would not be expected. Instead we
+          // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
+          // on the broker.
+          debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
+          requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+        } else {
+          parseResponse(envelopeResponse.responseData, requestBody, request.header)
+        }
+        responseCallback(response)
+      }
+
+      override def onTimeout(): Unit = {
+        error(s"Forwarding of the request $request failed due to timeout exception")

Review comment:
       I think this should be logged at debug level. Users will already have
visibility into the error through the request log and the error metrics.
There's probably a stronger case for raising the log level of the unknown-error
case in `onComplete` above, but I'm fine with leaving both at debug.

##########
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##########
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
     val message = buildRequest(inflightAlterIsrItems)
-    def responseHandler(response: ClientResponse): Unit = {
-      try {
-        val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-        handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-      } finally {
-        // Be sure to clear the in-flight flag to allow future AlterIsr requests
-        if (!inflightRequest.compareAndSet(true, false)) {
-          throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+    def clearInflightRequests(): Unit = {
+      // Be sure to clear the in-flight flag to allow future AlterIsr requests
+      if (!inflightRequest.compareAndSet(true, false)) {
+        throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+      }
+    }
+
+    class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+      override def onComplete(response: ClientResponse): Unit = {
+        try {
+          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+          handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+        } finally {
+          clearInflightRequests()
         }
       }
+
+      override def onTimeout(): Unit = {
+        warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
       If we don't expect this timeout to happen, maybe we should just raise an
exception. Alternatively, if we are not going to fail, perhaps we should go
ahead and invoke the callbacks.

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-                           callback: RequestCompletionHandler): Unit = {
-    requestQueue.put(BrokerToControllerQueueItem(request, callback))
+                           callback: BrokerToControllerRequestCompletionHandler,
+                           requestTimeout: Long): Unit = {
+    requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout))
     requestThread.wakeup()
   }
 
 }
 
+abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler {
+
+  /**
+   * Fire when the request transmission hits a fatal exception.

Review comment:
       This comment needs to be updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to