[ https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780105#comment-16780105 ]
andy hoang edited comment on FLINK-11335 at 2/28/19 4:55 AM:
-------------------------------------------------------------
[~dawidwys] Thanks for the reply. I don't see any "async commit failed" message; the full log looks like the listing below. I also updated to Flink 1.7.0 and to the latest Kafka connector, and still get the same problem.

```
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>
```

The warning about `MultipartUploadOutputStream` is quite ambiguous.

All logs here. Right after deploying the app:

```
==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO  org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 (akka.tcp://fl...@ip-10-16-1-215.ap-southeast-1.compute.internal:44931/user/taskmanager_0) at ResourceManager
2019-02-28 04:42:34,693 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to ip-10-16-1-215
2019-02-28 04:42:35,247 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task 850716ec4421c9e70852a0eba5975f01 of job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not running
	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
```

Then a lot of log entries like this after running for a while:

```
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:19,327 WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
2019-02-28 04:46:19,329 INFO  activity - ################## {"offset":1037251,"topic":"my_topic","partition":11}   <====== my log to check whether the offset is reprocessed
==> application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:46:20,134 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:22,330 INFO  activity - ################## {"offset":1037252,"topic":"my_topic","partition":11}
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out <==
Right((200,"MESSAGEEEEEE DETAIL"))
==> application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log <==
2019-02-28 04:46:25,335 INFO  activity - ################## {"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98
==> application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:46:25,443 INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close closed:false s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
2019-02-28 04:46:25,549 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
2019-02-28 04:46:26,132 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
```

> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.6.2
> run under yarn-cluster
> Kafka cluster: 1.0
>
>            Reporter: andy hoang
>            Priority: Critical
>
> When trying to commit offsets to Kafka, I always get the warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is that no offsets are committed to Kafka.
> The code was simplified by removing the business logic:
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
> env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>
> val properties = new Properties()
> properties.setProperty("group.id", "my_groupid")
> //properties.setProperty("enable.auto.commit", "false")
>
> val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>   new JSONKeyValueDeserializationSchema(true),
>   properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>
> val stream = env.addSource(consumer)
>
> stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), (Int, ujson.Value)]] {
>   override def map(node: ObjectNode): scala.Either[(Exception, ObjectNode), (Int, ujson.Value)] = {
>     logger.info("################## %s".format(node.get("metadata").toString))
>     Thread.sleep(3000)
>     return Right(200, writeJs(node.toString))
>   }
> }).print()
>
> env.execute("pp_convoy_flink")
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
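The WARN discussed in this issue ("Skipping commit of previous offsets because newer complete checkpoint offsets are available") describes latest-wins behavior: the offsets of each newly completed checkpoint replace any still-uncommitted older ones, so when a commit is slow, only the newest offsets ever reach Kafka. A minimal, self-contained Scala sketch of that behavior (the name `LatestOnlySlot` and the single-partition simplification are illustrative, not Flink's actual classes):

```scala
// Sketch only, NOT Flink source code: models the "latest wins" offset-commit
// slot that produces the WARN above. While an async commit to Kafka is still
// in flight, offsets from newer completed checkpoints overwrite the pending
// ones, and the older commit is skipped.
final class LatestOnlySlot {
  private var slot: Option[Long] = None // simplified to one partition's offset
  var skipped = 0                       // counts the "skipping commit" WARNs

  // A completed checkpoint offers its offsets for the next async commit.
  def offer(offset: Long): Unit = {
    if (slot.isDefined) skipped += 1    // an older, uncommitted offset is dropped
    slot = Some(offset)
  }

  // The (slow) commit path eventually drains whatever is newest.
  def drain(): Option[Long] = { val v = slot; slot = None; v }
}

object OffsetCommitSketch {
  def main(args: Array[String]): Unit = {
    val s = new LatestOnlySlot
    // Three checkpoints complete while one commit is still in flight:
    s.offer(1037251L); s.offer(1037252L); s.offer(1037253L)
    assert(s.skipped == 2)                 // two WARN-style skips
    assert(s.drain().contains(1037253L))   // only the newest offset gets committed
    println(s"skipped=${s.skipped}")
  }
}
```

As the WARN itself notes, this does not compromise Flink's checkpoint integrity: on recovery Flink restores offsets from its own checkpoint state, not from the group offsets committed to Kafka. The practical effect is only that the consumer-group offsets visible to external monitoring tools can lag.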