I have a simple Java producer like the one below:

    public class Producer {

        private final static String TOPIC = "my-example-topi8";
        private final static String BOOTSTRAP_SERVERS = "localhost:8092";

        public static void main( String[] args ) throws Exception {
            Producer<String, byte[]> producer = createProducer();
            for(int i=0;i<3000;i++) {
                String msg = "Test Message-" + i;
                final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
                producer.send(record).get();
                System.out.println("Sent message " + msg);
            }
            producer.close();
        }

        private static Producer<String, byte[]> createProducer() {
            Properties props = new Properties();
            props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("client.id", "AppFromJava");
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("compression.codec", "snappy");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            return new KafkaProducer<String, byte[]>(props);
        }
    }

I am trying to read the data as below:

    public class Consumer {

        private final static String TOPIC = "my-example-topi8";
        private final static String BOOTSTRAP_SERVERS = "localhost:8092";

        public static void main( String[] args ) throws Exception {
            Consumer<String, byte[]> consumer = createConsumer();
            start(consumer);
        }

        static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
            final int giveUp = 10;
            int noRecordsCount = 0;
            int stopCount = 1000;

            while (true) {
                final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
                if (consumerRecords.count()==0) {
                    noRecordsCount++;
                    if (noRecordsCount > giveUp) break;
                    else continue;
                }

                consumerRecords.forEach(record -> {
                    System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
                });

                consumer.commitSync();
                break;
            }
            consumer.close();
            System.out.println("DONE");
        }

        private static Consumer<String, byte[]> createConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");

            // Create the consumer using props.
            final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
            consumer.subscribe(Collections.singletonList(TOPIC));
            return consumer;
        }
    }

But the consumer is not reading any messages from Kafka. If I add the following at the very beginning of start():

    consumer.poll(0);
    consumer.seekToBeginning(consumer.assignment());

then the consumer starts reading from the topic. However, each time the consumer is restarted it reads messages from the start of the topic again, which I don't want. Can someone let me know what is going wrong and how I can fix this?

Kafka version: 0.10
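
One setting that may be relevant here: the consumer's `auto.offset.reset` defaults to `latest`, so a group with no committed offsets starts at the end of the log and never sees messages produced before it joined. The sketch below is a minimal illustration, assuming that is the situation; `ConsumerProps` and `build` are hypothetical names, not part of the code above. It uses plain string keys so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not from the original post): collects the consumer
// settings from the question plus an explicit offset-reset policy.
final class ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Offsets are committed manually with commitSync(), as in the question.
        props.put("enable.auto.commit", "false");
        // Assumption: the group has no committed offset yet. "earliest" is
        // only consulted in that case; once an offset has been committed,
        // a restarted consumer resumes from the committed position.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

The resulting `Properties` could be passed to `new KafkaConsumer<>(props)` in place of the ones built in `createConsumer()`. Unlike `seekToBeginning`, this setting should not rewind the consumer on every restart, because it is ignored whenever the group already has a committed offset for the partition.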