Hi Jay,

Yes, it is quite easy to reproduce. The problem is the synchronization in RecordAccumulator. I attached the Java code to my original email. Because many application threads enqueue messages into a single partition, they contend for the same lock, and an application thread may be blocked there for more than 2 minutes, as shown in my original email. Let me know if you need more information.
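To illustrate the contention described above, here is a minimal, self-contained model. It is a sketch, not Kafka code, and it assumes only that every append to one partition goes through a single monitor. `SinglePartitionContention` and its `PartitionQueue` are hypothetical names. They do not model Kafka's buffer pool, sender thread, or network, and this is not the test attached to the original email.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class SinglePartitionContention {

    // Hypothetical stand-in for one partition's batch queue; NOT Kafka's RecordAccumulator.
    static final class PartitionQueue {
        private final Deque<byte[]> batch = new ArrayDeque<>();
        private long appended = 0;

        // Every producer thread must acquire this one monitor, so with a single
        // partition all appends are serialized.
        synchronized void append(byte[] value) {
            batch.addLast(value);
            appended++;
            if (batch.size() >= 100) {
                batch.clear(); // stands in for handing a full batch to a sender
            }
        }

        synchronized long appended() {
            return appended;
        }
    }

    // Runs `threads` appenders against one shared queue and returns the total append count.
    static long run(int threads, int messagesPerThread) {
        PartitionQueue single = new PartitionQueue();
        AtomicLong maxWaitNanos = new AtomicLong(); // worst latency of one append, lock wait included
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                byte[] payload = new byte[64];
                for (int i = 0; i < messagesPerThread; i++) {
                    long start = System.nanoTime();
                    single.append(payload); // blocks while another thread holds the monitor
                    maxWaitNanos.accumulateAndGet(System.nanoTime() - start, Math::max);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("appenders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        System.out.printf("threads=%d appended=%d worst append=%.3f ms%n",
                threads, single.appended(), maxWaitNanos.get() / 1e6);
        return single.appended();
    }

    public static void main(String[] args) {
        run(1, 100_000);  // baseline: no contention
        run(200, 10_000); // many threads, one partition
    }
}
```

With one partition, every thread serializes on the same lock, so in this model the worst-case latency of a single append grows with the number of threads; spreading records over several partitions (one lock each) would reduce the contention here.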
Last commit I tested with:

commit 68b9f7716df1d994a9d43bec6bc42c90e66f1e99
Author: Anton Karamanov <atara...@gmail.com>
Date:   Tue Oct 7 18:22:31 2014 -0700

    kafka-1644; Inherit FetchResponse from RequestOrResponse; patched by Anton Karamanov; reviewed by Jun Rao

Thanks,
Bhavesh

On Tue, Oct 14, 2014 at 10:16 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Hey Bhavesh,
>
> This sounds like a problem. Just to confirm this is after the fix for
> KAFKA-1673?
>
> https://issues.apache.org/jira/browse/KAFKA-1673
>
> It sounds like you have a reproducible test case?
>
> -Jay
>
>
> On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Kafka Dev Team,
> >
> > When I run a test that sends messages to a single partition for 3 minutes
> > or so, I encounter a deadlock (please see the attached screenshot) and
> > thread contention in YourKit profiling.
> >
> > Use case:
> >
> > 1) Aggregating messages into the same partition for metric counting.
> > 2) Replicating the old producer's behavior of sticking to a partition for
> >    3 minutes.
> >
> > Here is the output:
> >
> > Frozen threads found (potential deadlock)
> >
> > It seems that the following threads have not changed their stack for more
> > than 10 seconds.
> > These threads are possibly (but not necessarily!) in a deadlock or hung.
> >
> > pool-1-thread-128 <--- Frozen for at least 2m
> >     org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> >     org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> >     org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> >     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> >     java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> >     java.lang.Thread.run() Thread.java:744
> >
> > pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> >     org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> >     org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> >     org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> >     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> >     java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> >     java.lang.Thread.run() Thread.java:744
> >
> > pool-1-thread-55 <--- Frozen for at least 2m
> >     org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> >     org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> >     org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> >     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> >     java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> >     java.lang.Thread.run() Thread.java:744
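Use case 2 in the original message (sticking to one partition for about 3 minutes, as the old producer did) is what funnels every application thread onto a single partition. A hypothetical sketch of such a time-based chooser follows, assuming an injectable millisecond clock so the window can be tested. `StickyPartitionChooser` is an illustrative name, not a Kafka API.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

// Hypothetical helper, not a Kafka API: keeps returning one randomly chosen
// partition until `stickMillis` has elapsed, then picks again.
final class StickyPartitionChooser {
    private final int numPartitions;
    private final long stickMillis;
    private final LongSupplier clockMillis; // injectable so the window can be tested
    private int current = -1;               // -1 means "no partition chosen yet"
    private long chosenAt;

    StickyPartitionChooser(int numPartitions, long stickMillis, LongSupplier clockMillis) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
        this.stickMillis = stickMillis;
        this.clockMillis = clockMillis;
    }

    // Returns the current partition, re-picking only once the window has expired.
    synchronized int partition() {
        long now = clockMillis.getAsLong();
        if (current < 0 || now - chosenAt >= stickMillis) {
            current = ThreadLocalRandom.current().nextInt(numPartitions);
            chosenAt = now;
        }
        return current;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        StickyPartitionChooser chooser =
                new StickyPartitionChooser(8, 180_000L, () -> fakeNow[0]); // 3-minute window
        int first = chooser.partition();
        fakeNow[0] = 179_999L; // still inside the window
        System.out.println(first == chooser.partition()); // prints "true"
    }
}
```

In real use the clock would simply be `System::currentTimeMillis`. Every caller gets the same partition for the whole window, so in the scenario above all application threads append to one partition's queue at the same time.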