chia7712 commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586131042
##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
requestThrottleMs => createResponseCallback(requestThrottleMs))
}
+ def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+ val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+ request.context.apiVersion,
+ authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+ names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
names)(n => n),
+ names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
names)(n => n))
+ requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+ val responseData = new DeleteTopicsResponseData().
+ setResponses(new DeletableTopicResultCollection(responses.iterator)).
+ setThrottleTimeMs(throttleTimeMs)
+ new DeleteTopicsResponse(responseData)
+ })
+ }
+
+ def deleteTopics(request: DeleteTopicsRequestData,
+ apiVersion: Int,
+ hasClusterAuth: Boolean,
+ getDescribableTopics: Iterable[String] => Set[String],
+ getDeletableTopics: Iterable[String] => Set[String]):
util.List[DeletableTopicResult] = {
+ // Check if topic deletion is enabled at all.
+ if (!config.deleteTopicEnable) {
+ if (apiVersion < 3) {
+ throw new InvalidRequestException("Topic deletion is disabled.")
+ } else {
+ throw new TopicDeletionDisabledException()
+ }
+ }
+ // The first step is to load up the names and IDs that have been provided
by the
+ // request. This is a bit messy because we support multiple ways of
referring to
+ // topics (both by name and by id) and because we need to check for
duplicates or
+ // other invalid inputs.
+ val responses = new util.ArrayList[DeletableTopicResult]
+ def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+ responses.add(new DeletableTopicResult().
+ setName(name).
+ setTopicId(id).
+ setErrorCode(error.error.code).
+ setErrorMessage(error.message))
+ }
+ val providedNames = new util.HashSet[String]
+ val duplicateProvidedNames = new util.HashSet[String]
+ val providedIds = new util.HashSet[Uuid]
+ val duplicateProvidedIds = new util.HashSet[Uuid]
+ def addProvidedName(name: String): Unit = {
+ if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+ duplicateProvidedNames.add(name)
+ providedNames.remove(name)
+ }
+ }
+ request.topicNames.forEach(addProvidedName)
+ request.topics.forEach {
+ topic => if (topic.name == null) {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+ "Neither topic name nor id were specified."))
+ } else if (duplicateProvidedIds.contains(topic.topicId) ||
!providedIds.add(topic.topicId)) {
+ duplicateProvidedIds.add(topic.topicId)
+ providedIds.remove(topic.topicId)
+ }
+ } else {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ addProvidedName(topic.name)
+ } else {
+ appendResponse(topic.name, topic.topicId, new
ApiError(INVALID_REQUEST,
+ "You may not specify both topic name and topic id."))
+ }
+ }
+ }
+ // Create error responses for duplicates.
+ duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+ new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+ duplicateProvidedIds.forEach(id => appendResponse(null, id,
+ new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+ // At this point we have all the valid names and IDs that have been
provided.
+ // However, the Authorizer needs topic names as inputs, not topic IDs. So
+ // we need to resolve all IDs to names.
+ val toAuthenticate = new util.HashSet[String]
+ toAuthenticate.addAll(providedNames)
+ val idToName = new util.HashMap[Uuid, String]
+ controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+ if (nameOrError.isError) {
+ appendResponse(null, id, nameOrError.error())
+ } else {
+ toAuthenticate.add(nameOrError.result())
+ idToName.put(id, nameOrError.result())
+ }
+ }
+ // Get the list of deletable topics (those we can delete) and the list of
describeable
+ // topics. If a topic can't be deleted or described, we have to act like
it doesn't
+ // exist, even when it does.
+ val topicsToAuthenticate = toAuthenticate.asScala
+ val (describeable, deletable) = if (hasClusterAuth) {
+ (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+ } else {
+ (getDescribableTopics(topicsToAuthenticate),
getDeletableTopics(topicsToAuthenticate))
+ }
+ // For each topic that was provided by ID, check if authentication failed.
+ // If so, remove it from the idToName map and create an error response for
it.
+ val iterator = idToName.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val id = entry.getKey
+ val name = entry.getValue
+ if (!deletable.contains(name)) {
+ if (describeable.contains(name)) {
+ appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ } else {
+ appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))
Review comment:
Consensus was reached on #10223 that the error should be
`TOPIC_AUTHORIZATION_FAILED` rather than `UNKNOWN_TOPIC_ID`.
See `KafkaApis`
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1901).
##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -154,17 +162,164 @@ class ControllerApis(val requestChannel: RequestChannel,
requestThrottleMs => createResponseCallback(requestThrottleMs))
}
+ def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+ val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+ request.context.apiVersion,
+ authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+ names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
names)(n => n),
+ names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
names)(n => n))
+ requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+ val responseData = new DeleteTopicsResponseData().
+ setResponses(new DeletableTopicResultCollection(responses.iterator)).
+ setThrottleTimeMs(throttleTimeMs)
+ new DeleteTopicsResponse(responseData)
+ })
+ }
+
+ def deleteTopics(request: DeleteTopicsRequestData,
+ apiVersion: Int,
+ hasClusterAuth: Boolean,
+ getDescribableTopics: Iterable[String] => Set[String],
+ getDeletableTopics: Iterable[String] => Set[String]):
util.List[DeletableTopicResult] = {
+ // Check if topic deletion is enabled at all.
+ if (!config.deleteTopicEnable) {
+ if (apiVersion < 3) {
+ throw new InvalidRequestException("Topic deletion is disabled.")
+ } else {
+ throw new TopicDeletionDisabledException()
+ }
+ }
+ // The first step is to load up the names and IDs that have been provided
by the
+ // request. This is a bit messy because we support multiple ways of
referring to
+ // topics (both by name and by id) and because we need to check for
duplicates or
+ // other invalid inputs.
+ val responses = new util.ArrayList[DeletableTopicResult]
+ def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+ responses.add(new DeletableTopicResult().
+ setName(name).
+ setTopicId(id).
+ setErrorCode(error.error.code).
+ setErrorMessage(error.message))
+ }
+ val providedNames = new util.HashSet[String]
+ val duplicateProvidedNames = new util.HashSet[String]
+ val providedIds = new util.HashSet[Uuid]
+ val duplicateProvidedIds = new util.HashSet[Uuid]
+ def addProvidedName(name: String): Unit = {
+ if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+ duplicateProvidedNames.add(name)
+ providedNames.remove(name)
+ }
+ }
+ request.topicNames.forEach(addProvidedName)
+ request.topics.forEach {
+ topic => if (topic.name == null) {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+ "Neither topic name nor id were specified."))
+ } else if (duplicateProvidedIds.contains(topic.topicId) ||
!providedIds.add(topic.topicId)) {
+ duplicateProvidedIds.add(topic.topicId)
+ providedIds.remove(topic.topicId)
+ }
+ } else {
+ if (topic.topicId.equals(ZERO_UUID)) {
+ addProvidedName(topic.name)
+ } else {
+ appendResponse(topic.name, topic.topicId, new
ApiError(INVALID_REQUEST,
+ "You may not specify both topic name and topic id."))
+ }
+ }
+ }
+ // Create error responses for duplicates.
+ duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+ new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+ duplicateProvidedIds.forEach(id => appendResponse(null, id,
+ new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+ // At this point we have all the valid names and IDs that have been
provided.
+ // However, the Authorizer needs topic names as inputs, not topic IDs. So
+ // we need to resolve all IDs to names.
+ val toAuthenticate = new util.HashSet[String]
+ toAuthenticate.addAll(providedNames)
+ val idToName = new util.HashMap[Uuid, String]
+ controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+ if (nameOrError.isError) {
+ appendResponse(null, id, nameOrError.error())
+ } else {
+ toAuthenticate.add(nameOrError.result())
+ idToName.put(id, nameOrError.result())
+ }
+ }
+ // Get the list of deletable topics (those we can delete) and the list of
describeable
+ // topics. If a topic can't be deleted or described, we have to act like
it doesn't
+ // exist, even when it does.
+ val topicsToAuthenticate = toAuthenticate.asScala
+ val (describeable, deletable) = if (hasClusterAuth) {
+ (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+ } else {
+ (getDescribableTopics(topicsToAuthenticate),
getDeletableTopics(topicsToAuthenticate))
+ }
+ // For each topic that was provided by ID, check if authentication failed.
+ // If so, remove it from the idToName map and create an error response for
it.
+ val iterator = idToName.entrySet().iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val id = entry.getKey
+ val name = entry.getValue
+ if (!deletable.contains(name)) {
+ if (describeable.contains(name)) {
+ appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+ } else {
+ appendResponse(null, id, new ApiError(UNKNOWN_TOPIC_ID))
+ }
+ iterator.remove()
+ }
+ }
+ // For each topic that was provided by name, check if authentication
failed.
+ // If so, create an error response for it. Otherwise, add it to the
idToName map.
+ controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
+ if (idOrError.isError) {
+ appendResponse(name, ZERO_UUID, idOrError.error)
+ } else if (deletable.contains(name)) {
+ val id = idOrError.result()
+ if (duplicateProvidedIds.contains(id) || idToName.put(id, name) !=
null) {
+ // This is kind of a weird case: what if we supply topic ID X and
also a name
+ // that maps to ID X? In that case, _if authorization succeeds_, we
end up
+ // here. If authorization doesn't succeed, we refrain from
commenting on the
+ // situation since it would reveal topic ID mappings.
+ duplicateProvidedIds.add(id)
+ idToName.remove(id)
+ appendResponse(name, id, new ApiError(INVALID_REQUEST,
+ "The provided topic name maps to an ID that was already
supplied."))
+ }
+ } else if (describeable.contains(name)) {
+ appendResponse(name, idOrError.result(), new
ApiError(TOPIC_AUTHORIZATION_FAILED))
+ } else {
+ appendResponse(name, ZERO_UUID, new
ApiError(UNKNOWN_TOPIC_OR_PARTITION))
Review comment:
The topic ID is NOT sensitive (see the discussion at
https://issues.apache.org/jira/browse/KAFKA-12369), so it is OK to return the
topic ID. Also, the error code should be `TOPIC_AUTHORIZATION_FAILED` so that
the client fails fast (`UNKNOWN_TOPIC_OR_PARTITION` is a retryable error).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]