[ 
https://issues.apache.org/jira/browse/KAFKA-7082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519850#comment-16519850
 ] 

ASF GitHub Bot commented on KAFKA-7082:
---------------------------------------

ijuma closed pull request #5259: KAFKA-7082: Concurrent create topics may throw 
NodeExistsException
URL: https://github.com/apache/kafka/pull/5259
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 8a6b3ee212d..060c0b4d4ae 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                                                      update: Boolean = false) {
     validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, 
update)
 
-    // Configs only matter if a topic is being created. Changing configs via 
AlterTopic is not supported
     if (!update) {
       // write out the config if there is any, this isn't transactional with 
the partition assignments
       zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index bb342945ea8..ec4932ab47b 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   /**
    * Sets or creates the entity znode path with the given configs depending
    * on whether it already exists or not.
+   *
+   * If this method is called concurrently, the last writer wins. In cases 
where we update configs and then
+   * partition assignment (i.e. create topic), it's possible for one thread to 
set this and the other to set the
+   * partition assignment. As such, the recommendation is to never call create 
topic for the same topic with different
+   * configs/partition assignment concurrently.
+   *
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating 
the znode
@@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
       retryRequestUntilConnected(setDataRequest)
     }
 
-    def create(configData: Array[Byte]) = {
+    def createOrSet(configData: Array[Byte]): Unit = {
       val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
-      createRecursive(path, ConfigEntityZNode.encode(config))
+      try createRecursive(path, ConfigEntityZNode.encode(config))
+      catch {
+        case _: NodeExistsException => set(configData).maybeThrow
+      }
     }
 
     val configData = ConfigEntityZNode.encode(config)
 
     val setDataResponse = set(configData)
     setDataResponse.resultCode match {
-      case Code.NONODE => create(configData)
+      case Code.NONODE => createOrSet(configData)
       case _ => setDataResponse.maybeThrow
     }
   }
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index fe5fbff55d0..39745e5e608 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -28,8 +28,10 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, 
InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Test}
@@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with 
Logging with RackAware
   }
 
   @Test
-  def testConcurrentTopicCreation() {
+  def testMockedConcurrentTopicCreation() {
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is 
concurrently created by multiple processes
@@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with 
Logging with RackAware
     }
   }
 
+  @Test
+  def testConcurrentTopicCreation() {
+    val topic = "test-concurrent-topic-creation"
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    val props = new Properties
+    props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
+    def createTopic(): Unit = {
+      try adminZkClient.createTopic(topic, 3, 1, props)
+      catch { case _: TopicExistsException => () }
+      val (_, partitionAssignment) = 
zkClient.getPartitionAssignmentForTopics(Set(topic)).head
+      assertEquals(3, partitionAssignment.size)
+      partitionAssignment.foreach { case (partition, replicas) =>
+        assertEquals(s"Unexpected replication factor for $partition", 1, 
replicas.size)
+      }
+      val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic)
+      assertEquals(props, savedProps)
+    }
+
+    TestUtils.assertConcurrent("Concurrent topic creation failed", 
Seq(createTopic, createTopic),
+      JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
+  }
+
   /**
    * This test creates a topic with a few config overrides and checks that the 
configs are applied to the new topic
    * then changes the config and checks that the new values take effect.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Concurrent createTopics calls may throw NodeExistsException
> -----------------------------------------------------------
>
>                 Key: KAFKA-7082
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7082
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Ismael Juma
>            Assignee: Ismael Juma
>            Priority: Critical
>              Labels: regression
>             Fix For: 2.0.1, 1.1.2
>
>
> This exception is unexpected, causing an `UnknownServerException` to be thrown 
> back to the client. Example below:
> {code}
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists for /config/topics/connect-configs
>         at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
>         at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>         at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:472)
>         at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1400)
>         at kafka.zk.KafkaZkClient.create$1(KafkaZkClient.scala:262)
>         at 
> kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:269)
>         at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:99)
>         at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:126)
>         at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:81)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to