[ 
https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1764:
------------------------------
    Description: 
In ZookeeperConsumerConnector shutdown(), we could potentially put multiple 
shutdownCommand into the same data chunk queue, provided the topics are sharing 
the same data chunk queue in topicThreadIdAndQueues.
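
To make the sharing concrete, here is a minimal Java sketch. It is not Kafka's code: the map, the key strings, and the SHUTDOWN sentinel are hypothetical stand-ins for topicThreadIdAndQueues and shutdownCommand. It shows that a per-entry loop over the map values visits the same queue once per topic, while collecting the values into an identity-based set first visits it only once. This is one possible mitigation; it is not a description of the attached patch.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SharedQueueSketch {
    // Hypothetical stand-in for shutdownCommand.
    static final String SHUTDOWN = "SHUTDOWN";

    // Hypothetical stand-in for topicThreadIdAndQueues:
    // every key maps to the SAME bounded queue instance.
    static Map<String, BlockingQueue<String>> sharedQueueMap(int topics, int capacity) {
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(capacity);
        Map<String, BlockingQueue<String>> map = new LinkedHashMap<>();
        for (int i = 0; i < topics; i++) {
            map.put("topic-" + i + "/thread-0", shared);
        }
        return map;
    }

    // Number of shutdown commands a naive per-entry loop would try to enqueue.
    static int naiveSendCount(Map<String, BlockingQueue<String>> map) {
        return map.values().size();
    }

    // Distinct queue instances, compared by reference identity.
    static Set<BlockingQueue<String>> distinctQueues(Map<String, BlockingQueue<String>> map) {
        Set<BlockingQueue<String>> distinct = Collections.newSetFromMap(
                new IdentityHashMap<BlockingQueue<String>, Boolean>());
        distinct.addAll(map.values());
        return distinct;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, BlockingQueue<String>> map = sharedQueueMap(3, 1);
        System.out.println("naive sends: " + naiveSendCount(map));
        for (BlockingQueue<String> queue : distinctQueues(map)) {
            queue.clear();
            queue.put(SHUTDOWN); // exactly one put per distinct queue
        }
        System.out.println("distinct queues: " + distinctQueues(map).size());
    }
}
```

With three topics sharing one queue, the naive loop attempts three puts against a queue of capacity 1, whereas the deduplicated loop attempts one.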

From the email thread, for documentation:

In our case there is only one consumer stream for all the topics, and the data
chunk queue capacity is set to 1. The execution sequence that causes the
problem is:
1. ZookeeperConsumerConnector shutdown() is called and tries to put a
shutdownCommand for each entry in topicThreadIdAndQueues. Since we have only
one queue, multiple shutdownCommands are put into that same queue.
2. In sendShutdownToAllQueues(), between queue.clear() and
queue.put(shutdownCommand), the consumer iterator receives the previous
shutdownCommand and puts it back into the data chunk queue. The queue is now
full, so when ZookeeperConsumerConnector tries to put another shutdownCommand
into it, the put blocks forever.
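
The sequence above can be replayed deterministically in a single thread. The following Java sketch is an illustration under stated assumptions, not Kafka's implementation: SHUTDOWN is a hypothetical stand-in for shutdownCommand, the "consumer" steps are inlined rather than run on a separate thread, and the final put is replaced by a timed offer so that the program reports the blockage instead of hanging.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ShutdownBlockSketch {
    // Hypothetical stand-in for shutdownCommand, compared by reference.
    static final Object SHUTDOWN = new Object();

    // Replays the interleaving and returns whether the second shutdown
    // command could be enqueued within the timeout.
    static boolean secondPutSucceeds(long timeoutMillis) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1); // capacity 1
        try {
            // First visit of the shutdown loop to the shared queue.
            queue.clear();
            queue.put(SHUTDOWN);

            // Consumer iterator receives the shutdown command.
            Object chunk = queue.take();

            // Second visit to the SAME queue: clear() runs first ...
            queue.clear();

            // ... then the consumer puts the command back, filling the queue.
            if (chunk == SHUTDOWN) {
                queue.offer(chunk);
            }

            // The second put would now wait forever, because nothing reads
            // the queue any more. A timed offer stands in for put() here.
            return queue.offer(SHUTDOWN, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second put succeeded: " + secondPutSucceeds(200));
    }
}
```

The timed offer returns false, which corresponds to the thread parked in LinkedBlockingQueue.put in the real shutdown path.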

The thread stack trace is as below:
{code}
"Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting on condition [0x00007ff4f0124000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000680b96bf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
        at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
        - locked <0x0000000680dd5848> (a java.lang.Object)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
{code}

  was:In ZookeeperConsumerConnector shutdown(), we could potentially put 
multiple shutdownCommand into the same data chunk queue, provided the topics 
are sharing the same data chunk queue in topicThreadIdAndQueues.


> ZookeeperConsumerConnector could put multiple shutdownCommand to the same 
> data chunk queue.
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1764
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1764
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>         Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
