Hi Kafka Dev Team,

Could you let me know whether the new Producer supports sending data to a single partition without blocking? We have a use case for aggregating events.
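To make the use case concrete, the behavior of sticking to one partition for a fixed window (3 minutes in our test) could be sketched roughly as follows. This is only an illustration under assumptions: `StickyPartitionChooser` and `partitionAt` are hypothetical names, not part of Kafka or of the test code, and the round-robin rotation is just one possible way to pick the next partition.

```java
/**
 * Hypothetical helper (not part of Kafka): picks one partition and keeps
 * returning it until the sticky window expires, then moves to the next one.
 */
final class StickyPartitionChooser {
    private final int numPartitions;
    private final long stickyMillis;
    private int current = 0;        // partition currently being used
    private long windowStart = -1L; // -1 means no window has started yet

    StickyPartitionChooser(int numPartitions, long stickyMillis) {
        if (numPartitions <= 0 || stickyMillis <= 0) {
            throw new IllegalArgumentException(
                "numPartitions and stickyMillis must be positive");
        }
        this.numPartitions = numPartitions;
        this.stickyMillis = stickyMillis;
    }

    /**
     * Returns the partition to use at time nowMillis.
     * Synchronized so that many sender threads can share one chooser.
     */
    synchronized int partitionAt(long nowMillis) {
        if (windowStart < 0) {
            // First call: start the first sticky window.
            windowStart = nowMillis;
        } else if (nowMillis - windowStart >= stickyMillis) {
            // Window expired: rotate to the next partition (assumed policy).
            current = (current + 1) % numPartitions;
            windowStart = nowMillis;
        }
        return current;
    }
}
```

Each sender thread would call `partitionAt(System.currentTimeMillis())` and set the returned index as the explicit partition of the record it hands to `KafkaProducer.send(ProducerRecord, Callback)`, so all threads write to the same partition during a window.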
Thanks,
Bhavesh

On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Kafka Dev Team,
>
> When I run a test that sends messages to a single partition for 3 minutes
> or so, I encounter a deadlock (please see the attached screenshot) and
> thread contention in YourKit profiling.
>
> Use Case:
>
> 1) Aggregating messages into the same partition for metric counting.
> 2) Replicating the old Producer behavior of sticking to a partition for
>    3 minutes.
>
> Here is the output:
>
> Frozen threads found (potential deadlock)
>
> It seems that the following threads have not changed their stack for more
> than 10 seconds.
> These threads are possibly (but not necessarily!) in a deadlock or hung.
>
> pool-1-thread-128 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
>
> pool-1-thread-159 <--- Frozen for at least 2m 1 sec
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744
>
> pool-1-thread-55 <--- Frozen for at least 2m
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
> org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
> org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
> java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
> java.lang.Thread.run() Thread.java:744