Hi all,
I’m trying the following piece of code, which I found on Google, for simply consuming some messages from a Kafka topic; it must be implemented in Java. The problem is that this code consumes a single message per loop iteration. How can I tell it to consume, let’s say, 1000 messages at a time? Currently this is the code snippet: while (it.hasNext()){ msg = new String(it.next().message()); } The whole code:

package tst_mvn_proj_pkg;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple command-line consumer built on the old Kafka high-level consumer API.
 *
 * <p>Usage:
 * {@code java -jar <jar> <zookeeper> <consumerGroup> <topic> <commitAtEnd> <smallest> <consumerTimeoutMs>}
 * <br>Example:
 * {@code java -jar kafkatest.jar 192.168.3.85:2181 rzGroup12 topic_TOPOLOGY true false 60000}
 *
 * <p>NOTE: the consume loop below already drains EVERY message available on the
 * topic, one per iteration, until {@code consumer.timeout.ms} elapses with no
 * new data — the high-level consumer does not expose application-visible
 * batches; batching (e.g. "1000 at a time") is something the caller builds by
 * accumulating messages inside this loop.
 */
public class KafkaTest {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTest.class);

    /** Maximum number of message characters echoed to the log per sample. */
    private static final int LOG_SNIPPET_LENGTH = 300;

    public static ConsumerConnector consumer;

    public static void main(String[] args) {
        // BUG FIX: fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when an argument is missing.
        if (args.length < 6) {
            System.err.println("Usage: KafkaTest <zookeeper> <consumerGroup> <topic>"
                + " <commitAtEnd> <smallest> <consumerTimeoutMs>");
            System.exit(1);
        }
        String zookeeper = args[0];       // e.g. 192.168.3.85:2181
        String consumerGroup = args[1];   // e.g. rzGroup12
        String topic = args[2];           // e.g. topic_TOPOLOGY
        String commitAtEnd = args[3];     // "true" -> commit offsets to ZK when done
        String smallest = args[4];        // "true" -> start from earliest unread offset
        String consumerTimeout = args[5]; // ms hasNext() blocks before giving up, e.g. 60000

        logger.info("consuming topic: {}", topic);

        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", consumerGroup);
        props.put("zookeeper.session.timeout.ms", "5000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // Make sure the consume loop doesn't hang forever once the topic is drained.
        props.put("consumer.timeout.ms", consumerTimeout);
        // Offsets are committed explicitly at the end (see commitAtEnd below).
        props.put("auto.commit.enable", "false");
        if (smallest.equals("true")) {
            logger.info("Using SMALLEST auto reset");
            // Start from the first message this consumer group has never read.
            props.put("auto.offset.reset", "smallest");
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // One stream (one consumer thread) for the topic.
        topicCountMap.put(topic, Integer.valueOf(1));

        StringDecoder decoder = new StringDecoder(new VerifiableProperties());
        KafkaStream<String, String> stream =
            consumer.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
        ConsumerIterator<String, String> iterator = stream.iterator();

        try {
            int msgNum = 0;
            while (hasNext(iterator)) {
                msgNum++;
                String msg = iterator.next().message();
                if (msgNum % 10 == 0) {
                    // BUG FIX: guard against messages shorter than the snippet length —
                    // the original substring(0, 300) threw
                    // StringIndexOutOfBoundsException for short messages.
                    int end = Math.min(msg.length(), LOG_SNIPPET_LENGTH);
                    logger.info(" Kafka Msg {}: {}...", msgNum, msg.substring(0, end));
                }
            }
            if (commitAtEnd.equals("true")) {
                logger.info("committing Offset to ZK");
                consumer.commitOffsets(true); // commit consumer offsets to ZooKeeper
            }
        } finally {
            // BUG FIX: always release the ZooKeeper/broker connections, even when
            // the consume loop throws — previously a failure leaked the connector.
            consumer.shutdown();
        }
        logger.info("Consumer Done");
    }

    /**
     * Wraps {@link ConsumerIterator#hasNext()}, translating the
     * {@link ConsumerTimeoutException} thrown after {@code consumer.timeout.ms}
     * of inactivity into a plain {@code false}.
     */
    private static boolean hasNext(ConsumerIterator<String, String> iterator) {
        try {
            // BUG FIX: the original ignored hasNext()'s return value and always
            // reported true, so a false result would still lead the caller to
            // call next() on an exhausted iterator.
            return iterator.hasNext();
        } catch (ConsumerTimeoutException e) {
            logger.info("ConsumerTimeoutException -> {}", e.getMessage());
            return false;
        }
    }
}

Thanks a lot,
Roni