Yes, try this command replacing the bracketed things with the correct
identification for your project:

*bin/kafka-console-consumer.sh --zookeeper [zookeeper info] --topic
[your-topic-here] --from-beginning*

On Mon, Jun 29, 2015 at 9:56 AM, shivam tiwari <bigbang...@gmail.com> wrote:

> It turns out to be the code mentioned does not read topic from beginning
> and actually waits for producer to produce something fresh. After starting
> producer and sending messages I was able to retrieve all the messages. Is
> there a way to get messages on topic from beginning?
>
> On 26 June 2015 at 14:32, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> > Zookeeper actually doesn't show any errors - it shows a warning, which
> > is pretty normal.
> >
> > What does your consumer and Kafka broker show? Are there any errors in
> > the consumer? Or is it just hanging?
> >
> > You may want to consult our FAQ:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> > ?
> >
> > Gwen
> >
> > On Fri, Jun 26, 2015 at 11:00 AM, shivam tiwari <bigbang...@gmail.com>
> > wrote:
> > > Kafka 0.8.2.2.3 and zookeper both are running inside VM. I was able to
> > run
> > > both producer and consumer within VM successfully using
> > > kafka-console-producer.sh and kafka-console-consumer.sh respectively.
> > Even
> > > I was able to consume Kafka messages from host machine using
> > > kafka-console-consumer.sh. But when I tried to run the consumer using
> > java
> > > from eclipse then zookeeper logs following error
> > >
> > > 2015-06-26 03:06:26,323 - INFO
> > > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] -
> > > Closed socket connection for client /192.168.1.12:59549 (no session
> > > established for client)
> > > 2015-06-26 03:07:26,225 - INFO
> > > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] -
> > > Accepted socket connection from /192.168.1.12:59617
> > > 2015-06-26 03:07:26,226 - WARN
> > > [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught
> > > end of stream exception
> > > EndOfStreamException: Unable to read additional data from client
> > > sessionid 0x0, likely client has closed socket
> > >     at
> > org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
> > >     at
> >
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > Below is my Kafka consume code
> > >
> > > package com.truckevent.producer;
> > >
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > >
> > > import kafka.consumer.Consumer;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > >
> > >
> > > public class KafkaConsumer {
> > >
> > >     public static void main(String[] args) throws Exception {
> > >
> > >         String group = "hello" ;
> > >
> > >
> > >         Properties props = new Properties();
> > >         props.put("zookeeper.connect", "192.168.1.12:2181");
> > >         props.put("group.id", group);
> > >         props.put("zookeeper.session.timeout.ms", "20000");
> > >         props.put("zookeeper.sync.time.ms", "2030");
> > >         props.put("auto.commit.interval.ms", "10000");
> > >         props.put("auto.offset.reset", "smallest");
> > >
> > >         ConsumerConfig cf = new ConsumerConfig(props) ;
> > >
> > >         ConsumerConnector consumer =
> > Consumer.createJavaConsumerConnector(cf) ;
> > >
> > >         String topic = "event" ;
> > >
> > >         Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> > >         topicCountMap.put(topic, new Integer(1));
> > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >         List<KafkaStream<byte[], byte[]>> streams =
> > consumerMap.get(topic);
> > >
> > >
> > >         KafkaStream<byte[],byte[]> stream = streams.get(0) ;
> > >
> > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >         int i = 1 ;
> > >         while (it.hasNext()) {
> > >
> > >             System.out.println(i + ": " + new
> > String(it.next().message()));
> > >             ++i;
> > >         }
> > >         consumer.shutdown();
> > >     }
> > > }
> > >
> > >
> > > I am not sure why I am not able to consume messages from java code.
> Kafka
> > > is running on port 6667 and zookeeper on 2181.
> > >
> > >
> > > --
> > > Regards
> > >
> > > Shivam Tiwari
> >
>
>
>
> --
> Regards
>
> Shivam Tiwari
>



-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu <elise...@berkeley.edu> | (925) 400-3427

Reply via email to