Hi all, I am new to Kafka. I have a problem reading a topic repeatedly with kafka-console-consumer and a Java client. Any help is appreciated.
I used a tool called Kandalf (https://github.com/hellofresh/kandalf) to extract RabbitMQ messages and push them into a Kafka topic `usage-records`. The Kafka version was 0.8.x, which came with the Kandalf set of Docker containers. Locally, I removed all files in `kafka-logs` except the .log and .index files, as I was not sure whether the 0.8.x metadata is compatible with Kafka 1.0.1. I also had an empty Zookeeper locally. The folder looked like this:

```
kafka-logs/usage-records-0
kafka-logs/usage-records-0/00000000000000000000.log
kafka-logs/usage-records-0/00000000000000000000.index
```

When I start Kafka, I see logs about building some index and possibly caching some metadata in Zookeeper; I am not sure what data is actually stored in Zookeeper. I then start a kafka-console-consumer to read all records from the beginning, and it reads more than zero records. If I run the same kafka-console-consumer several times from the beginning, it returns the same number of records each time.

I originally tried to play with Kafka Streams and got a simple aggregate-count working, but could not get aggregate-sum working, so I thought I should first understand KafkaConsumer well. I wrote a class to play with the KafkaConsumer class, but when the code is run, it yields no records for the topic, even though I would expect it to, as it should start from offset zero. When I later run kafka-console-consumer to read the same topic from the beginning, it then returns zero records. I do not know which tool I can use to check whether the topic has been emptied.
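One check I am considering is comparing each partition's beginning and end offsets (the 1.0.1 client has KafkaConsumer.beginningOffsets and endOffsets for this): if end minus beginning is zero everywhere, the log has been emptied. A minimal sketch of that arithmetic, with the broker calls left as comments and hypothetical stand-in values so the snippet runs stand-alone:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCheck {

    // Records still in the log = sum over partitions of (end offset - beginning offset).
    static long totalAvailable(Map<Integer, Long> beginning, Map<Integer, Long> end) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : end.entrySet()) {
            total += entry.getValue() - beginning.getOrDefault(entry.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Against a live broker these maps would come from
        // consumer.beginningOffsets(partitions) and consumer.endOffsets(partitions);
        // the values below are hypothetical stand-ins for partition 0.
        Map<Integer, Long> beginning = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        beginning.put(0, 0L);
        end.put(0, 42L);
        System.out.println("records available: " + totalAvailable(beginning, end));
        // prints "records available: 42"
    }
}
```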
To my understanding:
- there is no client that I have used that issued a topic delete
- the offset of the consumer should not matter, as kafka-console-consumer is run with the flag --from-beginning

The Java code using KafkaConsumer:

```
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ConsumerMain {

    public static final String TOPIC = "usage-records";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer.main.2");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            System.out.println(">> Obtained records of size: " + records.count());
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}
```

Thanks
Raghavan
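PS: One thing I plan to try — my understanding is that when a new group.id has no committed offsets, KafkaConsumer falls back to auto.offset.reset, which defaults to "latest" (unlike --from-beginning, which reads from the start). A sketch of the same properties with the reset policy made explicit; the deserializers are given as class-name strings here only so the fragment compiles without the Kafka jars:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer.main.2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for this group, the default "latest" would skip
        // everything already in the log; "earliest" behaves like --from-beginning.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("auto.offset.reset"));
        // prints "earliest"
    }
}
```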