chia7712 commented on a change in pull request #9715:
URL: https://github.com/apache/kafka/pull/9715#discussion_r539362958



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -432,6 +432,43 @@ class RequestChannel(val queueSize: Int,
     }
   }
 
+  def sendResponse(request: RequestChannel.Request,
+                   responseOpt: Option[AbstractResponse],
+                   onComplete: Option[Send => Unit]): Unit = {
+    // Update error metrics for each error code in the response including 
Errors.NONE
+    responseOpt.foreach(response => updateErrorMetrics(request.header.apiKey, 
response.errorCounts.asScala))
+
+    val response = responseOpt match {
+      case Some(response) =>
+        new RequestChannel.SendResponse(
+          request,
+          request.buildResponseSend(response),
+          request.responseString(response),
+          onComplete
+        )
+      case None =>
+        new RequestChannel.NoOpResponse(request)
+    }
+
+    sendResponse(response)
+  }
+
+  def sendErrorOrCloseConnection(request: RequestChannel.Request, error: 
Throwable, throttleMs: Int): Unit = {

Review comment:
       Just curious: why is this method located in ```RequestChannel``` rather 
than in ```ApiUtils```?

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        dataPlaneRequestProcessor = new 
KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, 
groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new 
KafkaApis(socketServer.dataPlaneRequestChannel,

Review comment:
       Is this change still valid?

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -369,15 +369,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        dataPlaneRequestProcessor = new 
KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, 
groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new 
KafkaApis(socketServer.dataPlaneRequestChannel,
+          replicaManager, adminManager, groupCoordinator, 
transactionCoordinator,
           kafkaController, forwardingManager, zkClient, config.brokerId, 
config, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager, 
brokerFeatures, featureCache)
 
         dataPlaneRequestHandlerPool = new 
KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, 
dataPlaneRequestProcessor, time,
           config.numIoThreads, 
s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", 
SocketServer.DataPlaneThreadPrefix)
 
         socketServer.controlPlaneRequestChannelOpt.foreach { 
controlPlaneRequestChannel =>
-          controlPlaneRequestProcessor = new 
KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, 
groupCoordinator, transactionCoordinator,
+          controlPlaneRequestProcessor = new 
KafkaApis(controlPlaneRequestChannel,

Review comment:
       Ditto: is this change still valid?

##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       BTW, my thought was something like this:
   
   ```scala
   trait ApisUtils extends Logging {
     this: KafkaApis =>
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
##########
@@ -34,6 +34,10 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request): Unit
 }
 
+trait BaseApis extends ApiRequestHandler {

Review comment:
       I'm not sure whether ```BaseApis``` is required. ```KafkaApis``` can still 
extend ```ApiRequestHandler``` directly, since you have added the required 
variables to ```ApisUtils```:
   ```scala
     val requestChannel: RequestChannel
     val quotas: QuotaManagers
     val time: Time
     val authorizer: Option[Authorizer]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to