junrao commented on code in PR #19635: URL: https://github.com/apache/kafka/pull/19635#discussion_r2080213276
########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -838,11 +838,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @ValueSource(strings = Array("kip932")) def testAuthorizationWithTopicExisting(quorum: String): Unit = { //First create the topic so we have a valid topic ID - sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + createTopicWithBrokerPrincipal(topic) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), - ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.PRODUCE -> createProduceRequest("", getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), ApiKeys.PRODUCE.latestVersion()), Review Comment: Since we expect the topicId to be present, should we throw an exception if the topicId can't be found? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, } val topicPartition = new TopicPartition(topicName, partition.index()) - if (topicName.isEmpty) + // To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value + if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID)) Review Comment: If the request version is 13 and the topicId is set to 0, we should return UNKNOWN_TOPIC_ID to match what's in the fetch request. Only if the request version is below 13 and topicName is empty, should we return UNKNOWN_TOPIC_OR_PARTITION. So, it seems that we should do `fetchRequest.version() >= 13` instead of `!topic.topicId().equals(Uuid.ZERO_UUID)`. ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -1021,7 +1030,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { * The newer version is covered by testAuthorizationWithTopicNotExisting. */ @ParameterizedTest - @CsvSource(value = Array("false", "true")) + @ValueSource(booleans = Array(true, false)) Review Comment: Should we change the ordering in `testTopicIdAuthorization` too to be consistent?? ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -924,6 +924,54 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { sendRequests(requestKeyToRequest, false, topicNames) } + /** + * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission + * and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to + * prevent leaking the topic name. + * This case covers produce request version from oldest to 12. + * The newer version is covered by testAuthorizationWithTopicNotExisting. Review Comment: testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting ########## core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala: ########## @@ -974,19 +1022,29 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } } - /* - * even if the topic doesn't exist, request APIs should not leak the topic name + /** + * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission + * and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to + * prevent leaking the topic name. + * This case covers fetch request version from oldest to 12. + * The newer version is covered by testAuthorizationWithTopicNotExisting. Review Comment: testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, } val topicPartition = new TopicPartition(topicName, partition.index()) - if (topicName.isEmpty) + // To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value Review Comment: To compatible => To be compatible -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org