Hi,

We recently hit a production issue in which Mirror Maker could not shut down properly because ZookeeperConsumerConnector blocked in shutdown(). After looking into the code, we found two issues that together cause this problem.
1. After the consumer iterator receives the shutdownCommand, it puts the shutdownCommand back into the data chunk queue. Is there any reason for doing this?
2. In ZookeeperConsumerConnector.shutdown(), we can end up putting multiple shutdownCommands into the same data chunk queue if several topics share that queue in topicThreadIdAndQueues. (KAFKA-1764 has been opened for this.)

In our case, we have only one consumer stream for all topics, and the data chunk queue capacity is set to 1. The execution sequence that causes the problem is:

1. ZookeeperConsumerConnector.shutdown() is called and tries to put a shutdownCommand for each entry in topicThreadIdAndQueues. Since all entries map to our single queue, multiple shutdownCommands are put into that one queue.
2. In sendShutdownToAllQueues(), between queue.clear() and queue.put(shutdownCommand), the consumer iterator puts the shutdownCommand it received back into the data chunk queue. Because the queue capacity is 1, the queue is now full, so ZookeeperConsumerConnector's next put(shutdownCommand) blocks forever.
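To make the sequence above concrete, here is a minimal single-threaded Java sketch that replays the interleaving deterministically against a capacity-1 `LinkedBlockingQueue` shared by two map entries. It is not Kafka code. `ShutdownRaceSketch`, `SHUTDOWN_COMMAND`, `sendShutdown`, `runScenario` and the `"topicA-0"`/`"topicB-0"` keys are hypothetical stand-ins for `shutdownCommand`, `sendShutdownToAllQueues()` and the `topicThreadIdAndQueues` keys. To avoid hanging, `offer()` with a short timeout replaces `put()`. The second run sends once per distinct queue. This is only one possible mitigation for issue 2, assumed here for illustration; it is not necessarily the fix adopted for KAFKA-1764.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ShutdownRaceSketch {
    // Hypothetical sentinel standing in for Kafka's shutdownCommand.
    static final Object SHUTDOWN_COMMAND = new Object();

    /**
     * Single-threaded replay of the reported interleaving. For each target queue:
     * clear(); the iterator re-enqueues any shutdown command it is holding; the connector
     * sends its own shutdown command; the iterator then takes that command.
     * Returns how many sends would have blocked forever with put().
     */
    static int sendShutdown(Iterable<BlockingQueue<Object>> targets) throws InterruptedException {
        // Queues whose iterator has taken a shutdown command and will put it back.
        Set<BlockingQueue<Object>> iteratorHolding = new HashSet<>();
        int blockedSends = 0;
        for (BlockingQueue<Object> queue : targets) {
            queue.clear();
            // The iterator's re-enqueue lands between clear() and the connector's send.
            if (iteratorHolding.remove(queue)) {
                queue.put(SHUTDOWN_COMMAND); // queue was just cleared, so this does not block
            }
            // offer() with a timeout stands in for put(): false means put() would block forever.
            if (!queue.offer(SHUTDOWN_COMMAND, 100, TimeUnit.MILLISECONDS)) {
                blockedSends++;
                continue;
            }
            // The iterator receives the command and (issue 1) will put it back later.
            if (queue.poll() == SHUTDOWN_COMMAND) {
                iteratorHolding.add(queue);
            }
        }
        return blockedSends;
    }

    /** Returns {blocked sends with one send per entry, blocked sends with one send per distinct queue}. */
    static int[] runScenario() {
        try {
            // One consumer stream for all topics: every entry maps to the same capacity-1 queue.
            BlockingQueue<Object> shared = new LinkedBlockingQueue<>(1);
            Map<String, BlockingQueue<Object>> topicThreadIdAndQueues = new LinkedHashMap<>();
            topicThreadIdAndQueues.put("topicA-0", shared); // hypothetical topic/thread keys
            topicThreadIdAndQueues.put("topicB-0", shared);

            // Issue 2: one shutdown send per map entry, so the shared queue is hit twice.
            int perEntry = sendShutdown(topicThreadIdAndQueues.values());
            // LinkedBlockingQueue uses identity equality, so the set keeps each queue once.
            int perDistinctQueue = sendShutdown(new LinkedHashSet<>(topicThreadIdAndQueues.values()));
            return new int[] {perEntry, perDistinctQueue};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] blocked = runScenario();
        System.out.println("one send per entry: blocked sends = " + blocked[0]);
        System.out.println("one send per distinct queue: blocked sends = " + blocked[1]);
    }
}
```

Running `main` reports one blocked send for the per-entry loop, after the 100 ms timeout, and none for the per-distinct-queue loop. Deduplicating the queues only sidesteps issue 2. The question in issue 1, why the iterator re-enqueues the shutdownCommand at all, still stands.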
The stack trace of the blocked thread:

    "Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting on condition [0x00007ff4f0124000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x0000000680b96bf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
        at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
        - locked <0x0000000680dd5848> (a java.lang.Object)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
        at kafka.tools.MirrorMaker$$anon$1.run(MirrorMaker.scala:169)

Thanks.
Jiangjie (Becket) Qin