junrao commented on code in PR #19635:
URL: https://github.com/apache/kafka/pull/19635#discussion_r2080213276


##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -838,11 +838,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   @ValueSource(strings = Array("kip932"))
   def testAuthorizationWithTopicExisting(quorum: String): Unit = {
     //First create the topic so we have a valid topic ID
-    sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
+    createTopicWithBrokerPrincipal(topic)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
       ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
-      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.PRODUCE -> createProduceRequest("", 
getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), 
ApiKeys.PRODUCE.latestVersion()),

Review Comment:
   Since we expect the topicId to be present, should we throw an exception if 
the topicId can't be found?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val topicPartition = new TopicPartition(topicName, partition.index())
-        if (topicName.isEmpty)
+        // To compatible with the old version, only return UNKNOWN_TOPIC_ID if 
topicId is not default value
+        if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID))

Review Comment:
   If the request version is 13 and the topicId is set to 0, we should return 
UNKNOWN_TOPIC_ID to match what's in the fetch request. Only if the request 
version is below 13 and topicName is empty, should we return 
UNKNOWN_TOPIC_OR_PARTITION. So, it seems that we should do 
`fetchRequest.version() >= 13` instead of 
`!topic.topicId().equals(Uuid.ZERO_UUID)`.



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -1021,7 +1030,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
    * The newer version is covered by testAuthorizationWithTopicNotExisting.
    */
   @ParameterizedTest
-  @CsvSource(value = Array("false", "true"))
+  @ValueSource(booleans = Array(true, false))

Review Comment:
   Should we change the ordering in `testTopicIdAuthorization` too to be 
consistent??



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -924,6 +924,54 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendRequests(requestKeyToRequest, false, topicNames)
   }
 
+  /**
+   * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if 
the client doesn't have permission
+   * and topic name is used in the request. Even if the topic doesn't exist, 
we return TOPIC_AUTHORIZATION_FAILED to
+   * prevent leaking the topic name.
+   * This case covers produce request version from oldest to 12.
+   * The newer version is covered by testAuthorizationWithTopicNotExisting.

Review Comment:
   testAuthorizationWithTopicNotExisting => 
testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting



##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -974,19 +1022,29 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     }
   }
 
-  /*
-   * even if the topic doesn't exist, request APIs should not leak the topic 
name
+  /**
+   * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the 
client doesn't have permission
+   * and topic name is used in the request. Even if the topic doesn't exist, 
we return TOPIC_AUTHORIZATION_FAILED to
+   * prevent leaking the topic name.
+   * This case covers fetch request version from oldest to 12.
+   * The newer version is covered by testAuthorizationWithTopicNotExisting.

Review Comment:
   testAuthorizationWithTopicNotExisting => 
testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val topicPartition = new TopicPartition(topicName, partition.index())
-        if (topicName.isEmpty)
+        // To compatible with the old version, only return UNKNOWN_TOPIC_ID if 
topicId is not default value

Review Comment:
   To compatible => To be compatible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to