hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124
##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None

     val session = Session(context.principal, context.clientAddress)
+    private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)

     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size

-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }

-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:

Quotas are one aspect of this work that needs more consideration. What we don't want is for the inter-broker channel to be affected by the individual client throttle, which is what will happen with the current patch.

What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but set the envelope throttle time to 0 and ensure that the inter-broker channel does not get throttled. For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles. Rough sketches of both sides follow below.

One challenging aspect is how this will affect quota metrics. Currently quota/throttling metrics are relatively simple because they are recorded separately by each broker. Here, however, the controller is the one tracking the throttling for the client across inbound connections from multiple brokers. This means that the broker applying a throttle for a forwarded request may not have actually observed a quota violation itself. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this.

cc @apovzner @rajinisivaram
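
To make the envelope side of the suggestion concrete, here is a minimal sketch of the proposed throttle placement. The types below are hypothetical stand-ins for the real `EnvelopeResponse`/`AbstractResponse` classes and the broker's channel-muting mechanism; none of these names come from the actual codebase.

```scala
// Hypothetical, simplified stand-ins for the real request/response classes;
// this only illustrates where the throttle time would live, not the real API.
final case class EmbeddedResponse(throttleTimeMs: Int, data: Array[Byte])
final case class Envelope(throttleTimeMs: Int, embedded: EmbeddedResponse)

object EnvelopeThrottlePlacement {

  // Controller side: the client throttle stays inside the embedded response,
  // while the envelope itself reports 0 so the inter-broker channel is never delayed.
  def wrap(embedded: EmbeddedResponse): Envelope =
    Envelope(throttleTimeMs = 0, embedded = embedded)

  // Forwarding-broker side: unwrap the envelope and apply the embedded throttle
  // to the original client connection. `throttleClient` is a stand-in for
  // whatever mechanism the broker uses to mute/delay the client channel.
  def unwrapAndThrottle(envelope: Envelope, throttleClient: Int => Unit): EmbeddedResponse = {
    if (envelope.embedded.throttleTimeMs > 0)
      throttleClient(envelope.embedded.throttleTimeMs)
    envelope.embedded
  }
}
```

The point of pinning the envelope's own throttle at 0 is that the inter-broker connection never pays for a client-level quota violation; only the forwarding broker's connection to the original client does.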
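
And a similarly hedged sketch of the `sendResponseMaybeThrottle`-style change: for a forwarded request the controller would still record usage against the client quota (so the throttle time can be embedded in the wrapped response), but it would skip throttling the channel the request arrived on. Again, the trait and names are illustrative, not the real `ClientQuotaManager` API.

```scala
// Illustrative interfaces only; the real quota managers have richer signatures.
trait RequestQuotas {
  // Records usage and returns the throttle time (ms) the client has earned.
  def maybeRecordAndGetThrottleTimeMs(clientId: String, nowMs: Long): Int
  // Mutes/delays the channel the request came in on.
  def throttleChannel(clientId: String, throttleTimeMs: Int): Unit
}

final case class InboundRequest(clientId: String, isForwarded: Boolean)

object ForwardedRequestQuotaSketch {
  // Returns the throttle time to embed in the response. For a forwarded request
  // the quota is still recorded, but the inter-broker channel is not throttled;
  // the forwarding broker applies the throttle to the client connection instead.
  def recordAndMaybeThrottle(quotas: RequestQuotas, request: InboundRequest, nowMs: Long): Int = {
    val throttleTimeMs = quotas.maybeRecordAndGetThrottleTimeMs(request.clientId, nowMs)
    if (!request.isForwarded && throttleTimeMs > 0)
      quotas.throttleChannel(request.clientId, throttleTimeMs)
    throttleTimeMs
  }
}
```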