I am using kafka_2.10 now. My consumer is stuck in an infinite loop at the
highlighted line.
Is there any issue with the code?

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "zkaddress:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "5000");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("topicname", new Integer(1));
    ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
        consumer.createMessageStreams(topicMap);
    List<KafkaStream<byte[], byte[]>> streamList = streamMap.get("topicname");
    System.out.println("fetching...");
    System.out.println("size" + streamList.size());
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        System.out.println("size str" + stream.size());
        ConsumerIterator<byte[], byte[]> strItr = stream.iterator();
        System.out.println("size itr" + strItr.size());
        while (strItr.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msgMeta = strItr.next();
            System.out.println("offset is" + msgMeta.offset());
            System.out.println(msgMeta.message());
        }
    }
}
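
A possible explanation, offered as a sketch rather than a confirmed
diagnosis: KafkaStream and ConsumerIterator are Scala collections, so the
size() calls have to walk the whole stream, and both they and hasNext() block
while no message is available. In the 0.8.x high-level consumer,
consumer.timeout.ms defaults to -1 (wait forever). With a positive value,
hasNext() throws kafka.consumer.ConsumerTimeoutException after that many idle
milliseconds. The class name, method name and parameters below are
hypothetical; only the property keys come from Kafka.

```java
import java.util.Properties;

public class BoundedWaitConsumerProps {

    // Builds high-level consumer properties with a bounded wait.
    // timeoutMs is a hypothetical parameter; pick a value that suits the workload.
    public static Properties build(String zkConnect, String groupId, int timeoutMs) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "smallest");
        // Default is -1, which makes the consumer iterator block indefinitely.
        props.put("consumer.timeout.ms", Integer.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("zkaddress:2181", "testgroup", 5000);
        System.out.println(props.getProperty("consumer.timeout.ms"));
    }
}
```

The resulting Properties would be passed to new ConsumerConfig(props) as in
the code above, with the while loop wrapped in a try/catch for
ConsumerTimeoutException, and the stream.size() and strItr.size() calls
dropped.
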
Also, how do we specify the batch size while consuming messages?
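
On the batch size question, a hedged sketch: as far as I know the 0.8.x
high-level consumer has no setting for a number of messages per batch. The
closest knobs are fetch.message.max.bytes (bytes to attempt to fetch per
topic-partition in each request) and queued.max.message.chunks (fetched chunks
buffered for the iterator). The class name, method name and the values used
below are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

public class FetchSizeConsumerProps {

    // Builds high-level consumer properties that bound how much data each fetch pulls.
    // fetchBytes and queuedChunks are hypothetical parameters chosen by the caller.
    public static Properties build(String zkConnect, String groupId,
                                   int fetchBytes, int queuedChunks) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Should be at least as large as the biggest message the broker accepts.
        props.put("fetch.message.max.bytes", Integer.toString(fetchBytes));
        // Number of fetched chunks buffered ahead of the consumer iterator.
        props.put("queued.max.message.chunks", Integer.toString(queuedChunks));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("zkaddress:2181", "testgroup", 1024 * 1024, 2);
        System.out.println(props.getProperty("fetch.message.max.bytes"));
    }
}
```

The iterator still hands out one message at a time, so grouping messages into
application-level batches would have to happen in the consuming loop itself.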


On Mon, Jun 22, 2015 at 12:23 AM, Sriharsha Chintalapani <ka...@harsha.io>
wrote:

> Sushant,
>      You are using the new consumer API from kafka-clients. It looks like
> you want to use the high-level consumer API? If so, you need to use the
> following Kafka core lib as the dependency:
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka_2.10</artifactId>
> <version>0.8.2.1</version>
> </dependency>
>
> More details on this page
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> --
> Harsha
>
>
> On June 21, 2015 at 8:02:01 AM, Shushant Arora (shushantaror...@gmail.com)
> wrote:
>
> Which is the latest jar to use for the Kafka Java client?
>
> As in
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>0.8.2.1</version>
> </dependency>
>
> In class org.apache.kafka.clients.consumer.KafkaConsumer
>
> public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
> // TODO Auto-generated method stub
> return null;
> }
>
> The poll method returns null. I want to use a high-level Java consumer.
>
> And why does org.apache.kafka.clients.consumer.ConsumerConfig define
> public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
> and not zookeeper.connect? If offsets are maintained by ZooKeeper in the
> high-level consumer, why is a broker address required?
>
>
