[ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14328741#comment-14328741 ]
Shridhar commented on KAFKA-270:
--------------------------------

Any updates on this issue?

> sync producer / consumer test producing lot of kafka server exceptions & not
> getting the throughput mentioned here
> http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-270
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.7
>         Environment: Linux 2.6.18-238.1.1.el5, x86_64 x86_64 x86_64 GNU/Linux
>                      ext3 file system with raid10
>            Reporter: Praveen Ramachandra
>              Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using default config values for producer, consumer and broker,
> which are very good starting points, as they should yield sufficient
> throughput.
> I would appreciate it if you could point out what settings or code changes
> are needed to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please see below for the exceptions and error messages from the server.
> BTW: I am running server, zookeeper, producer, consumer on the same host.
> ====Consumer Code=====
> long startTime = System.currentTimeMillis();
> long endTime = startTime + runDuration*1000l;
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");
> props.put("groupid", subscriptionName); // to support multiple subscribers
> props.put("zk.sessiontimeout.ms", "400");
> props.put("zk.synctime.ms", "200");
> props.put("autocommit.interval.ms", "1000");
> consConfig = new ConsumerConfig(props);
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1)); // has the topic to which to subscribe to
> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>     consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
> ConsumerIterator<Message> it = stream.iterator();
> while(System.currentTimeMillis() <= endTime )
> {
>     it.next(); // discard data
>     consumeMsgCount.incrementAndGet();
> }
> ====End consumer CODE============================
> =====Producer CODE========================
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> // Use random partitioner. Don't need the key type. Just set it to Integer.
> // The message is of type String.
> producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
> long endTime = startTime + runDuration*1000l; // run duration is in seconds
> while(System.currentTimeMillis() <= endTime )
> {
>     String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>     producer.send(new ProducerData<Integer, String>(topicName, msg));
>     pc.incrementAndGet();
> }
> java.util.Date date = new java.util.Date(System.currentTimeMillis());
> System.out.println(date+" :: stopped producer for topic"+topicName);
> =====END Producer CODE========================
> I see a bunch of exceptions like this
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>       at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
>       at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
>       at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
>       at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
>       at kafka.network.MultiSend.writeTo(Transmission.scala:95)
>       at kafka.network.Processor.write(SocketServer.scala:332)
>       at kafka.network.Processor.run(SocketServer.scala:209)
>       at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileDispatcher.read0(Native Method)
>       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
>       at sun.nio.ch.IOUtil.read(IOUtil.java:171)
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>       at kafka.utils.Utils$.read(Utils.scala:485)
>       at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>       at kafka.network.Processor.read(SocketServer.scala:304)
>       at kafka.network.Processor.run(SocketServer.scala:207)
>       at java.lang.Thread.run(Thread.java:662)
> And Many INFO messages e.g.,
> INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)
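
A note on the "timeout of 6000ms exceeded" lines, offered as a sketch and not as a diagnosis. The consumer code quoted above requests zk.sessiontimeout.ms=400, but a ZooKeeper server does not grant an arbitrary value: it bounds each requested session timeout between minSessionTimeout and maxSessionTimeout, which default to 2 and 20 times tickTime. The tickTime of 3000 ms used below is an assumption, since the reporter's ZooKeeper configuration is not shown; it is picked only because it would produce the 6000 ms seen in the log. Other clients in the test may equally have requested 6000 ms themselves. The class and method names are hypothetical, and the code is plain arithmetic, not ZooKeeper API.

```java
// Hypothetical helper: reproduces the bounding a ZooKeeper server applies
// to a client's requested session timeout, using the default limits
// (min = 2 * tickTime, max = 20 * tickTime).
final class SessionTimeoutSketch {

    static int negotiate(int requestedMs, int tickTimeMs) {
        int minMs = 2 * tickTimeMs;   // default minSessionTimeout
        int maxMs = 20 * tickTimeMs;  // default maxSessionTimeout
        return Math.max(minMs, Math.min(maxMs, requestedMs));
    }

    public static void main(String[] args) {
        int tickTimeMs = 3000; // ASSUMPTION: not stated in the report
        int requestedMs = 400; // zk.sessiontimeout.ms from the consumer code
        System.out.println("requested " + requestedMs + " ms, effective "
                + negotiate(requestedMs, tickTimeMs) + " ms");
    }
}
```

If that assumption holds, the 400 ms setting never takes effect as written, and a client that stalls for longer than the effective timeout (for example during a long GC pause with everything on one host) would have its session expired by the server.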