Thank you. 

1. Is this Scala code indeed the source code of the consumer that I am using? I create the iterator like this (with import kafka.consumer.*):

    kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig)
        .createMessageStreams(topicCountMap, decoder, decoder)
        .get(topic).get(0).iterator();

2. Even if that's so, can you please explain how I can fetch a batch of 1000 
messages into my internal memory?
(At the very least, this will let me validate what you're saying, by comparing 
the time it takes KafkaConsumer to fetch a single batch of 1000 messages 
against the time it takes for 1000 fetches of a single message each.)
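
For example, I imagine something like this rough sketch, reusing the iterator and the ConsumerTimeoutException handling from my code below (fetchBatch and the batch size are placeholder names of mine, untested):

    import java.util.ArrayList;
    import java.util.List;

    // Drain up to batchSize messages from the iterator into memory.
    // Relies on consumer.timeout.ms being set, so that hasNext() throws
    // ConsumerTimeoutException instead of blocking forever.
    private static List<String> fetchBatch(ConsumerIterator<String, String> iterator,
                                           int batchSize) {
        List<String> batch = new ArrayList<String>(batchSize);
        try {
            while (batch.size() < batchSize && iterator.hasNext()) {
                batch.add(iterator.next().message());
            }
        } catch (ConsumerTimeoutException e) {
            // Timed out waiting for more messages; return what we have so far.
        }
        return batch;
    }

Timing one call of fetchBatch(iterator, 1000) against 1000 single-message iterations should then make the comparison possible.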



-----Original Message-----
From: Thanh Hong Dai [mailto:hdth...@tma.com.vn] 
Sent: Tuesday, 24 November, 2015 04:52
To: users@kafka.apache.org
Subject: RE: java kafka consumer: how to read a BATCH of messages?

If the number of network round trips is your concern: from the source code at 
https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/consumer/ConsumerIterator.scala, 
it seems that the data is read in chunks internally, and the next chunk is 
fetched over the network only when the iterator exhausts the current one.
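
If the size of each of those network fetches is what you want to control, I believe (please verify against the consumer configuration docs for your version) the old high-level consumer exposes properties along these lines, set on the same Properties object you pass to ConsumerConfig:

    // Sketch only: property names as documented for the 0.8.x/0.9.0 old consumer;
    // the values here are illustrative, not recommendations.
    props.put("fetch.message.max.bytes", "1048576");   // max bytes fetched per partition per request
    props.put("queued.max.message.chunks", "2");       // chunks buffered ahead of the iterator

Larger values should mean fewer, larger round trips at the cost of more consumer memory.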

-----Original Message-----
From: Peleg, Roni [mailto:roni.pe...@arris.com]
Sent: Sunday, 22 November, 2015 8:03 PM
To: users@kafka.apache.org
Subject: java kafka consumer: how to read a BATCH of messages?

Hi all,

I'm trying the following piece of code, which I found on Google, for simply 
consuming some messages from a Kafka topic; it must be implemented in Java.
The problem is that this code consumes a single message at a time.
How can I tell it to consume, say, 1000 messages at a time?

Currently this is the code snippet:

    while (it.hasNext()) {
        msg = new String(it.next().message());
    }



The whole code:

package tst_mvn_proj_pkg;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTest {

    private static final Logger logger = LoggerFactory.getLogger(KafkaTest.class);
    public static ConsumerConnector consumer;

    // Usage: java -jar <jar> 192.168.3.85:2181 rzGroup12 topic_TOPOLOGY true false 60000
    public static void main(String[] args) {
        String zookeeper = args[0];       // e.g. "192.168.3.85:2181"
        String consumerGroup = args[1];   // e.g. "ConsumerGroupId_5"
        String topic = args[2];           // e.g. "topic_TOPOLOGY"
        String commitAtEnd = args[3];
        String smallest = args[4];
        String consumerTimeout = args[5]; // e.g. "60000"

        String msg = "";
        int msgNum = 0;
        logger.info("consuming topic: " + topic);

        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", consumerGroup);
        props.put("zookeeper.session.timeout.ms", "5000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // props.put("auto.offset.reset", "largest"); // start only from new messages
        props.put("consumer.timeout.ms", consumerTimeout); // make sure the loop doesn't hang
        props.put("auto.commit.enable", "false");

        if (smallest.equals("true")) {
            System.out.println("Using SMALLEST auto reset");
            // Start from the first message this consumer group has never read before.
            props.put("auto.offset.reset", "smallest");
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // one stream (thread) for this topic

        StringDecoder decoder = new StringDecoder(new VerifiableProperties());
        KafkaStream<String, String> stream =
                consumer.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
        ConsumerIterator<String, String> iterator = stream.iterator();

        while (hasNext(iterator)) {
            msgNum++;
            msg = iterator.next().message();
            if (msgNum % 10 == 0) {
                // Guard against messages shorter than 300 characters.
                int end = Math.min(300, msg.length());
                logger.info("\nKafka Msg " + msgNum + ": " + msg.substring(0, end) + "...");
            }
        }

        if (commitAtEnd.equals("true")) {
            System.out.println("committing offsets to ZK");
            consumer.commitOffsets(true); // commit consumer offsets to ZooKeeper
        }

        consumer.shutdown();

        logger.info("Consumer Done");
    }

    // hasNext() blocks until a message is available, or throws
    // ConsumerTimeoutException once consumer.timeout.ms expires.
    private static boolean hasNext(ConsumerIterator<String, String> iterator) {
        try {
            return iterator.hasNext();
        } catch (ConsumerTimeoutException e) {
            System.out.println("ConsumerTimeoutException -> " + e.getMessage());
            return false;
        }
    }
}

Thanks a lot,
Roni
