Hi all,

I’m trying the next piece of code I found on Google for simply consuming some 
messages from a Kafka topic, must be implemented in Java.
The problem is that this code consumes a single message every time.
How can I tell it to consume let’s say 1000 messages at a time?

Currently this is the code snippet:

    while (it.hasNext()){
      msg = new String(it.next().message());
    }



The whole code:

package tst_mvn_proj_pkg;

import java.util.HashMap;
import java.util.Properties;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTest {
                private static final Logger logger = 
LoggerFactory.getLogger(KafkaTest.class);
                public static ConsumerConnector consumer;

                //java -jar .jar 192.168.3.85:2181 rzGroup12 topic_TOPOLOGY 
true false 60000
                public static void main(String[] args) {
                                String zookeeper = args[0]; //192.168.3.85:2181 
 //"192.168.5.4:2181";
                                String consumerGroup = args[1]; 
//"ConsumerGroupId_5";
                                String topic = args[2]; //"topic_TOPOLOGY";
                                String commitAtEnd = args[3];
                                String smallest = args[4];
                                String consumerTimeout = args[5]; //60000

                                String msg="";
                                int msgNum=0;
                                logger.info("consuming topic: " + topic);
                                Properties props = new Properties();
                    //-//props.put("zookeeper.connect","192.168.3.85:2181");
                                props.put("zookeeper.connect",zookeeper);
                    props.put("group.id", consumerGroup);
                    props.put("zookeeper.session.timeout.ms", "5000");
                    props.put("zookeeper.sync.time.ms", "250");
                    props.put("auto.commit.interval.ms", "1000");
                    //props.put("auto.offset.reset","largest"); //Start only 
from new messages
                    props.put("consumer.timeout.ms",consumerTimeout); //Make 
sure loop doesn't hang
                    props.put("auto.commit.enable", "false");


                    if(smallest.equals("true")){
                                System.out.println("Using SMALLEST auto reset");
                                props.put("auto.offset.reset","smallest"); 
//Start from first message possible consumer never read before
                    }


                    ConsumerConfig consumerConfig = new ConsumerConfig(props);
                    consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);


                    java.util.Map<String, Integer> topicCountMap = new 
HashMap<String, Integer>();
                    topicCountMap.put(topic,new Integer(1));

                    StringDecoder decoder = new StringDecoder(new 
VerifiableProperties());
                    KafkaStream<String, String> stream = 
consumer.createMessageStreams(topicCountMap, decoder, 
decoder).get(topic).get(0);
                    ConsumerIterator<String,String> iterator = 
stream.iterator();


                    while(hasNext(iterator)) {
                                msgNum++;
                                msg = iterator.next().message();
                                if(msgNum%10 == 0){
                                                logger.info(" Kafka Msg " + 
msgNum + ": " + msg.substring(0, 300) + "...");
                                }
                    }


                    if(commitAtEnd.equals("true")){
                                System.out.println("committing Offset to ZK");
                                consumer.commitOffsets(true); //committing 
consumer index to zk
                    }

                    consumer.shutdown();

                    logger.info("Consumer Done");
                }


                private static boolean hasNext(ConsumerIterator<String,String> 
iterator) {
        boolean result = false;

        try {
            iterator.hasNext();
            result = true;
        }
        catch (ConsumerTimeoutException e) {
                System.out.println("ConsumerTimeoutException -> " + 
e.getMessage());
            result = false;
        }

        return result;
    }

}

Thanks a lot,
Roni

Reply via email to