[ 
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780105#comment-16780105
 ] 

andy hoang edited comment on FLINK-11335 at 2/28/19 5:15 AM:
-------------------------------------------------------------

[~dawidwys]Thanks for reply, I dont see any async commit failed message.

>From the  kafka side, I'm sure that no offset is commited. Because every time 
>I start/cancel/restart it will reprocess the same message from same offset, 
>this can easily detect by grep the offset in logs.

I also try with auto commit offset, it worked as expected, but I dont really 
need that feature. The reason is: if the mapping step is slower the reading 
message step from kafka, it will auto commit un-processed message, if I 
stop/start the app again, messages will be lost.

 

 

whole list of log will look like below, I also update to flink 1.7.0 and kafka 
consumer to latest, still got the same problem.
{code:java}
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.7.0</version>
 </dependency>{code}
The warning about `MultipartUploadOutputStream` is quite ambigous

All log here:
 Right after deploy the app:
{code:java}
==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO org.apache.flink.yarn.YarnResourceManager - 
Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 
(akka.tcp://fl...@ip-10-16-1-215.ap-southeast-1.compute.internal:44931/user/taskmanager_0)
 at ResourceManager
2019-02-28 04:42:34,693 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to 
ip-10-16-1-215
2019-02-28 04:42:35,247 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
1 by task 850716ec4421c9e70852a0eba5975f01 of job 
40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not 
running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
{code}
Then a lot of log like this after running for a while:

 
{code:java}
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:19,327 WARN 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing 
offsets to Kafka takes longer than the checkpoint interval. Skipping commit of 
previous offsets because newer complete checkpoint offsets are available. This 
does not compromise Flink's checkpoint integrity.
 2019-02-28 04:46:19,329 INFO activity - ##################
{"offset":1037251,"topic":"my_topic","partition":11}<====== ##########my log, 
so I can grep if same offset is reprocessed
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:20,134 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:22,330 INFO activity - ##################
{"offset":1037252,"topic":"my_topic","partition":11}
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out
 <==
 Right((200,"MESSAGEEEEEE DETAIL"))
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:25,335 INFO activity - ##################
{"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:25,443 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
 2019-02-28 04:46:25,549 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
 2019-02-28 04:46:26,132 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
{code}


was (Author: andyhoang):
[~dawidwys]Thanks for reply, I dont see any async commit failed message.

>From the  kafka side, I'm sure that no offset is commited. Because every time 
>I start/cancel/restart it will reprocess the same message from same offset, 
>this can easily detect by grep the offset in logs.

I also try with auto commit offset, it worked as expected, but I dont really 
need that feature. The reason is: if the mapping step is slower the reading 
message step from kafka, it will auto commit un-processed message, if I 
stop/start the app again, messages will be lost.

 

 

whole list of log will look like below, I also update to flink 1.7.0 and kafka 
consumer to latest, still got the same problem.
{code:java}
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.7.0</version>
 </dependency>{code}
The warning about `MultipartUploadOutputStream` is quite ambigous

All log here:
 Right after deploy the app:
{code:java}
==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO org.apache.flink.yarn.YarnResourceManager - 
Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 
(akka.tcp://fl...@ip-10-16-1-215.ap-southeast-1.compute.internal:44931/user/taskmanager_0)
 at ResourceManager
2019-02-28 04:42:34,693 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to 
ip-10-16-1-215
2019-02-28 04:42:35,247 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
1 by task 850716ec4421c9e70852a0eba5975f01 of job 
40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not 
running
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
{code}
Then a lot of log like this after running for a while:

 
{code:java}
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:19,327 WARN 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing 
offsets to Kafka takes longer than the checkpoint interval. Skipping commit of 
previous offsets because newer complete checkpoint offsets are available. This 
does not compromise Flink's checkpoint integrity.
 2019-02-28 04:46:19,329 INFO activity - ##################
{"offset":1037251,"topic":"my_topic","partition":11}
<====== my log to check offset if it is reprocessed
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:20,134 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:22,330 INFO activity - ##################
{"offset":1037252,"topic":"my_topic","partition":11}
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out
 <==
 Right((200,"MESSAGEEEEEE DETAIL"))
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:25,335 INFO activity - ##################
{"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98
==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:25,443 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
 2019-02-28 04:46:25,549 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
 2019-02-28 04:46:26,132 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
{code}

> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.62
> run under yarn-cluster
> Kafka cluster: 1.0
>  
>            Reporter: andy hoang
>            Priority: Critical
>
> When trying to commit offset to kafka, I always get warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka takes longer than the checkpoint interval. 
> Skipping commit of previous offsets because newer complete checkpoint offsets 
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is not commiting any message to kafka
> The code was simplified be remove business
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
>     env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     val properties = new Properties()
>     properties.setProperty("group.id", "my_groupid")
>     //properties.setProperty("enable.auto.commit", "false")
>     val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>       new JSONKeyValueDeserializationSchema(true),
>       
> properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>     val stream = env.addSource(consumer)
>     
>     stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
> (Int, ujson.Value)]] {
>       override def map(node:ObjectNode): scala.Either[(Exception, 
> ObjectNode), (Int, ujson.Value)] = {
>           logger.info("################## 
> %s".format(node.get("metadata").toString))
>           Thread.sleep(3000)
>           return Right(200, writeJs(node.toString))
>       }
>     }).print()
>     env.execute("pp_convoy_flink")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to