ahuang98 commented on code in PR #21399:
URL: https://github.com/apache/kafka/pull/21399#discussion_r2843861924
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -141,11 +141,13 @@
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
-
/**
- * The ReplicationControlManager is the part of the controller which deals
with topics
- * and partitions. It is responsible for managing the in-sync replica set and
leader
- * of each partition, as well as administrative tasks like creating or
deleting topics.
+ * The ReplicationControlManager is the part of the controller which deals with
Review Comment:
Let's undo all these formatting changes (if we think we need them, we should
break them out into a separate formatting PR).
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -685,19 +697,28 @@ ControllerResult<CreateTopicsResponseData> createTopics(
for (CreatableTopic topic : request.topics()) {
ApiError error = topicErrors.get(topic.name());
if (error != null) {
- data.topics().add(new CreatableTopicResult().
- setName(topic.name()).
- setErrorCode(error.error().code()).
- setErrorMessage(error.message()));
- resultsBuilder.append(resultsPrefix).append(topic).append(":
").
- append(error.error()).append("
(").append(error.message()).append(")");
+ CreatableTopicResult result = new CreatableTopicResult()
+ .setName(topic.name())
+ .setErrorCode(error.error().code())
+ .setErrorMessage(error.message());
+
+ // KAFKA-20097: For existing topics, populate the topic ID
+ if (error.error() == Errors.TOPIC_ALREADY_EXISTS) {
+ Uuid existingTopicId = topicsByName.get(topic.name());
+ if (existingTopicId != null) {
+ result.setTopicId(existingTopicId);
+ }
+ }
+
+ data.topics().add(result);
+ resultsBuilder.append(resultsPrefix).append(topic).append(":
").append(error.error()).append(" (")
+ .append(error.message()).append(")");
resultsPrefix = ", ";
Review Comment:
might be worth deduplicating these lines by adding an explicit else case
##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -413,39 +413,61 @@ class ControllerApis(
getCreatableTopics.apply(allowedTopicNames)
}
val describableTopicNames =
getDescribableTopics.apply(allowedTopicNames).asJava
- val effectiveRequest = request.duplicate()
- val iterator = effectiveRequest.topics().iterator()
- while (iterator.hasNext) {
- val creatableTopic = iterator.next()
- if (duplicateTopicNames.contains(creatableTopic.name()) ||
- !authorizedTopicNames.contains(creatableTopic.name())) {
- iterator.remove()
- }
- }
- controller.createTopics(context, effectiveRequest,
describableTopicNames).thenApply { response =>
- duplicateTopicNames.forEach { name =>
- response.topics().add(new CreatableTopicResult().
- setName(name).
- setErrorCode(INVALID_REQUEST.code).
- setErrorMessage("Duplicate topic name."))
- }
- topicNames.forEach { name =>
- if (name == Topic.CLUSTER_METADATA_TOPIC_NAME) {
- response.topics().add(new CreatableTopicResult().
- setName(name).
- setErrorCode(INVALID_REQUEST.code).
- setErrorMessage(s"Creation of internal topic
${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited."))
- } else if (!authorizedTopicNames.contains(name)) {
- response.topics().add(new CreatableTopicResult().
- setName(name).
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
- setErrorMessage("Authorization failed."))
- }
- }
- response
- }
+ // Create a map to collect validation errors upfront
+val validationErrors = new util.ArrayList[CreatableTopicResult]()
Review Comment:
What's the rationale for these additional changes?
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -379,16 +391,15 @@ static Map<String, String>
translateCreationConfigs(CreatableTopicConfigCollecti
final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
private ReplicationControlManager(
- SnapshotRegistry snapshotRegistry,
- LogContext logContext,
- short defaultReplicationFactor,
- int defaultNumPartitions,
- int maxElectionsPerImbalance,
- ConfigurationControlManager configurationControl,
- ClusterControlManager clusterControl,
- Optional<CreateTopicPolicy> createTopicPolicy,
- FeatureControlManager featureControl
- ) {
+ SnapshotRegistry snapshotRegistry,
+ LogContext logContext,
+ short defaultReplicationFactor,
+ int defaultNumPartitions,
+ int maxElectionsPerImbalance,
+ ConfigurationControlManager configurationControl,
+ ClusterControlManager clusterControl,
+ Optional<CreateTopicPolicy> createTopicPolicy,
+ FeatureControlManager featureControl) {
Review Comment:
Let's make sure to revert the formatting changes like `) {` here.
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -284,19 +288,22 @@ static Map<String, String>
translateCreationConfigs(CreatableTopicConfigCollecti
private final Logger log;
/**
- * The KIP-464 default replication factor that is used if a CreateTopics
request does
+ * The KIP-464 default replication factor that is used if a CreateTopics
request
+ * does
Review Comment:
seems like a lot of extra formatting changes snuck in?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]