Can you check the committed offsets using bin/kafka-consumer-group.sh ? Also inspect your consumer's position via KafkaConsumer#position() to see where the consumer actually is in the topic.
-Matthias On 2/16/18 5:13 AM, Debraj Manna wrote: > I have posted the same question in stackoverflow also. But I have not got > any reply there also > > https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic > > On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.ma...@gmail.com> > wrote: > >> I have a simple java producer like below >> >> public class Producer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Producer<String, byte[]> producer = createProducer(); >> for(int i=0;i<3000;i++) { >> String msg = "Test Message-" + i; >> final ProducerRecord<String, byte[]> record = new >> ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes()); >> producer.send(record).get(); >> System.out.println("Sent message " + msg); >> } >> producer.close(); >> } >> >> private static Producer<String, byte[]> createProducer() { >> Properties props = new Properties(); >> props.put("metadata.broker.list", BOOTSTRAP_SERVERS); >> props.put("bootstrap.servers", BOOTSTRAP_SERVERS); >> props.put("client.id", "AppFromJava"); >> props.put("serializer.class", "kafka.serializer.DefaultEncoder"); >> props.put("key.serializer.class", "kafka.serializer. >> StringEncoder"); >> props.put("key.serializer", "org.apache.kafka.common. >> serialization.StringSerializer"); >> props.put("compression.codec", "snappy"); >> props.put("value.serializer", "org.apache.kafka.common. >> serialization.ByteArraySerializer"); >> return new KafkaProducer<String, byte[]>(props); >> } >> } >> >> I am trying to read data as below >> >> public class Consumer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Consumer<String, byte[]> consumer = createConsumer(); >> start(consumer); >> } >> >> static void start(Consumer<String, byte[]> consumer) throws >> InterruptedException { >> final int giveUp = 10; >> int noRecordsCount = 0; >> int stopCount = 1000; >> >> while (true) { >> final ConsumerRecords<String, byte[]> consumerRecords = >> consumer.poll(1000); >> if (consumerRecords.count()==0) { >> noRecordsCount++; >> if (noRecordsCount > giveUp) break; >> else continue; >> } >> >> >> consumerRecords.forEach(record -> { >> System.out.printf("\nConsumer Record:(%s, %s, %s)", >> record.key(), new String(record.value()), record.topic()); >> }); >> >> consumer.commitSync(); >> break; >> } >> consumer.close(); >> System.out.println("DONE"); >> } >> >> private static Consumer<String, byte[]> createConsumer() { >> final Properties props = new Properties(); >> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, >> BOOTSTRAP_SERVERS); >> props.put(ConsumerConfig.GROUP_ID_CONFIG, >> "KafkaExampleConsumer"); >> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >> StringDeserializer.class.getName()); >> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> ByteArrayDeserializer.class.getName()); >> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); >> props.put("group.id", "test"); >> props.put("enable.auto.commit", "false"); >> >> // Create the consumer using props. >> final Consumer<String, byte[]> consumer = new KafkaConsumer(props); >> consumer.subscribe(Collections.singletonList(TOPIC)); >> return consumer; >> } >> } >> >> But the consumer is not reading any message from kafka. If I add the below >> at the very start() >> >> consumer.poll(0); >> >> consumer.seekToBeginning(consumer.assignment()); >> >> >> Then the consumer starts reading from the topic. But then each time the >> consumer is restarted it is reading message from the start of the topic >> which I don;t want. Can someone let me know what is going wrong and how can >> I fix this? >> >> >> Kafka Version 0.10 >> >> >> >
signature.asc
Description: OpenPGP digital signature