Hi All, I'm using kafka Manual Partition Assignment api to read kafka topic. I found that if i use the "seekToBeginning" method ,the consumer will not auto commit offset to kafka even if the "enable.auto.commit" is "true".
My code like next: Properties props = new Properties(); props.put("bootstrap.servers", "host:9092"); props.put("group.id", groupid); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", autooffsetreset); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.kerberos.service.name", "aaa"); props.put("sasl.mechanism", "GSSAPI"); Map<TopicPartition, Long> fromOffsets = new HashMap<>(); fromOffsets.put(new TopicPartition(topic, 0), (long) -1); fromOffsets.put(new TopicPartition(topic, 1), (long) -1); fromOffsets.put(new TopicPartition(topic, 2), (long) -1); fromOffsets.put(new TopicPartition(topic, 3), (long) -1); fromOffsets.put(new TopicPartition(topic, 4), (long) -1); fromOffsets.put(new TopicPartition(topic, 5), (long) -1); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.assign(fromOffsets.keySet()); consumer.seekToBeginning(fromOffsets.keySet()); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); if (records.isEmpty()) { System.out.println("i is " + i); consumer.commitSync(); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } for (ConsumerRecord<String, String> record : records) { i++; } ....... Is there something i should know to avoid this problem ? Thanks!