abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513759802



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),
+            error
+          )
+
+          envelopeContext.brokerContext.buildResponse(envelopeResponse)
+        case None =>
+          context.buildResponse(abstractResponse)
+      }
+    }
+
+    def responseString(response: AbstractResponse): Option[String] = {
+      if (RequestChannel.isRequestLoggingEnabled)
+        Some(envelopeContext match {
+          case Some(envelopeContext) =>
+            response.toString(envelopeContext.brokerContext.apiVersion)

Review comment:
       Sg, but I guess we need to keep it as is for now to try using the 
correct api version.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to