It turns out that the code mentioned does not read the topic from the
beginning; it waits for the producer to send something new. After starting
the producer and sending messages, I was able to retrieve all of those
messages. Is there a way to get the messages on a topic from the beginning?
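
One thing that may explain this, offered as a sketch rather than a confirmed
diagnosis: in the 0.8 high-level consumer, auto.offset.reset is only
consulted when the group has no committed offset in ZooKeeper (or the offset
is out of range). A group id such as "hello" that has consumed before resumes
from its committed offset, so "smallest" has no effect. Using a group id that
has never consumed is one way around that. The class and method names below
(FromBeginningProps, fromBeginning) are hypothetical, and only the
properties are built here; they would still be passed to ConsumerConfig as in
the quoted code.

```java
import java.util.Properties;
import java.util.UUID;

public class FromBeginningProps {

    // Hypothetical helper: builds properties for the 0.8 high-level consumer
    // with a fresh, unique group id, so that no committed offset exists for
    // the group and auto.offset.reset=smallest is applied.
    static Properties fromBeginning(String zookeeper, String groupPrefix) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // A random suffix makes the group new on every run (assumption:
        // re-reading the whole topic on every run is what is wanted).
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromBeginning("192.168.1.12:2181", "hello");
        // Prints the generated group id, which differs on each run.
        System.out.println(props.getProperty("group.id"));
    }
}
```

The trade-off is that every run leaves a new group behind in ZooKeeper, so
this is probably better suited to testing than to a long-running consumer.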

On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:

> Zookeeper actually doesn't show any errors - it shows a warning, which
> is pretty normal.
>
> What do your consumer and Kafka broker show? Are there any errors in
> the consumer? Or is it just hanging?
>
> You may want to consult our FAQ:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>
> Gwen
>
> On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com>
> wrote:
> > Kafka 0.8.2.2.3 and ZooKeeper are both running inside a VM. I was able to
> > run both the producer and the consumer within the VM successfully using
> > kafka-console-producer.sh and kafka-console-consumer.sh respectively. I
> > was even able to consume Kafka messages from the host machine using
> > kafka-console-consumer.sh. But when I tried to run the consumer from Java
> > in Eclipse, ZooKeeper logged the following error:
> >
> > 2015-06-26 03:06:26,323 - INFO
> > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] -
> > Closed socket connection for client /192.168.1.12:59549 (no session
> > established for client)
> > 2015-06-26 03:07:26,225 - INFO
> > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] -
> > Accepted socket connection from /192.168.1.12:59617
> > 2015-06-26 03:07:26,226 - WARN
> > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> > end of stream exception
> > EndOfStreamException: Unable to read additional data from client
> > sessionid 0x0, likely client has closed socket
> >     at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> >     at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > Below is my Kafka consumer code:
> >
> > package com.truckevent.producer;
> >
> >
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> >
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.ConsumerIterator;
> > import kafka.consumer.KafkaStream;
> > import kafka.javaapi.consumer.ConsumerConnector;
> >
> >
> > public class KafkaConsumer {
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         String group = "hello" ;
> >
> >
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", "192.168.1.12:2181");
> >         props.put("group.id", group);
> >         props.put("zookeeper.session.timeout.ms", "20000");
> >         props.put("zookeeper.sync.time.ms", "2030");
> >         props.put("auto.commit.interval.ms", "10000");
> >         props.put("auto.offset.reset", "smallest");
> >
> >         ConsumerConfig cf = new ConsumerConfig(props) ;
> >
> >         ConsumerConnector consumer =
> >             Consumer.createJavaConsumerConnector(cf) ;
> >
> >         String topic = "event" ;
> >
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, new Integer(1));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >             consumer.createMessageStreams(topicCountMap);
> >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> >
> >
> >         KafkaStream<byte[],byte[]> stream = streams.get(0) ;
> >
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >         int i = 1 ;
> >         while (it.hasNext()) {
> >
> >             System.out.println(i + ": " + new String(it.next().message()));
> >             ++i;
> >         }
> >         consumer.shutdown();
> >     }
> > }
> >
> >
> > I am not sure why I am not able to consume messages from Java code. Kafka
> > is running on port 6667 and ZooKeeper on 2181.
> >
> >
> > --
> > Regards
> >
> > Shivam Tiwari
>



-- 
Regards

Shivam Tiwari
