Hi all,

I'm using Kafka 0.8 and I've ran into an issue with ConsumerConnector.
Steps to reproduce:

1. Start single-broker Kafka cluster with auto.create.topic.enable set to
"true"
2. Start ConsumerConnector on topic (which does not yet exist) with
auto.offset.reset set to "smallest".
3. Produce some data to the topic.
4. Bring Zookeeper down
5. Call ConsumerConnector.close()

Observation - the call blocks forever with the following stack trace:

   java.lang.Thread.State: TIMED_WAITING (parking)

        at sun.misc.Unsafe.park(Native Method)

        - parking to wait for  <0x00000000c6bf0570> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

        at
java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)

        at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)

        at
org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)

        at
org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)

        at
org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)

        at
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

        at
org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)

        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)

        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)

        at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)

        at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
Source)

        at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
Source)

        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

        at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

        at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
Source)

        at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
Source)

        at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

        at kafka.utils.Pool$$anon$1.foreach(Unknown Source)

        at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

        at kafka.utils.Pool.foreach(Unknown Source)

        at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown
Source)

        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown
Source)

        at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown
Source)

        - locked <0x00000000c6be0c60> (a java.lang.Object)

        at
kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
       ...

Once I set "auto.commit.enable" to "false", the problem is gone. This is
identical to what's described in
https://issues.apache.org/jira/browse/KAFKA-601, with the only difference
that it applies to Kafka 0.8 rather than Kafka 0.7.2. Any way to solve this
issue other than disabling auto-commit?

Thanks,
Yury

Reply via email to