I read the FAQs and added the "auto.offset.reset" property to the
configuration settings of storm. Then I ran my producer code, followed by
my consumer code. The first time I ran the consumer, it printed all the
messages that were created by the producer, but after stopping the consumer
and running it again, it didn't show any messages. I think the offset was
not reset. What do you think is going wrong?

Thanks


On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <jun...@gmail.com> wrote:

> Could you check the following FAQ?
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata
> ?
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why
> ?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> abhishek.bhattacharje...@gmail.com> wrote:
>
> > Sorry, I sent the consumer code for both listings by mistake. This is
> > the producer code.
> >
> > *Producer.java*
> >
> > package kafka.examples;
> >
> >
> > import java.util.Properties;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.ProducerConfig;
> >
> > public class Producer/* extends Thread*/
> > {
> >   private final kafka.javaapi.producer.Producer<Integer, String>
> producer;
> >   private final String topic;
> >   private final Properties props = new Properties();
> >
> >   public Producer(String topic)
> >   {
> >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> >     props.put("metadata.broker.list", "localhost:9092");
> >     // Use random partitioner. Don't need the key type. Just set it to
> > Integer.
> >     // The message is of type String.
> >     producer = new kafka.javaapi.producer.Producer<Integer, String>(new
> > ProducerConfig(props));
> >     this.topic = topic;
> >     System.out.println("Producer at "+this.topic);
> >   }
> >
> >   public void putdata() {
> >     int messageNo = 1;
> >     while(messageNo < 100)
> >     {
> >       String messageStr = new String("Message_" + messageNo);
> >       producer.send(new KeyedMessage<Integer, String>(topic
> ,messageStr));
> >       messageNo = messageNo +1;
> >     }
> >     producer.close();
> >     System.out.println("Producer exit");
> >   }
> >
> > }
> >
> >
> > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > abhishek.bhattacharje...@gmail.com> wrote:
> >
> > > Hello,
> > > I am new to Kafka and am facing a problem.
> > > My producer code works properly and sends data,
> > > but the consumer is not able to read it.
> > > Here is the code for the Producer and the Consumer.
> > > Something is wrong with the Consumer.java code; can someone please help
> > > with this?
> > >
> > >
> > > *Producer.java*
> > >
> > > package kafka.examples;
> > >
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.message.Message;
> > >
> > >
> > > public class Consumer
> > > {
> > >     private final ConsumerConnector consumer;
> > >     private final String topic;
> > >
> > >     public Consumer(String topic)
> > >     {
> > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >        createConsumerConfig());
> > >  this.topic = topic;
> > > System.out.println("Consumer at "+this.topic);
> > >     }
> > >
> > >     private static ConsumerConfig createConsumerConfig()
> > >     {
> > > Properties props = new Properties();
> > > props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > >  props.put("group.id", KafkaProperties.groupId);
> > > props.put("zookeeper.session.timeout.ms", "400");
> > >  props.put("zookeeper.sync.time.ms", "200");
> > > props.put("auto.commit.interval.ms", "1000");
> > >
> > > return new ConsumerConfig(props);
> > >
> > >     }
> > >
> > >     public void readdata() {
> > > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >  topicCountMap.put(topic, new Integer(1));
> > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >  KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
> > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >  System.out.println("Inside read data");
> > > while(it.hasNext())
> > >     System.out.println(new String(it.next().message()));
> > >
> > >     }
> > > }
> > >
> > > And this is the consumer code.
> > >
> > > *Consumer.java*
> > >
> > > package kafka.examples;
> > >
> > >
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > > import java.util.Properties;
> > > import kafka.consumer.ConsumerConfig;
> > > import kafka.consumer.ConsumerIterator;
> > > import kafka.consumer.KafkaStream;
> > > import kafka.javaapi.consumer.ConsumerConnector;
> > > import kafka.message.Message;
> > >
> > >
> > > public class Consumer
> > > {
> > >   private final ConsumerConnector consumer;
> > >   private final String topic;
> > >
> > >   public Consumer(String topic)
> > >   {
> > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >             createConsumerConfig());
> > >     this.topic = topic;
> > >     System.out.println("Consumer at "+topic);
> > >   }
> > >
> > >   private static ConsumerConfig createConsumerConfig()
> > >   {
> > >     Properties props = new Properties();
> > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > >     props.put("group.id", KafkaProperties.groupId);
> > >     props.put("zookeeper.session.timeout.ms", "400");
> > >     props.put("zookeeper.sync.time.ms", "200");
> > >     props.put("auto.commit.interval.ms", "1000");
> > >
> > >     return new ConsumerConfig(props);
> > >
> > >   }
> > >
> > >   public void readdata() {
> > >     Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
> > >     topicCountMap.put(topic, new Integer(1));
> > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumer.createMessageStreams(topicCountMap);
> > >     KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
> > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >     while(it.hasNext())
> > >       System.out.println(new String(it.next().message()));
> > >   }
> > > }
> > >
> > >
> > > Thanks.
> > > --
> > > *Abhishek Bhattacharjee*
> > > *Pune Institute of Computer Technology*
> > >
> >
> >
> >
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*

Reply via email to