chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584123786
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not
both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
- else
zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(),
null)
+ else
zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
Review comment:
Do you want to narrow the access of `topicNames`? If so, we should
change the modifier of `topicNames` from public to private
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedDeleteTopics =
authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
results.forEach { topic =>
- val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) &&
topic.name() == null
- if (!config.usesTopicId &&
topicIdsFromRequest.contains(topic.topicId)) {
- topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
- topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) &&
!authorizedDescribeTopics(topic.name))
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
- topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
- toDelete += topic.name
+ val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID &&
topic.name() == null
+ if (!config.usesTopicId &&
topicIdsFromRequest.contains(topic.topicId)) {
+ topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ topic.setErrorMessage("Topic IDs are not supported on the server.")
+ } else if (unresolvedTopicId) {
+ topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+ } else if (topicIdsFromRequest.contains(topic.topicId) &&
!authorizedDescribeTopics(topic.name)) {
+ // Because the client does not have Describe permission, the name
should
+ // not be returned in the response.
+ topic.setName(null)
+ topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
Review comment:
I prepared to discuss this error code on
https://github.com/apache/kafka/pull/10184#discussion_r583933291 :)
### return `UNKNOWN_TOPIC_ID`
Could you add clear error message to this response (call `setErrorMessage`)?
I can imagine users get confusing for the error code `UNKNOWN_TOPIC_ID` because
it presents two scenarios now (the id does not exist or you have no permission
to describe topic).
### return `TOPIC_AUTHORIZATION_FAILED`
It indicates accurate error and it can help user handle un-authorized
requests. Personally, this error is more suitable :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]