Thank you.

1. Is this Scala code indeed the source code of the consumer that I am using?

    (import kafka.consumer.*)
    kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig)
        .createMessageStreams(topicCountMap, decoder, decoder)
        .get(topic).get(0).iterator();

2. Even if that's so, can you please say how I can fetch a batch of 1000 messages into my internal memory? (At the very least, this will let me validate what you're saying, by comparing the time it takes KafkaConsumer to fetch a single batch of 1000 messages against the time it takes to do 1000 fetches of a single message each.)
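Something like the sketch below is what I have in mind (a minimal, untested sketch using the same classes as in my code quoted at the bottom of this thread; the helper name fetchBatch and the batch size are my own, and it assumes consumer.timeout.ms is set as in that code):

    // Sketch only: collect up to batchSize messages into memory.
    // Relies on consumer.timeout.ms being set, so that hasNext() throws
    // ConsumerTimeoutException instead of blocking forever once the
    // topic goes quiet.
    private static java.util.List<String> fetchBatch(
            ConsumerIterator<String, String> iterator, int batchSize) {
        java.util.List<String> batch = new java.util.ArrayList<String>(batchSize);
        try {
            while (batch.size() < batchSize && iterator.hasNext()) {
                batch.add(iterator.next().message());
            }
        } catch (ConsumerTimeoutException e) {
            // Timed out waiting for more messages; return what we have so far.
        }
        return batch;
    }

For the timing comparison, I would wrap fetchBatch(iterator, 1000) in System.nanoTime() and compare it against 1000 individual next() calls. If, as you say, the iterator already fetches chunks over the network, the two timings should come out close, which is exactly what I want to verify.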
-----Original Message-----
From: Thanh Hong Dai [mailto:hdth...@tma.com.vn]
Sent: Tuesday, 24 November, 2015 04:52
To: users@kafka.apache.org
Subject: RE: java kafka consumer: how to read a BATCH of messages?

If the number of network round trips is your concern: from the source code at
https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/consumer/ConsumerIterator.scala,
it seems that the data is read in chunks internally, and the next chunk is fetched over the network only when the iterator exhausts the current one. (The chunk size is governed by the consumer's fetch settings; see the sketch at the end of this thread.)

-----Original Message-----
From: Peleg, Roni [mailto:roni.pe...@arris.com]
Sent: Sunday, 22 November, 2015 8:03 PM
To: users@kafka.apache.org
Subject: java kafka consumer: how to read a BATCH of messages?

Hi all,

I'm trying the piece of code below, which I found on Google, to consume some messages from a Kafka topic; it must be implemented in Java. The problem is that this code consumes a single message at a time. How can I tell it to consume, say, 1000 messages at a time? Currently this is the snippet:

    while (it.hasNext()) {
        msg = new String(it.next().message());
    }

The whole code:

    package tst_mvn_proj_pkg;

    import java.util.HashMap;
    import java.util.Properties;

    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaTest {

        private static final Logger logger = LoggerFactory.getLogger(KafkaTest.class);

        public static ConsumerConnector consumer;

        // java -jar .jar 192.168.3.85:2181 rzGroup12 topic_TOPOLOGY true false 60000
        public static void main(String[] args) {
            String zookeeper = args[0];       // e.g. "192.168.3.85:2181"
            String consumerGroup = args[1];   // e.g. "ConsumerGroupId_5"
            String topic = args[2];           // e.g. "topic_TOPOLOGY"
            String commitAtEnd = args[3];
            String smallest = args[4];
            String consumerTimeout = args[5]; // e.g. "60000"

            String msg = "";
            int msgNum = 0;

            logger.info("consuming topic: " + topic);

            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeper);
            props.put("group.id", consumerGroup);
            props.put("zookeeper.session.timeout.ms", "5000");
            props.put("zookeeper.sync.time.ms", "250");
            props.put("auto.commit.interval.ms", "1000");
            //props.put("auto.offset.reset", "largest");       // start only from new messages
            props.put("consumer.timeout.ms", consumerTimeout); // make sure the loop doesn't hang
            props.put("auto.commit.enable", "false");
            if (smallest.equals("true")) {
                System.out.println("Using SMALLEST auto reset");
                // start from the first message this consumer group has never read
                props.put("auto.offset.reset", "smallest");
            }

            ConsumerConfig consumerConfig = new ConsumerConfig(props);
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

            java.util.Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            StringDecoder decoder = new StringDecoder(new VerifiableProperties());
            KafkaStream<String, String> stream =
                    consumer.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
            ConsumerIterator<String, String> iterator = stream.iterator();

            while (hasNext(iterator)) {
                msgNum++;
                msg = iterator.next().message();
                if (msgNum % 10 == 0) {
                    // substring is guarded so messages shorter than 300 chars don't throw
                    logger.info("\nKafka Msg " + msgNum + ": "
                            + msg.substring(0, Math.min(msg.length(), 300)) + "...");
                }
            }

            if (commitAtEnd.equals("true")) {
                System.out.println("committing Offset to ZK");
                consumer.commitOffsets(true); // commit consumer offsets to ZooKeeper
            }
            consumer.shutdown();
            logger.info("Consumer Done");
        }

        private static boolean hasNext(ConsumerIterator<String, String> iterator) {
            try {
                // hasNext() blocks until a message arrives or consumer.timeout.ms elapses
                return iterator.hasNext();
            } catch (ConsumerTimeoutException e) {
                System.out.println("ConsumerTimeoutException -> " + e.getMessage());
                return false;
            }
        }
    }

Thanks a lot,
Roni
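A note on the chunked fetching Thanh describes above: in the old (0.8/0.9) high-level consumer, the chunk size is governed by the fetch settings in ConsumerConfig. A minimal sketch, added to the same props as in Roni's code; the property names exist in that config, but the values here are illustrative only, not recommendations:

    // Sketch: tune how much data each network round trip carries.
    props.put("fetch.message.max.bytes", "2097152"); // max bytes fetched per partition per request
    props.put("fetch.min.bytes", "65536");           // broker waits for at least this much data...
    props.put("fetch.wait.max.ms", "500");           // ...or this long, whichever comes first

Larger values make each round trip return a bigger chunk, so the ConsumerIterator serves more messages from memory between network fetches.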