hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       Quotas are one aspect of this work that need more consideration. What we 
don't want is for the inter-broker channel to get affected by the individual 
client throttle, which is what will happen with the current patch. What I'd 
suggest for now is that we allow the broker to track client quotas and pass 
back the throttle value in the underlying response, but we set the envelope 
throttle time to 0 and ensure that the inter-broker channel does not get 
throttled. 
   
   For this, I think we we will need to change the logic in 
`KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still 
need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to 
`ClientQuotaManager.throttle`. When the response is received on the forwarding 
broker, we will need to apply the throttle, which I think the patch already 
handles.
   
   One challenging aspect is how this will affect quota metrics. Currently 
quota/throttling metrics are relatively simple because they are recorded 
separately by each broker. However, here the controller is the one that is 
tracking the throttling for the client across multiple inbound connections from 
multiple brokers. This means that the broker that is applying a throttle for a 
forwarded request may not have actually observed a quota violation. Other than 
causing some reporting confusion, I am not sure whether there are any other 
consequences to this.
   
   cc @apovzner @rajinisivaram 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to