Hi all, I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector. Steps to reproduce:

1. Start a single-broker Kafka cluster with auto.create.topics.enable set to "true".
2. Start a ConsumerConnector on a topic (which does not yet exist) with auto.offset.reset set to "smallest".
3. Produce some data to the topic.
4. Bring ZooKeeper down.
5. Call ConsumerConnector.shutdown().

Observation: the call blocks forever with the following stack trace:

java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6bf0570> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)
    at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
    at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
    at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.Pool$$anon$1.foreach(Unknown Source)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at kafka.utils.Pool.foreach(Unknown Source)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
    - locked <0x00000000c6be0c60> (a java.lang.Object)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
    ...

Once I set "auto.commit.enable" to "false", the problem goes away. The trace shows why: with auto-commit enabled, shutdown() calls commitOffsets(), which writes to ZooKeeper and waits in ZkClient.retryUntilConnected() for a connection that never comes back.

This is identical to what's described in https://issues.apache.org/jira/browse/KAFKA-601, with the only difference being that it applies to Kafka 0.8 rather than Kafka 0.7.2.

Is there any way to solve this issue other than disabling auto-commit?

Thanks,
Yury
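
P.S. One possible stopgap, which I have not verified against Kafka 0.8 itself, is to run the shutdown call on a separate daemon thread and wait for it only for a bounded time, so the caller does not hang while ZooKeeper is unreachable. The sketch below is plain Java with no Kafka dependency; the names BoundedShutdown and runWithTimeout are made up for illustration, and it assumes it is acceptable to abandon the connector (and lose the final offset commit) if the timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka or ZkClient.
final class BoundedShutdown {

    /**
     * Runs a possibly blocking action on a daemon thread and waits at most
     * timeoutMs for it to complete. Returns true if the action finished in
     * time, false if the wait timed out or the caller was interrupted.
     */
    static boolean runWithTimeout(final Runnable action, long timeoutMs) {
        final CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    action.run();
                } finally {
                    done.countDown(); // signal completion even if the action throws
                }
            }
        }, "bounded-shutdown");
        worker.setDaemon(true); // a stuck shutdown must not keep the JVM alive
        worker.start();

        boolean finished;
        try {
            finished = done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            finished = false;
        }
        if (!finished) {
            // Best effort only: whether the blocked call reacts to an
            // interrupt depends on the code it is stuck in.
            worker.interrupt();
        }
        return finished;
    }
}
```

With a connector in hand, the call would wrap connector.shutdown() in a Runnable and pass it with a timeout of, say, 5000 ms. A false return means the shutdown did not complete, so the last consumed offsets may not have been committed and some messages may be redelivered after a restart.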