I want to test a Kafka example. I am using Kafka 0.10.0.1.

The producer:

    object ProducerApp extends App {

      val topic = "topicTest"
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      for (i <- 0 to 20) {
        val record = new ProducerRecord(topic, "key " + i, " value " + i)
        producer.send(record)
        Thread.sleep(100)
      }
    }
The consumer (the topic "topicTest" is created with 1 partition):

    object ConsumerApp extends App {

      val topic = "topicTest"
      val properties = new Properties
      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
      properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](properties)
      consumer.subscribe(scala.List(topic).asJava)

      while (true) {
        consumer.seekToBeginning(consumer.assignment())
        val records: ConsumerRecords[String, String] = consumer.poll(20000)
        println("records size " + records.count())
        records.asScala.foreach(rec => println("offset " + rec.offset()))
      }
    }

The problem is that on the first iteration the consumer does not reliably start from offset 0 (it starts at a seemingly random offset), whereas on the following iterations it does. I want to know the reason for this and how I can make the consumer read from offset 0 on every iteration.

The expected result is:

    records size 6
    offset 0
    offset 1
    offset 2
    offset 3
    offset 4
    offset 5
    records size 6
    offset 0
    offset 1
    offset 2
    offset 3
    offset 4
    offset 5
    ..

but the obtained result is:

    records size 4
    offset 2
    offset 3
    offset 4
    offset 5
    records size 6
    offset 0
    offset 1
    offset 2
    offset 3
    offset 4
    offset 5
    ..

I want the consumer to read all the records (from offset 0 to offset 5) on every iteration.