dajac commented on a change in pull request #8940:
URL: https://github.com/apache/kafka/pull/8940#discussion_r447557312



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -354,13 +401,62 @@ class KafkaApisTest {
       .setValue("bar"))
     requestData.resources.add(alterResource)
 
-    val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData)
-      .build(requestHeader.apiVersion))
+    val incrementalAlterConfigsRequest = new 
IncrementalAlterConfigsRequest.Builder(requestData)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest)
     createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
 
+    val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, 
incrementalAlterConfigsRequest, capturedResponse)
+      .asInstanceOf[IncrementalAlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testIncrementalAlterConfigsWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val resourceName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,

Review comment:
       `INCREMENTAL_ALTER_CONFIGS` should be used here.

##########
File path: clients/src/main/resources/common/message/AlterConfigsRequest.json
##########
@@ -18,7 +18,9 @@
   "type": "request",
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
-  "validVersions": "0-1",
+  //
+  // Version 2 will always route to the controller for topic resources change.

Review comment:
       * This is not entirely true, is it? A topic resource change goes to the 
controller if `shouldValidateOnly` is false and goes to the least loaded node 
otherwise. We should also explain how the broker resource is handled starting 
from v2.
   * I suggest adding the KIP number here and in all other schemas.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2103,31 +2097,36 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, 
Collection<AlterConfigOp>> configs,
-                                                                 final 
AlterConfigsOptions options) {
+                                                      final 
AlterConfigsOptions options) {

Review comment:
       The code in `incrementalAlterConfigs` is almost identical to the code in 
`alterConfigs`. I wonder if we could share more of the logic between the two. 
Have you tried already?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -292,30 +290,77 @@ class KafkaApisTest {
         1, true, true)
     )
 
+    EasyMock.expect(controller.isActive).andReturn(true)
+
     // Verify that authorize is only called once
     EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(expectedActions.asJava)))
       .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
       .once()
 
-    expectNoThrottling()
+    val capturedResponse = expectNoThrottling()
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
       .andReturn(Map(configResource -> ApiError.NONE))

Review comment:
       This is not related to the PR but while we are here, could we replace 
`anyObject()` with the correct expected value?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2561,23 +2580,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.toBuffer
     }.toMap
 
-    val (authorizedResources, unauthorizedResources) = configs.partition { 
case (resource, _) =>
-      resource.`type` match {
-        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-          authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-        case ConfigResource.Type.TOPIC =>
-          authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      def responseCallback(requestThrottleMs: Int): 
IncrementalAlterConfigsResponse = {
+        new 
IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs,
 results.asJava))
       }
+      sendResponseMaybeThrottle(request, responseCallback)
     }
 
-    val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 
alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    if (incrementalAlterConfigsRequest.version >= 2
+      && !incrementalAlterConfigsRequest.data.validateOnly
+      && !controller.isActive) {
+      val requireControllerResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(requireControllerResult)
+    } else {
+

Review comment:
       nit: I would remove this empty line.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2425,34 +2425,54 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = 
alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
-      resource.`type` match {
-        case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and 
does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
-        case ConfigResource.Type.BROKER =>
-          authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-        case ConfigResource.Type.TOPIC =>
-          authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
+        val data = new AlterConfigsResponseData()
+          .setThrottleTimeMs(requestThrottleMs)
+        results.foreach{ case (resource, error) =>

Review comment:
       nit: space before `{`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -292,30 +290,77 @@ class KafkaApisTest {
         1, true, true)
     )
 
+    EasyMock.expect(controller.isActive).andReturn(true)
+
     // Verify that authorize is only called once
     EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(expectedActions.asJava)))
       .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
       .once()
 
-    expectNoThrottling()
+    val capturedResponse = expectNoThrottling()
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
       .andReturn(Map(configResource -> ApiError.NONE))
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
-      adminManager)
+      adminManager, controller)
 
     val configs = Map(
       configResource -> new AlterConfigsRequest.Config(
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
-    val request = buildRequest(new AlterConfigsRequest.Builder(configs.asJava, 
false)
-      .build(requestHeader.apiVersion))
+    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterConfigsRequest)
+
     createKafkaApis(authorizer = 
Some(authorizer)).handleAlterConfigsRequest(request)
 
+    val response = readResponse(ApiKeys.ALTER_CONFIGS, alterConfigsRequest, 
capturedResponse)
+      .asInstanceOf[AlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterConfigsWithNonController(): Unit = {

Review comment:
       It would be great if we could verify all versions to ensure that older 
versions succeed as expected.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -336,14 +381,16 @@ class KafkaApisTest {
       .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
       .once()
 
-    expectNoThrottling()
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.expect(controller.isActive).andReturn(true)
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.incrementalAlterConfigs(anyObject(), 
EasyMock.eq(false)))
       .andReturn(Map(configResource -> ApiError.NONE))

Review comment:
       This is not related to the PR but while we are here, could we replace 
`anyObject()` with the correct expected value?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2416,10 +2424,22 @@ public void testIncrementalAlterConfigs()  throws 
Exception {
                     .setErrorCode(Errors.INVALID_REQUEST.code())
                     .setErrorMessage("Config value append is not allowed for 
config"));
 
+            responseData.responses().add(new AlterConfigsResourceResponse()
+                    .setResourceName("topic2")
+                    .setResourceType(ConfigResource.Type.TOPIC.id())
+                    .setErrorCode(Errors.NOT_CONTROLLER.code())
+                    .setErrorMessage("Topic resource change must be sent to 
the controller"));
+
             env.kafkaClient().prepareResponse(new 
IncrementalAlterConfigsResponse(responseData));
 
+            // Upon handling NOT_CONTROLLER exception, admin client needs to 
refresh the metadata.
+            MetadataResponse controllerNodeResponse = 
MetadataResponse.prepareResponse(env.cluster().nodes(),
+                env.cluster().clusterResource().clusterId(), 1, 
Collections.emptyList());
+            env.kafkaClient().prepareResponse(controllerNodeResponse);

Review comment:
       It may be worth using `prepareResponseFrom` in the test cases to verify 
that requests go to the expected brokers. I would also extend the test cases to 
cover the logic which creates one or multiple requests based on whether 
`shouldValidateOnly` is set.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -354,13 +401,62 @@ class KafkaApisTest {
       .setValue("bar"))
     requestData.resources.add(alterResource)
 
-    val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData)
-      .build(requestHeader.apiVersion))
+    val incrementalAlterConfigsRequest = new 
IncrementalAlterConfigsRequest.Builder(requestData)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest)
     createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
 
+    val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, 
incrementalAlterConfigsRequest, capturedResponse)
+      .asInstanceOf[IncrementalAlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testIncrementalAlterConfigsWithNonController(): Unit = {

Review comment:
       It would be great if we could verify all versions to ensure that older 
versions succeed as expected.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -292,30 +290,77 @@ class KafkaApisTest {
         1, true, true)
     )
 
+    EasyMock.expect(controller.isActive).andReturn(true)
+
     // Verify that authorize is only called once
     EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(expectedActions.asJava)))
       .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
       .once()
 
-    expectNoThrottling()
+    val capturedResponse = expectNoThrottling()
 
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
       .andReturn(Map(configResource -> ApiError.NONE))
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
-      adminManager)
+      adminManager, controller)
 
     val configs = Map(
       configResource -> new AlterConfigsRequest.Config(
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
-    val request = buildRequest(new AlterConfigsRequest.Builder(configs.asJava, 
false)
-      .build(requestHeader.apiVersion))
+    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterConfigsRequest)
+
     createKafkaApis(authorizer = 
Some(authorizer)).handleAlterConfigsRequest(request)
 
+    val response = readResponse(ApiKeys.ALTER_CONFIGS, alterConfigsRequest, 
capturedResponse)
+      .asInstanceOf[AlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterConfigsWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val resourceName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer, controller)
+
+    val configs = Map(
+      configResource -> new AlterConfigsRequest.Config(
+        Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
+
+    val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterConfigsRequest)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterConfigsRequest(request)
+
+    val response = readResponse(ApiKeys.ALTER_CONFIGS, alterConfigsRequest, 
capturedResponse)
+      .asInstanceOf[AlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NOT_CONTROLLER), responseMap)
+  }

Review comment:
       I suggest verifying the mocks with `verify` to ensure that only the 
calls that you expected were made.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -354,13 +401,62 @@ class KafkaApisTest {
       .setValue("bar"))
     requestData.resources.add(alterResource)
 
-    val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData)
-      .build(requestHeader.apiVersion))
+    val incrementalAlterConfigsRequest = new 
IncrementalAlterConfigsRequest.Builder(requestData)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest)
     createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
 
+    val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, 
incrementalAlterConfigsRequest, capturedResponse)
+      .asInstanceOf[IncrementalAlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testIncrementalAlterConfigsWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val resourceName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer, controller)
+
+    val requestData = new IncrementalAlterConfigsRequestData()
+    val alterResource = new 
IncrementalAlterConfigsRequestData.AlterConfigsResource()
+      .setResourceName(configResource.name)
+      .setResourceType(configResource.`type`.id)
+    alterResource.configs.add(new AlterableConfig()
+      .setName("foo")
+      .setValue("bar"))
+    requestData.resources.add(alterResource)
+
+    val incrementalAlterConfigsRequest = new 
IncrementalAlterConfigsRequest.Builder(requestData)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+
+    val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, 
incrementalAlterConfigsRequest, capturedResponse)
+      .asInstanceOf[IncrementalAlterConfigsResponse]
+
+    val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+      resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+    }.toMap
+    assertEquals(Map(resourceName -> Errors.NOT_CONTROLLER), responseMap)
+  }

Review comment:
       Same comment regarding verifying the mocks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to