[ https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14211833#comment-14211833 ]
Joel Koshy commented on KAFKA-1764:
-----------------------------------

That was my bad - I should have double-checked that before check-in. [~copester] thanks for pointing it out. I can't dig into it now so I reverted the check-in. [~becket_qin] please take a look.

> ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1764
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1764
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>         Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch
>
>
> In ZookeeperConsumerConnector shutdown(), we could potentially put multiple
> shutdownCommand into the same data chunk queue, provided the topics are
> sharing the same data chunk queue in topicThreadIdAndQueues.
>
> From email thread to document:
>
> In our case, we have only 1 consumer stream for all the topics, and the data
> chunk queue capacity is set to 1. The execution sequence causing the problem
> is as below:
> 1. ZookeeperConsumerConnector shutdown() is called, and it tries to put a
> shutdownCommand for each queue in topicThreadIdAndQueues. Since we have only
> 1 queue, multiple shutdownCommand will be put into that queue.
> 2. In sendShutdownToAllQueues(), between queue.clear() and
> queue.put(shutdownCommand), the consumer iterator receives the
> shutdownCommand and puts it back into the data chunk queue. After that,
> ZookeeperConsumerConnector tries to put another shutdownCommand into the
> data chunk queue but blocks forever, because the queue is already full.
> The thread stack trace is as below:
> {code}
> "Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting on condition [0x00007ff4f0124000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x0000000680b96bf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
>         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
>         at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
>         at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
>         - locked <0x0000000680dd5848> (a java.lang.Object)
>         at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
>         at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
> {code}
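
For readers following the sequence described in the issue, here is a minimal Java sketch of the interleaving and of one possible way to avoid it. It is not the Kafka code and not necessarily what the attached patches do: the class ShutdownQueueSketch, the String sentinel SHUTDOWN_COMMAND, and every method name below are hypothetical, and the consumer iterator is replayed on a single thread so the result is deterministic. A timed offer() stands in for the blocking put() so the sketch reports the stall instead of hanging.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ShutdownQueueSketch {
    // Hypothetical stand-in for the connector's shutdownCommand sentinel.
    static final String SHUTDOWN_COMMAND = "shutdown";

    // Every (topic, thread) key maps to the SAME queue of capacity 1,
    // mirroring "1 consumer stream for all the topics".
    static Map<String, BlockingQueue<String>> sharedQueueMap(int topics) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(1);
        Map<String, BlockingQueue<String>> map = new LinkedHashMap<>();
        for (int i = 0; i < topics; i++) {
            map.put("topic-" + i + "/thread-0", shared);
        }
        return map;
    }

    // Replays the problematic interleaving for two map entries that share one queue.
    // Returns true if the second shutdown command was accepted, false if a real
    // put() would have blocked.
    static boolean replayInterleaving() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            // First map entry: clear, then put the shutdown command.
            queue.clear();
            queue.put(SHUTDOWN_COMMAND);
            // The consumer iterator receives the command...
            String seen = queue.take();
            // Second map entry (same queue): clear...
            queue.clear();
            // ...and the consumer puts the command back before the second put.
            queue.offer(seen);
            // The queue is full and nobody drains it any more.
            return queue.offer(SHUTDOWN_COMMAND, 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Collapses the map values to distinct queue instances (compared by identity).
    static Set<BlockingQueue<String>> distinctQueues(Map<String, BlockingQueue<String>> queues) {
        Set<BlockingQueue<String>> distinct =
            Collections.newSetFromMap(new IdentityHashMap<BlockingQueue<String>, Boolean>());
        distinct.addAll(queues.values());
        return distinct;
    }

    // Sends exactly one shutdown command per distinct queue; returns how many were sent.
    static int sendShutdownOncePerQueue(Map<String, BlockingQueue<String>> queues) {
        int sent = 0;
        try {
            for (BlockingQueue<String> queue : distinctQueues(queues)) {
                queue.clear();
                queue.put(SHUTDOWN_COMMAND);
                sent++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, BlockingQueue<String>> queues = sharedQueueMap(3);
        System.out.println("map entries: " + queues.size());
        System.out.println("distinct queues: " + distinctQueues(queues).size());
        System.out.println("second put accepted: " + replayInterleaving());
        System.out.println("commands sent once per queue: " + sendShutdownOncePerQueue(queues));
    }
}
```

Deduplicating by identity means a queue shared by several topics receives a single shutdown command, so there is no second put() left to block on a full queue. Whether that matches the eventual fix for this issue is for the patch review to confirm.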