Thanks for the reply. Actually, in my use case I need to control the offsets myself, so should I use SimpleConsumer instead of the group consumer?

On Tue, Jan 21, 2014 at 9:38 PM, Jun Rao <jun...@gmail.com> wrote:

> "auto.offset.reset" is only used when offsets don't exist in ZK. In your
> case, the consumer likely already committed the offsets to ZK. So, after
> restarting, the consumer will resume from where it left off, instead of
> re-getting everything again. This is the expected behavior during normal
> operation. If you are testing, you can use a new consumer group.
>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 21, 2014 at 8:02 AM, Abhishek Bhattacharjee <
> abhishek.bhattacharje...@gmail.com> wrote:
>
> > I read the faqs and I added "auto.offset.reset" property in the
> > configuration setting of storm. Then I ran my producer code and then I ran
> > my consumer code when I ran the consumer code it printed all the messages
> > that were created by producer but after stopping the consumer when I ran it
> > again it didn't show any messages. I think the offset was not reset. What
> > do you think is going wrong ?
> >
> > Thanks
> >
> >
> > On Mon, Jan 20, 2014 at 9:42 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Could you check the following FAQ?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jan 20, 2014 at 7:22 AM, Abhishek Bhattacharjee <
> > > abhishek.bhattacharje...@gmail.com> wrote:
> > >
> > > > Sorry I have sent both codes as consumer codes. This is the producer
> > > > code.
> > > >
> > > > *Producer.java*
> > > >
> > > > package kafka.examples;
> > > >
> > > >
> > > > import java.util.Properties;
> > > > import kafka.producer.KeyedMessage;
> > > > import kafka.producer.ProducerConfig;
> > > >
> > > > public class Producer/* extends Thread*/
> > > > {
> > > >   private final kafka.javaapi.producer.Producer<Integer, String> producer;
> > > >   private final String topic;
> > > >   private final Properties props = new Properties();
> > > >
> > > >   public Producer(String topic)
> > > >   {
> > > >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >     props.put("metadata.broker.list", "localhost:9092");
> > > >     // Use random partitioner. Don't need the key type. Just set it to Integer.
> > > >     // The message is of type String.
> > > >     producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
> > > >     this.topic = topic;
> > > >     System.out.println("Producer at "+this.topic);
> > > >   }
> > > >
> > > >   public void putdata() {
> > > >     int messageNo = 1;
> > > >     while(messageNo < 100)
> > > >     {
> > > >       String messageStr = new String("Message_" + messageNo);
> > > >       producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
> > > >       messageNo = messageNo +1;
> > > >     }
> > > >     producer.close();
> > > >     System.out.println("Producer exit");
> > > >   }
> > > >
> > > > }
> > > >
> > > >
> > > > On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
> > > > abhishek.bhattacharje...@gmail.com> wrote:
> > > >
> > > > > Hello,
> > > > > I am new to kafka and facing some problem.
> > > > > My producer code works properly and sends data.
> > > > > But the consumer is not able to read it.
> > > > > Here are the codes for Producer and Consumer.
> > > > > Something is wrong with the Consumer.java code can someone please help
> > > > > with this.
> > > > >
> > > > >
> > > > > *Producer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >   private final ConsumerConnector consumer;
> > > > >   private final String topic;
> > > > >
> > > > >   public Consumer(String topic)
> > > > >   {
> > > > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >             createConsumerConfig());
> > > > >     this.topic = topic;
> > > > >     System.out.println("Consumer at "+this.topic);
> > > > >   }
> > > > >
> > > > >   private static ConsumerConfig createConsumerConfig()
> > > > >   {
> > > > >     Properties props = new Properties();
> > > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >     props.put("group.id", KafkaProperties.groupId);
> > > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > > >     props.put("zookeeper.sync.time.ms", "200");
> > > > >     props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > >     return new ConsumerConfig(props);
> > > > >
> > > > >   }
> > > > >
> > > > >   public void readdata() {
> > > > >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > >     topicCountMap.put(topic, new Integer(1));
> > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > >             consumer.createMessageStreams(topicCountMap);
> > > > >     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >     System.out.println("Inside read data");
> > > > >     while(it.hasNext())
> > > > >       System.out.println(new String(it.next().message()));
> > > > >
> > > > >   }
> > > > > }
> > > > >
> > > > > And this is the consumer code.
> > > > >
> > > > > *Consumer.java*
> > > > >
> > > > > package kafka.examples;
> > > > >
> > > > >
> > > > > import java.util.HashMap;
> > > > > import java.util.List;
> > > > > import java.util.Map;
> > > > > import java.util.Properties;
> > > > > import kafka.consumer.ConsumerConfig;
> > > > > import kafka.consumer.ConsumerIterator;
> > > > > import kafka.consumer.KafkaStream;
> > > > > import kafka.javaapi.consumer.ConsumerConnector;
> > > > > import kafka.message.Message;
> > > > >
> > > > >
> > > > > public class Consumer
> > > > > {
> > > > >   private final ConsumerConnector consumer;
> > > > >   private final String topic;
> > > > >
> > > > >   public Consumer(String topic)
> > > > >   {
> > > > >     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > > >             createConsumerConfig());
> > > > >     this.topic = topic;
> > > > >     System.out.println("Consumer at "+topic);
> > > > >   }
> > > > >
> > > > >   private static ConsumerConfig createConsumerConfig()
> > > > >   {
> > > > >     Properties props = new Properties();
> > > > >     props.put("zookeeper.connect", KafkaProperties.zkConnect);
> > > > >     props.put("group.id", KafkaProperties.groupId);
> > > > >     props.put("zookeeper.session.timeout.ms", "400");
> > > > >     props.put("zookeeper.sync.time.ms", "200");
> > > > >     props.put("auto.commit.interval.ms", "1000");
> > > > >
> > > > >     return new ConsumerConfig(props);
> > > > >
> > > > >   }
> > > > >
> > > > >   public void readdata() {
> > > > >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > >     topicCountMap.put(topic, new Integer(1));
> > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > >             consumer.createMessageStreams(topicCountMap);
> > > > >     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> > > > >     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > > >     while(it.hasNext())
> > > > >       System.out.println(new String(it.next().message()));
> > > > >   }
> > > > > }
> > > > >
> > > > >
> > > > > Thanks.
> > > > > --
> > > > > *Abhishek Bhattacharjee*
> > > > > *Pune Institute of Computer Technology*
> > > > >
> > > >
> > > >
> > > > --
> > > > *Abhishek Bhattacharjee*
> > > > *Pune Institute of Computer Technology*
> > > >
> > >
> >
> > --
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
>

--
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*
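
Editor's sketch: the behavior Jun describes in the quoted reply (a committed offset takes precedence, and "auto.offset.reset" applies only when the group has no committed offset) can be modeled roughly as below. This is a toy, stdlib-only simulation and not Kafka code: the class OffsetResetSketch, its methods, and the group names are hypothetical, the in-memory map stands in for the offsets stored in ZK, and a single partition holding the 99 messages sent by the producer above is assumed. Only the setting name and its "smallest"/"largest" values come from Kafka 0.8.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Kafka code) of how a group consumer chooses its starting offset. */
public class OffsetResetSketch {
    // Hypothetical stand-in for the offsets a group has committed to ZooKeeper.
    private final Map<String, Long> committedByGroup = new HashMap<>();
    private final long earliestOffset; // offset of the oldest message still in the log
    private final long logEndOffset;   // offset the next produced message would get

    public OffsetResetSketch(long earliestOffset, long logEndOffset) {
        this.earliestOffset = earliestOffset;
        this.logEndOffset = logEndOffset;
    }

    /** Records the position a group has consumed up to. */
    public void commit(String groupId, long offset) {
        committedByGroup.put(groupId, offset);
    }

    /** Returns the offset a consumer in this group would start reading from. */
    public long startOffset(String groupId, String autoOffsetReset) {
        Long committed = committedByGroup.get(groupId);
        if (committed != null) {
            // A committed offset exists, so the reset setting is not consulted.
            return committed;
        }
        // No committed offset: fall back to the reset policy.
        return "smallest".equals(autoOffsetReset) ? earliestOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Assumed: one partition holding 99 messages (offsets 0 to 98).
        OffsetResetSketch log = new OffsetResetSketch(0L, 99L);

        // First run of a group with no committed offset: starts at the beginning.
        System.out.println(log.startOffset("group-a", "smallest"));

        // The consumer reads everything and its position is committed.
        log.commit("group-a", 99L);

        // Restart with the same group: resumes at the end, so nothing is printed.
        System.out.println(log.startOffset("group-a", "smallest"));

        // A new group has no committed offset, so the reset policy applies again.
        System.out.println(log.startOffset("group-b", "smallest"));
    }
}
```

In this model, switching to a fresh group id is what makes the reset policy apply again, which matches the suggestion to use a new consumer group when testing.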