I have posted the same question on Stack Overflow as well, but I have not received any reply there either:
https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic

On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.ma...@gmail.com> wrote:

> I have a simple java producer like below
>
> public class Producer
> {
>     private final static String TOPIC = "my-example-topi8";
>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
>     public static void main( String[] args ) throws Exception {
>         Producer<String, byte[]> producer = createProducer();
>         for(int i=0;i<3000;i++) {
>             String msg = "Test Message-" + i;
>             final ProducerRecord<String, byte[]> record =
>                 new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
>             producer.send(record).get();
>             System.out.println("Sent message " + msg);
>         }
>         producer.close();
>     }
>
>     private static Producer<String, byte[]> createProducer() {
>         Properties props = new Properties();
>         props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>         props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>         props.put("client.id", "AppFromJava");
>         props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("compression.codec", "snappy");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>         return new KafkaProducer<String, byte[]>(props);
>     }
> }
>
> I am trying to read data as below
>
> public class Consumer
> {
>     private final static String TOPIC = "my-example-topi8";
>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
>     public static void main( String[] args ) throws Exception {
>         Consumer<String, byte[]> consumer = createConsumer();
>         start(consumer);
>     }
>
>     static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
>         final int giveUp = 10;
>         int noRecordsCount = 0;
>         int stopCount = 1000;
>
>         while (true) {
>             final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
>             if (consumerRecords.count()==0) {
>                 noRecordsCount++;
>                 if (noRecordsCount > giveUp) break;
>                 else continue;
>             }
>
>             consumerRecords.forEach(record -> {
>                 System.out.printf("\nConsumer Record:(%s, %s, %s)",
>                     record.key(), new String(record.value()), record.topic());
>             });
>
>             consumer.commitSync();
>             break;
>         }
>         consumer.close();
>         System.out.println("DONE");
>     }
>
>     private static Consumer<String, byte[]> createConsumer() {
>         final Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
>         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>         props.put("group.id", "test");
>         props.put("enable.auto.commit", "false");
>
>         // Create the consumer using props.
>         final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
>         consumer.subscribe(Collections.singletonList(TOPIC));
>         return consumer;
>     }
> }
>
> But the consumer is not reading any message from kafka.
> If I add the lines below at the very beginning of start()
>
>     consumer.poll(0);
>     consumer.seekToBeginning(consumer.assignment());
>
> then the consumer starts reading from the topic. But then each time the
> consumer is restarted, it reads messages from the start of the topic,
> which I don't want. Can someone let me know what is going wrong and how
> I can fix this?
>
> Kafka Version 0.10
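
One possible explanation, which this thread does not confirm: the Java consumer's `auto.offset.reset` setting defaults to `latest`, so a consumer group with no committed offsets starts at the end of the topic and only sees messages produced after it joins. `seekToBeginning` works around that, but it rewinds on every start regardless of committed offsets. Setting `auto.offset.reset` to `earliest` may give the wanted behaviour: a group with no committed offset starts from the beginning, and after `commitSync()` a restart resumes from the committed position. The sketch below only builds the configuration with plain string keys so that it compiles without the Kafka client jar; the class name `ConsumerOffsetResetSketch` and the helper `consumerProps` are hypothetical and not part of the original code.

```java
import java.util.Properties;

public class ConsumerOffsetResetSketch {

    // Hypothetical helper: builds consumer settings similar to the ones in the
    // quoted createConsumer(), plus auto.offset.reset (an assumption about the fix).
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // The quoted code sets group.id twice; the later value ("test") wins.
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");
        // Applies only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:8092", "test");
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

These properties would be passed to `new KafkaConsumer<>(props)` in place of the ones in the quoted `createConsumer()`, with the `poll(0)` and `seekToBeginning` lines removed. If the group `test` already has committed offsets, `auto.offset.reset` has no effect, so trying a fresh `group.id` is one way to check this. Note also that the quoted `start()` breaks out of the loop after the first non-empty batch, so a single run prints at most one poll's worth of records.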