Hi Jay,

Yes, it is easy to reproduce with the Java code I attached to my original
email. The problem is the synchronization in RecordAccumulator: because the
application threads all enqueue messages into a single partition, they
contend for the same lock, and an application thread may be blocked on it
for more than 2 minutes, as shown in my original email. Let me know if you
need more information.
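For anyone without the attachment, here is a minimal, self-contained sketch of the pattern. PartitionQueue is a hypothetical stand-in, not Kafka's RecordAccumulator: it only models many threads appending to one partition's queue under a single lock, and reports the slowest append() it observed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class SinglePartitionContention {

    // Hypothetical stand-in for one partition's queue of pending records.
    // This is NOT Kafka's RecordAccumulator; it only mimics a single shared lock.
    static final class PartitionQueue {
        private final Deque<byte[]> records = new ArrayDeque<>();

        // Every writer takes the same monitor, so appends to this partition run one at a time.
        void append(byte[] record) {
            synchronized (records) {
                records.addLast(record);
            }
        }

        int size() {
            synchronized (records) {
                return records.size();
            }
        }
    }

    // Starts `threads` writers that each append `perThread` records to the same queue
    // and returns the slowest single append() observed, in nanoseconds.
    static long run(int threads, int perThread, PartitionQueue queue) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        AtomicLong maxWaitNanos = new AtomicLong();
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all writers up before any of them appends
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                byte[] payload = new byte[100];
                for (int i = 0; i < perThread; i++) {
                    long before = System.nanoTime();
                    queue.append(payload); // includes time spent waiting for the monitor
                    maxWaitNanos.accumulateAndGet(System.nanoTime() - before, Math::max);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxWaitNanos.get();
    }

    public static void main(String[] args) {
        PartitionQueue queue = new PartitionQueue();
        long worst = run(64, 10_000, queue);
        System.out.println("records=" + queue.size() + ", slowest append=" + worst / 1_000 + " us");
    }
}
```

The latency printed depends on the machine and JVM; the point is only that every writer to one partition queues on one monitor, so adding threads adds waiting rather than throughput.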

Last commit I tested with:

commit 68b9f7716df1d994a9d43bec6bc42c90e66f1e99
Author: Anton Karamanov <atara...@gmail.com>
Date:   Tue Oct 7 18:22:31 2014 -0700

    kafka-1644; Inherit FetchResponse from RequestOrResponse; patched by Anton Karamanov; reviewed by Jun Rao

Thanks,

Bhavesh

On Tue, Oct 14, 2014 at 10:16 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Bhavesh,
>
> This sounds like a problem. Just to confirm, this is after the fix for
> KAFKA-1673?
>
> https://issues.apache.org/jira/browse/KAFKA-1673
>
> It sounds like you have a reproducible test case?
>
> -Jay
>
>
> On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Kafka Dev Team,
> >
> > When I run a test that sends messages to a single partition for 3 minutes
> > or so, I encounter a deadlock (please see the attached screenshot) and
> > thread contention in YourKit profiling.
> >
> > Use Case:
> >
> > 1)  Aggregating messages into the same partition for metric counting.
> > 2)  Replicating the old producer's behavior of sticking to one partition
> > for 3 minutes.
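For use case 2, a sketch of what "sticking to a partition" could look like on the application side. StickyPartitionChooser is a hypothetical helper, not a Kafka API; the caller would use its result as the explicit partition of each record, which funnels every send into one partition's queue for the whole interval.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;

// Hypothetical helper (not part of the Kafka client API): returns the same partition
// until `stickyMillis` have elapsed, then draws a new random partition (possibly the same one).
final class StickyPartitionChooser {
    private final int numPartitions;
    private final long stickyMillis;
    private final LongSupplier clockMillis; // injectable so tests can control time
    private int current = -1;                // -1 means "no partition chosen yet"
    private long chosenAtMillis;

    StickyPartitionChooser(int numPartitions, long stickyMillis, LongSupplier clockMillis) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
        this.stickyMillis = stickyMillis;
        this.clockMillis = clockMillis;
    }

    // Short critical section: reads the clock and occasionally draws a random number.
    synchronized int partition() {
        long now = clockMillis.getAsLong();
        if (current < 0 || now - chosenAtMillis >= stickyMillis) {
            current = ThreadLocalRandom.current().nextInt(numPartitions);
            chosenAtMillis = now;
        }
        return current;
    }

    public static void main(String[] args) {
        // Stick to one partition for 3 minutes, as in the use case above.
        StickyPartitionChooser chooser =
                new StickyPartitionChooser(8, 3 * 60 * 1000L, System::currentTimeMillis);
        System.out.println("partition=" + chooser.partition());
    }
}
```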
> >
> >
> > Here is the output:
> >
> > Frozen threads found (potential deadlock)
> >
> > It seems that the following threads have not changed their stack for more
> > than 10 seconds.
> > These threads are possibly (but not necessarily!) in a deadlock or hung.
> >
> > pool-1-thread-128 <--- Frozen for at least 2m
> >
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> > org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> > java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> > java.lang.Thread.run() Thread.java:744
> >
> > pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> >
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> > org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> > java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> > java.lang.Thread.run() Thread.java:744
> >
> > pool-1-thread-55 <--- Frozen for at least 2m
> >
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> > org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> > java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> > java.lang.Thread.run() Thread.java:744
> >
>
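YourKit itself says the frozen threads are possibly, but not necessarily, deadlocked, and all three are stopped at the same frame (RecordAccumulator.java:139), which fits heavy contention on one lock as well as a true deadlock. One hedged way to tell the two apart is the JDK's java.lang.management API: ThreadMXBean.findDeadlockedThreads() returns null unless there is an actual lock cycle. DeadlockCheck is a hypothetical class name; the idea is to call it from inside the producer JVM (for example from a watchdog thread) while the send() calls appear stuck.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical diagnostic helper: distinguishes a real lock cycle from plain contention.
final class DeadlockCheck {

    // True only if the JVM reports a cycle among monitors or ownable synchronizers.
    static boolean hasDeadlock() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no cycle exists
        return ids != null && ids.length > 0;
    }

    // Prints every thread currently BLOCKED on a monitor, with the lock and its owner,
    // and returns how many there were.
    static int printBlockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        int blocked = 0;
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                blocked++;
                System.out.println(info.getThreadName() + " blocked on " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println("deadlock=" + hasDeadlock());
        System.out.println("blockedThreads=" + printBlockedThreads());
    }
}
```

If hasDeadlock() returns false while many threads are BLOCKED on the same lock, the stall is contention rather than a deadlock.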
