chia7712 commented on a change in pull request #9715: URL: https://github.com/apache/kafka/pull/9715#discussion_r539362958
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -432,6 +432,43 @@ class RequestChannel(val queueSize: Int, } } + def sendResponse(request: RequestChannel.Request, + responseOpt: Option[AbstractResponse], + onComplete: Option[Send => Unit]): Unit = { + // Update error metrics for each error code in the response including Errors.NONE + responseOpt.foreach(response => updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala)) + + val response = responseOpt match { + case Some(response) => + new RequestChannel.SendResponse( + request, + request.buildResponseSend(response), + request.responseString(response), + onComplete + ) + case None => + new RequestChannel.NoOpResponse(request) + } + + sendResponse(response) + } + + def sendErrorOrCloseConnection(request: RequestChannel.Request, error: Throwable, throttleMs: Int): Unit = { Review comment: just curious. Why this method is located in ```RequestChannel``` rather than ```ApiUtils``` ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ - dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, Review comment: Is this change still valid? ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ - dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, + replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, forwardingManager, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix) socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => - controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, Review comment: ditto ########## File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala ########## @@ -34,6 +34,10 @@ trait ApiRequestHandler { def handle(request: RequestChannel.Request): Unit } +trait BaseApis extends ApiRequestHandler { Review comment: BTW, my thought was ```scala trait ApisUtils extends Logging { this: KafkaApis => ``` ########## File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala ########## @@ -34,6 +34,10 @@ trait ApiRequestHandler { def handle(request: RequestChannel.Request): Unit } +trait BaseApis extends ApiRequestHandler { Review comment: not sure whether ```BaseApis``` is required. ```KafkaApis``` can still extend ```ApiRequestHandler``` as you have added required variables to ```ApisUtils``` ```scala val requestChannel: RequestChannel val quotas: QuotaManagers val time: Time val authorizer: Option[Authorizer] ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org