hachikuji commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r533024276
########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._ trait BrokerToControllerChannelManager { def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit Review comment: Perhaps we could use a name like `retryTimeout` to distinguish this from the request timeout, which only applies to individual requests. Alternatively, we could let the caller provide the retry deadline explicitly. This would avoid the need for the extra `time.milliseconds` call. ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = { Review comment: nit: `hasTimedOut`? ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { - requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit = { + requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout)) Review comment: Won't this overflow if `requestTimeout` is set to `Long.MaxValue`? Do we have any test cases that cover this?
########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = { + time.milliseconds() > request.deadlineMs Review comment: Maybe we can avoid this call to `time.milliseconds` and use `ClientResponse.receivedTimeMs`? ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -165,7 +176,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = { - if (response.wasDisconnected()) { + if (isTimedOut(request)) { Review comment: We check for timeouts only after receiving a response. I guess this means that, in the worst case, the effective time before we report a timeout could be up to roughly twice the request timeout (`request.timeout * 2`). This is probably not a big deal, but it may be worth documenting in a comment somewhere. ########## File path: core/src/main/scala/kafka/server/ForwardingManager.scala ########## @@ -44,26 +46,34 @@ class ForwardingManager(channelManager: BrokerToControllerChannelManager) extend request.context.clientAddress.getAddress ) - def onClientResponse(clientResponse: ClientResponse): Unit = { - val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse] - val envelopeError = envelopeResponse.error() - val requestBody = request.body[AbstractRequest] + class ForwardingResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(clientResponse: ClientResponse): Unit = { + val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse] + val envelopeError = envelopeResponse.error() + val requestBody = request.body[AbstractRequest] - val response = if (envelopeError != Errors.NONE) { - // An envelope error indicates broker misconfiguration (e.g. 
the principal serde - // might not be defined on the receiving broker). In this case, we do not return - // the error directly to the client since it would not be expected. Instead we - // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem - // on the broker. - debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError") - requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception) - } else { - parseResponse(envelopeResponse.responseData, requestBody, request.header) + val response = if (envelopeError != Errors.NONE) { + // An envelope error indicates broker misconfiguration (e.g. the principal serde + // might not be defined on the receiving broker). In this case, we do not return + // the error directly to the client since it would not be expected. Instead we + // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem + // on the broker. + debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError") + requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception) + } else { + parseResponse(envelopeResponse.responseData, requestBody, request.header) + } + responseCallback(response) + } + + override def onTimeout(): Unit = { + error(s"Forwarding of the request $request failed due to timeout exception") Review comment: I think this should be debug. Users will already have visibility into the error through the request log and the error metrics. There's probably a stronger case to increase the level for the unknown error case in `onComplete` above, but I'm fine letting them both be debug. 
########## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ########## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) - def responseHandler(response: ClientResponse): Unit = { - try { - val body = response.responseBody().asInstanceOf[AlterIsrResponse] - handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) - } finally { - // Be sure to clear the in-flight flag to allow future AlterIsr requests - if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + + def clearInflightRequests(): Unit = { + // Be sure to clear the in-flight flag to allow future AlterIsr requests + if (!inflightRequest.compareAndSet(true, false)) { + throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } + } + + class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(response: ClientResponse): Unit = { + try { + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) + } finally { + clearInflightRequests() } } + + override def onTimeout(): Unit = { + warn(s"Encountered request when sending AlterIsr to the controller") Review comment: If we don't expect this, maybe we should just raise an exception. Alternatively if we are not going to fail, perhaps we should go ahead and invoke the callbacks. 
########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { - requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit = { + requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout)) requestThread.wakeup() } } +abstract class BrokerToControllerRequestCompletionHandler extends RequestCompletionHandler { + + /** + * Fire when the request transmission hits a fatal exception. Review comment: This comment needs to be updated. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org