[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179014#comment-14179014 ]
Bhavesh Mistry edited comment on KAFKA-1710 at 10/21/14 8:28 PM: ----------------------------------------------------------------- [~jkreps], I am sorry I did not get back to you sooner. The cost of enqueuing a message into a single partition is ~54% as compared to round-robin. (tested with 32 partitions on a single topic and a 3-broker cluster) The throughput measures only the cost of putting data into the buffer, not the cost of sending data to the brokers. Here is the test I have done: To *single* partition: Throughput per Thread=2666.6666666666665 byte(s)/microsecond All done...! To *all* partition: Throughput per Thread=5818.181818181818 byte(s)/microsecond All done...! {code} package org.kafka.test; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 75; static CountDownLatch latch = new CountDownLatch(numberTh); public static void main(String[] args) throws IOException, InterruptedException { //Thread.sleep(60000); Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream("kafkaproducer.properties"); String topic = "logmon.test"; prop.load(propFile); System.out.println("Property: " + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; int numberOfLoop = 5000; for (int i = 0; i < msgLenth; i++) builder.append("a"); int numberOfProducer = 1; Producer[] producer = new 
Producer[numberOfProducer]; for (int i = 0; i < producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(numberTh *2)); MyProducer [] producerThResult = new MyProducer [numberTh]; for(int i = 0 ; i < numberTh;i++){ producerThResult[i] = new MyProducer(producer,numberOfLoop,builder.toString(), topic); service.execute(producerThResult[i]); } latch.await(); for (int i = 0; i < producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println("All Producers done...!"); // now interpret the result... of this... long lowestTime = 0 ; for(int i =0 ; i < producerThResult.length;i++){ if(i == 1){ lowestTime = producerThResult[i].totalTimeinNano; }else if ( producerThResult[i].totalTimeinNano < lowestTime){ lowestTime = producerThResult[i].totalTimeinNano; } } long bytesSend = msgLenth * numberOfLoop; long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, TimeUnit.NANOSECONDS); double throughput = (bytesSend * 1.0) / (durationInMs); System.out.println("Throughput per Thread=" + throughput + " byte(s)/microsecond"); System.out.println("All done...!"); } static class MyProducer implements Callable<Long> , Runnable { Producer[] producer; long maxloops; String msg ; String topic; long totalTimeinNano = 0; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { // ALWAYS SEND DATA TO PARTITION 1 only... 
//ProducerRecord record = new ProducerRecord(topic, 1,null,msg.toString().getBytes()); ProducerRecord record = new ProducerRecord(topic, null,null,msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j < maxloops ; j++){ try { for (int i = 0; i < producer.length; i++) { long start = System.nanoTime(); producer[i].send(record, callBack); long end = System.nanoTime(); totalTimeinNano += (end-start); } Thread.sleep(10); } catch (Throwable th) { System.err.println("FATAL "); th.printStackTrace(); } } }finally { latch.countDown(); } } public Long call() throws Exception { run(); return totalTimeinNano; } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println("Msg dropped..!"); exception.printStackTrace(); } } } } {code} was (Author: bmis13): [~jkreps], I am sorry I did not get back to you soon. The cost of enqueue a message into single partition is ~54% as compare to round-robin. (test with 32 partition to single topic and 3 cluster) The throughput is measuring the cost of put data into buffer. Here is test I have done: To *single* partition: Throughput per Thread=2666.6666666666665 byte(s)/microsecond All done...! To *all* partition: Throughput per Thread=5818.181818181818 byte(s)/microsecond All done...! 
{code} package org.kafka.test; import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 75; static CountDownLatch latch = new CountDownLatch(numberTh); public static void main(String[] args) throws IOException, InterruptedException { //Thread.sleep(60000); Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream("kafkaproducer.properties"); String topic = "logmon.test"; prop.load(propFile); System.out.println("Property: " + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; int numberOfLoop = 5000; for (int i = 0; i < msgLenth; i++) builder.append("a"); int numberOfProducer = 1; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i < producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(numberTh *2)); MyProducer [] producerThResult = new MyProducer [numberTh]; for(int i = 0 ; i < numberTh;i++){ producerThResult[i] = new MyProducer(producer,numberOfLoop,builder.toString(), topic); service.execute(producerThResult[i]); } latch.await(); for (int i = 0; i < producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println("All Producers done...!"); // now interpret the 
result... of this... long lowestTime = 0 ; for(int i =0 ; i < producerThResult.length;i++){ if(i == 1){ lowestTime = producerThResult[i].totalTimeinNano; }else if ( producerThResult[i].totalTimeinNano < lowestTime){ lowestTime = producerThResult[i].totalTimeinNano; } } long bytesSend = msgLenth * numberOfLoop; long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, TimeUnit.NANOSECONDS); double throughput = (bytesSend * 1.0) / (durationInMs); System.out.println("Throughput per Thread=" + throughput + " byte(s)/microsecond"); System.out.println("All done...!"); } static class MyProducer implements Callable<Long> , Runnable { Producer[] producer; long maxloops; String msg ; String topic; long totalTimeinNano = 0; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { // ALWAYS SEND DATA TO PARTITION 1 only... //ProducerRecord record = new ProducerRecord(topic, 1,null,msg.toString().getBytes()); ProducerRecord record = new ProducerRecord(topic, null,null,msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j < maxloops ; j++){ try { for (int i = 0; i < producer.length; i++) { long start = System.nanoTime(); producer[i].send(record, callBack); long end = System.nanoTime(); totalTimeinNano += (end-start); } Thread.sleep(10); } catch (Throwable th) { System.err.println("FATAL "); th.printStackTrace(); } } }finally { latch.countDown(); } } public Long call() throws Exception { run(); return totalTimeinNano; } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println("Msg dropped..!"); exception.printStackTrace(); } } } } {code} > [New Java Producer Potential Deadlock] Producer Deadlock when all messages is > being sent to single partition > 
------------------------------------------------------------------------------------------------------------ > > Key: KAFKA-1710 > URL: https://issues.apache.org/jira/browse/KAFKA-1710 > Project: Kafka > Issue Type: Bug > Components: producer > Environment: Development > Reporter: Bhavesh Mistry > Assignee: Ewen Cheslack-Postava > Priority: Critical > Labels: performance > Attachments: Screen Shot 2014-10-13 at 10.19.04 AM.png, Screen Shot > 2014-10-15 at 9.09.06 PM.png, Screen Shot 2014-10-15 at 9.14.15 PM.png, > TestNetworkDownProducer.java, th1.dump, th10.dump, th11.dump, th12.dump, > th13.dump, th14.dump, th15.dump, th2.dump, th3.dump, th4.dump, th5.dump, > th6.dump, th7.dump, th8.dump, th9.dump > > > Hi Kafka Dev Team, > When I run the test that sends messages to a single partition for 3 minutes or > so, I encounter a deadlock (please see the attached screenshots) and thread > contention in YourKit profiling. > Use Case: > 1) Aggregating messages into the same partition for metric counting. > 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes. > Here is the output: > Frozen threads found (potential deadlock) > > It seems that the following threads have not changed their stack for more > than 10 seconds. > These threads are possibly (but not necessarily!) in a deadlock or hung. 
> > pool-1-thread-128 <--- Frozen for at least 2m > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, > byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, > Callback) KafkaProducer.java:237 > org.kafka.test.TestNetworkDownProducer$MyProducer.run() > TestNetworkDownProducer.java:84 > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) > ThreadPoolExecutor.java:1145 > java.util.concurrent.ThreadPoolExecutor$Worker.run() > ThreadPoolExecutor.java:615 > java.lang.Thread.run() Thread.java:744 > pool-1-thread-159 <--- Frozen for at least 2m 1 sec > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, > byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, > Callback) KafkaProducer.java:237 > org.kafka.test.TestNetworkDownProducer$MyProducer.run() > TestNetworkDownProducer.java:84 > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) > ThreadPoolExecutor.java:1145 > java.util.concurrent.ThreadPoolExecutor$Worker.run() > ThreadPoolExecutor.java:615 > java.lang.Thread.run() Thread.java:744 > pool-1-thread-55 <--- Frozen for at least 2m > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, > byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139 > org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, > Callback) KafkaProducer.java:237 > org.kafka.test.TestNetworkDownProducer$MyProducer.run() > TestNetworkDownProducer.java:84 > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) > ThreadPoolExecutor.java:1145 > java.util.concurrent.ThreadPoolExecutor$Worker.run() > ThreadPoolExecutor.java:615 > java.lang.Thread.run() Thread.java:744 > Thanks, > Bhavesh -- This message was sent by Atlassian JIRA 
(v6.3.4#6332)