abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r513759802
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. + // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference + // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. 
if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } - def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { + case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( + abstractResponse.throttleTimeMs(), + abstractResponse.serializeBody(context.header.apiVersion), + error + ) + + envelopeContext.brokerContext.buildResponse(envelopeResponse) + case None => + context.buildResponse(abstractResponse) + } + } + + def responseString(response: AbstractResponse): Option[String] = { + if (RequestChannel.isRequestLoggingEnabled) + Some(envelopeContext match { + case Some(envelopeContext) => + response.toString(envelopeContext.brokerContext.apiVersion) Review comment: Sounds good, but I think we need to keep it as is for now so that we try to use the correct API version. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org