[ https://issues.apache.org/jira/browse/KAFKA-7082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519850#comment-16519850 ]
ASF GitHub Bot commented on KAFKA-7082: --------------------------------------- ijuma closed pull request #5259: KAFKA-7082: Concurrent create topics may throw NodeExistsException URL: https://github.com/apache/kafka/pull/5259 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 8a6b3ee212d..060c0b4d4ae 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { update: Boolean = false) { validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update) - // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported if (!update) { // write out the config if there is any, this isn't transactional with the partition assignments zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index bb342945ea8..ec4932ab47b 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Sets or creates the entity znode path with the given configs depending * on whether it already exists or not. + * + * If this is method is called concurrently, the last writer wins. In cases where we update configs and then + * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the + * partition assignment. 
As such, the recommendation is to never call create topic for the same topic with different + * configs/partition assignment concurrently. + * * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode @@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean retryRequestUntilConnected(setDataRequest) } - def create(configData: Array[Byte]) = { + def createOrSet(configData: Array[Byte]): Unit = { val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName) - createRecursive(path, ConfigEntityZNode.encode(config)) + try createRecursive(path, ConfigEntityZNode.encode(config)) + catch { + case _: NodeExistsException => set(configData).maybeThrow + } } val configData = ConfigEntityZNode.encode(config) val setDataResponse = set(configData) setDataResponse.resultCode match { - case Code.NONODE => create(configData) + case Code.NONODE => createOrSet(configData) case _ => setDataResponse.maybeThrow } } diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index fe5fbff55d0..39745e5e608 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -28,8 +28,10 @@ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException} import org.apache.kafka.common.metrics.Quota +import org.apache.kafka.test.{TestUtils => JTestUtils} import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{After, Test} @@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness 
with Logging with RackAware } @Test - def testConcurrentTopicCreation() { + def testMockedConcurrentTopicCreation() { val topic = "test.topic" // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes @@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware } } + @Test + def testConcurrentTopicCreation() { + val topic = "test-concurrent-topic-creation" + TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + val props = new Properties + props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + def createTopic(): Unit = { + try adminZkClient.createTopic(topic, 3, 1, props) + catch { case _: TopicExistsException => () } + val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head + assertEquals(3, partitionAssignment.size) + partitionAssignment.foreach { case (partition, replicas) => + assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size) + } + val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic) + assertEquals(props, savedProps) + } + + TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic), + JTestUtils.DEFAULT_MAX_WAIT_MS.toInt) + } + /** * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic * then changes the config and checks that the new values take effect. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Concurrent createTopics calls may throw NodeExistsException > ----------------------------------------------------------- > > Key: KAFKA-7082 > URL: https://issues.apache.org/jira/browse/KAFKA-7082 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.0 > Reporter: Ismael Juma > Assignee: Ismael Juma > Priority: Critical > Labels: regression > Fix For: 2.0.1, 1.1.2 > > > This exception is unexpected, causing an `UnknownServerException` to be thrown back to the client. Example below: > {code} > org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = > NodeExists for /config/topics/connect-configs > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:119) > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:51) > at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:472) > at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1400) > at kafka.zk.KafkaZkClient.create$1(KafkaZkClient.scala:262) > at > kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:269) > at > kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:99) > at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:126) > at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:81) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)