[ 
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

andy hoang updated FLINK-11335:
-------------------------------
    Description: 
When trying to commit offset to kafka, I always get warning
{noformat}
2019-01-15 11:18:55,405 WARN  
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
Committing offsets to Kafka takes longer than the checkpoint interval. Skipping 
commit of previous offsets because newer complete checkpoint offsets are 
available. This does not compromise Flink's checkpoint integrity.
{noformat}
The result is that no messages are committed to Kafka.
The code was simplified by removing the business logic:

{code:java}

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
    env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    val properties = new Properties()

    properties.setProperty("group.id", "my_groupid")
    //properties.setProperty("enable.auto.commit", "false")

    val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
      new JSONKeyValueDeserializationSchema(true),
      properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
    val stream = env.addSource(consumer)
    
    stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
(Int, ujson.Value)]] {

      override def map(node:ObjectNode): scala.Either[(Exception, ObjectNode), 
(Int, ujson.Value)] = {
          logger.info("################## 
%s".format(node.get("metadata").toString))
          Thread.sleep(3000)
          return Right(200, writeJs(node.toString))
      }
    }).print()
    env.execute("pp_convoy_flink")
  }
{code}

  was:
When trying to commit offset to kafka, I always get warning
{noformat}
2019-01-15 11:18:55,405 WARN  
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
Committing offsets to Kafka takes longer than the checkpoint interval. Skipping 
commit of previous offsets because newer complete checkpoint offsets are 
available. This does not compromise Flink's checkpoint integrity.
{noformat}
The code was simplified by removing the business logic:

{code:java}

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
    env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    val properties = new Properties()

    properties.setProperty("group.id", "my_groupid")
    //properties.setProperty("enable.auto.commit", "false")

    val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
      new JSONKeyValueDeserializationSchema(true),
      properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
    val stream = env.addSource(consumer)
    
    stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
(Int, ujson.Value)]] {

      override def map(node:ObjectNode): scala.Either[(Exception, ObjectNode), 
(Int, ujson.Value)] = {
          logger.info("################## 
%s".format(node.get("metadata").toString))
          Thread.sleep(3000)
          return Right(200, writeJs(node.toString))
      }
    }).print()
    env.execute("pp_convoy_flink")
  }
{code}


> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>  
>            Reporter: andy hoang
>            Priority: Critical
>
> When trying to commit offset to kafka, I always get warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka takes longer than the checkpoint interval. 
> Skipping commit of previous offsets because newer complete checkpoint offsets 
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is that no messages are committed to Kafka.
> The code was simplified by removing the business logic:
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
>     env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     val properties = new Properties()
>     properties.setProperty("group.id", "my_groupid")
>     //properties.setProperty("enable.auto.commit", "false")
>     val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>       new JSONKeyValueDeserializationSchema(true),
>       
> properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>     val stream = env.addSource(consumer)
>     
>     stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
> (Int, ujson.Value)]] {
>       override def map(node:ObjectNode): scala.Either[(Exception, 
> ObjectNode), (Int, ujson.Value)] = {
>           logger.info("################## 
> %s".format(node.get("metadata").toString))
>           Thread.sleep(3000)
>           return Right(200, writeJs(node.toString))
>       }
>     }).print()
>     env.execute("pp_convoy_flink")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to