chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612981625



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of 
describeable
-    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for 
it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       Just a code style nit. Could we use the following style
   ```scala
   controller.findTopicNames(providedIds).thenCompose { topicNames =>
   
   }
   ```
   
   instead of
   
   ```scala
   controller.findTopicNames(providedIds).thenCompose(topicNames => {
   
   })
   ```

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -505,9 +529,8 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)

Review comment:
       Just curious: why is this authorization check executed "before" 
getting (casting) the request? It seems most methods get the request first.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1023,6 +1027,22 @@ private QuorumController(LogContext logContext,
         });
     }
 
+    @Override
+    public CompletableFuture<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        CompletableFuture<AlterPartitionReassignmentsResponseData> future = 
new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException());

Review comment:
       As this method is not supported, maybe we can move this implementation 
to the interface? The benefit is that we don't need to add similar code to both 
`MockController` and `QuorumController`.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -347,21 +370,22 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    val response = controller.createTopics(effectiveRequest).get()
-    duplicateTopicNames.forEach { name =>
-      response.topics().add(new CreatableTopicResult().
-        setName(name).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Found multiple entries for this topic."))
-    }
-    topicNames.forEach { name =>
-      if (!authorizedTopicNames.contains(name)) {
+    controller.createTopics(effectiveRequest).thenApply(response => {
+      duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
-          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+          setErrorCode(INVALID_REQUEST.code).
+          setErrorMessage("Found multiple entries for this topic."))

Review comment:
       How about unifying this error message? "Duplicate topic name."

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -155,6 +225,73 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testUnauthorizedHandleAlterClientQuotas(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
+          new AlterClientQuotasRequestData(), 0))))
+  }
+
+  @Test
+  def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = {
+    val requestData = new IncrementalAlterConfigsRequestData().setResources(
+      new AlterConfigsResourceCollection(
+        util.Arrays.asList(new 
IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
+            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setValue("100000").
+            setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
+        new IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("foo").
+          setResourceType(ConfigResource.Type.TOPIC.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
+            setName(TopicConfig.FLUSH_MS_CONFIG).
+            setValue("1000").
+            setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
+        ).iterator()))
+    val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData).build(0))
+    createControllerApis(Some(createDenyAllAuthorizer()),
+      new 
MockController.Builder().build()).handleIncrementalAlterConfigs(request)

Review comment:
       Could you add unit tests for `BROKER_LOGGER` and `UNKNOWN`?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of 
describeable
-    // topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for 
it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication 
failed.
-    // If so, create an error response for it.  Otherwise, add it to the 
idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != 
null) {
-          // This is kind of a weird case: what if we supply topic ID X and 
also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we 
end up
-          // here.  If authorization doesn't succeed, we refrain from 
commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already 
supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list 
of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new 
ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), 
getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are 
authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their 
positions to
-    // distinguish between absent topics and topics we are not permitted to 
see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication 
failed.
+      // If so, remove it from the idToName map and create an error response 
for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication 
failed.
+      // If so, create an error response for it.  Otherwise, add it to the 
idToName map.

Review comment:
       redundant space




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to