HI,all: I specify the exact offsets the consumer should start from for each partition.But the Kafka consumer connot periodically commit the offsets to Zookeeper. I have disabled the checkpoint only if the job is stopped.This is my code:
val properties = new Properties() properties.setProperty("bootstrap.servers", config.kafka_input.kafka_base_config.brokers) properties.setProperty("zookeeper.connect", config.zookeeper_address) properties.setProperty("group.id<http://group.id>", config.kafka_input.groupId) properties.setProperty("session.timeout.ms", config.kafka_input.sessionTimeout) properties.setProperty("enable.auto.commit", config.kafka_input.autoCommit.toString) val flinkxConfigUtils = new WormholeFlinkxConfigUtils(config) val topics = flinkxConfigUtils.getKafkaTopicList val myConsumer = new FlinkKafkaConsumer010[(String, String, String, Int, Long)](topics, new WormholeDeserializationStringSchema, properties) val specificStartOffsets = flinkxConfigUtils.getTopicPartitionOffsetMap myConsumer.setStartFromSpecificOffsets(specificStartOffsets) Can anyone explain the problem? Thanks very much!