andy hoang created FLINK-11335:
----------------------------------

             Summary: Kafka consumer can not commit offset at checkpoint
                 Key: FLINK-11335
                 URL: https://issues.apache.org/jira/browse/FLINK-11335
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.6.2
         Environment: AWS EMR 5.20: hadoop, flink plugin
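flink: 1.6.2, run under yarn-cluster
Kafka cluster: 1.0
            Reporter: andy hoang

When trying to commit offsets to Kafka, I always get this warning:

{noformat}
2019-01-15 11:18:55,405 WARN  org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
{noformat}

The code was simplified by removing the business logic:

{code:java}
import java.util.Properties

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.slf4j.LoggerFactory
import upickle.default.writeJs

// Object name is illustrative; the report only showed the body of main
object PpConvoyFlink {
  private val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
    // Checkpoint every 6000 ms with at-least-once semantics
    env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    val properties = new Properties()
    // bootstrap.servers (required by the Kafka client) is not shown in this simplified example
    properties.setProperty("group.id", "my_groupid")
    //properties.setProperty("enable.auto.commit", "false")

    // Start from the group's committed offsets; commit offsets back to Kafka on each completed checkpoint
    val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic", new JSONKeyValueDeserializationSchema(true), properties)
      .setStartFromGroupOffsets()
      .setCommitOffsetsOnCheckpoints(true)

    val stream = env.addSource(consumer)
    stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
      override def map(node: ObjectNode): Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
        logger.info("################## %s".format(node.get("metadata").toString))
        Thread.sleep(3000) // sleeps 3 s for every record
        Right((200, writeJs(node.toString)))
      }
    }).print()

    env.execute("pp_convoy_flink")
  }
}
{code}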
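The warning text describes a "latest offsets win" handoff: if the offsets from one completed checkpoint have not yet been committed to Kafka when the next checkpoint completes, the older offsets are dropped and only the newer ones are committed. The Scala sketch below models that idea with an `AtomicReference`. The names `PendingCommitSlot`, `offer` and `take` are hypothetical, and this is a simplified illustration of the behavior the log message describes, not the actual `Kafka09Fetcher` implementation.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified model (not Flink's code): completed checkpoints hand
// partition -> offset maps to a separate thread that commits them to Kafka.
final class PendingCommitSlot {
  // At most one set of offsets waits to be committed; a newer set replaces it.
  private val next = new AtomicReference[Map[Int, Long]](null)

  /** Called when a checkpoint completes. Returns true if older, uncommitted offsets were dropped. */
  def offer(offsets: Map[Int, Long]): Boolean = {
    val previous = next.getAndSet(offsets)
    if (previous != null) {
      println("Skipping commit of previous offsets because newer complete checkpoint offsets are available.")
      true
    } else {
      false
    }
  }

  /** Called by the committing thread; takes the newest pending offsets, if any. */
  def take(): Option[Map[Int, Long]] = Option(next.getAndSet(null))
}

object CommitSlotDemo {
  def main(args: Array[String]): Unit = {
    val slot = new PendingCommitSlot
    slot.offer(Map(0 -> 100L)) // checkpoint 1 completes
    slot.offer(Map(0 -> 250L)) // checkpoint 2 completes before checkpoint 1's offsets were committed
    println(slot.take())       // Some(Map(0 -> 250))
  }
}
```

In this model, dropping the older offsets is harmless for recovery because the newer offsets supersede them, which matches the warning's note that checkpoint integrity is not affected.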