[ https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
andy hoang updated FLINK-11335:
-------------------------------
Description:
When trying to commit offsets to Kafka, I always get the warning
{noformat}
2019-01-15 11:18:55,405 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
{noformat}
As a result, no offsets are ever committed to Kafka.

The code was simplified by removing the business logic:
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))

// Checkpoint every 6 seconds; offsets are committed to Kafka on checkpoint completion.
env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

val properties = new Properties()
properties.setProperty("group.id", "my_groupid")
//properties.setProperty("enable.auto.commit", "false")

val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
    new JSONKeyValueDeserializationSchema(true),
    properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)

val stream = env.addSource(consumer)

stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
  override def map(node: ObjectNode): scala.Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
    logger.info("################## %s".format(node.get("metadata").toString))
    // Simulates the (removed) business logic taking ~3 s per record.
    Thread.sleep(3000)
    Right((200, writeJs(node.toString)))
  }
}).print()

env.execute("pp_convoy_flink")
{code}
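Since the fetcher only commits the offsets of the most recently completed checkpoint and skips a commit while the previous one is still in flight, one workaround I would expect to help (not verified on this cluster) is making the checkpoint interval comfortably larger than the observed commit latency. A minimal sketch against the same Flink checkpointing API used above; the 60000/30000 ms values are illustrative assumptions, not recommendations:
{code:java}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointTuningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Offsets are only committed to Kafka when a checkpoint completes, so a
    // longer interval gives each offset commit time to finish before a newer
    // checkpoint's offsets supersede it. 60 s is an illustrative value.
    env.enableCheckpointing(60000, CheckpointingMode.AT_LEAST_ONCE)

    // Extra headroom between consecutive checkpoints (illustrative value).
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // ... add the Kafka source and the rest of the pipeline here ...
  }
}
{code}
If no commit ever succeeds even with a long interval, that would point at connectivity between the TaskManagers and the Kafka brokers rather than at the interval itself.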
> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>            Reporter: andy hoang
>            Priority: Critical
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)