[ 
https://issues.apache.org/jira/browse/FLINK-11335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780105#comment-16780105
 ] 

andy hoang edited comment on FLINK-11335 at 2/28/19 4:55 AM:
-------------------------------------------------------------

[~dawidwys]Thanks for reply, I dont see any async commit failed message, whole 
list of log will look like below, I also update to flink 1.7.0 and kafka 
consumer to latest, still got the same problem.
 ```
 ~<dependency>~
 ~<groupId>org.apache.flink</groupId>~
 ~<artifactId>flink-connector-kafka_2.11</artifactId>~
 ~<version>1.7.0</version>~
 ~</dependency>~
 ```


 The warning about `MultipartUploadOutputStream` is quite ambigous

All log here:
 Right after deploy the app:
 ```==> container_1551327455160_0002_01_000001/jobmanager.log <==
 2019-02-28 04:42:34,492 INFO org.apache.flink.yarn.YarnResourceManager - 
Registering TaskManager with ResourceID container_1551327455160_0002_01_000002 
(akka.tcp://fl...@ip-10-16-1-215.ap-southeast-1.compute.internal:44931/user/taskmanager_0)
 at ResourceManager
 2019-02-28 04:42:34,693 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from SCHEDULED to DEPLOYING.
 2019-02-28 04:42:34,694 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: 
Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to 
ip-10-16-1-215
 2019-02-28 04:42:35,247 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source 
-> Map -> Sink: Print to Std. Out (1/1) (850716ec4421c9e70852a0eba5975f01) 
switched from DEPLOYING to RUNNING.
 2019-02-28 04:42:35,592 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
 2019-02-28 04:42:35,662 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
1 by task 850716ec4421c9e70852a0eba5975f01 of job 
40dd9e3dba228623226fcf3bda0d1c0a.
 2019-02-28 04:42:35,663 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not 
running
 at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 2019-02-28 04:42:41,484 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
 2019-02-28 04:42:46,555 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
 2019-02-28 04:42:46,588 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
 ```

Then a lot of log like this after running for a while:
 ```

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:19,327 WARN 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Committing 
offsets to Kafka takes longer than the checkpoint interval. Skipping commit of 
previous offsets because newer complete checkpoint offsets are available. This 
does not compromise Flink's checkpoint integrity.
 2019-02-28 04:46:19,329 INFO activity - ##################

{"offset":1037251,"topic":"my_topic","partition":11}

<====== my log to check offset if it is reprocessed

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:20,134 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:22,330 INFO activity - ##################

{"offset":1037252,"topic":"my_topic","partition":11}

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out
 <==
 Right((200,"MESSAGEEEEEE DETAIL"))

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
 2019-02-28 04:46:25,335 INFO activity - ##################

{"offset":1037253,"topic":"my_topic","partition":11}

2019-02-28 04:46:25,394 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
 2019-02-28 04:46:25,443 INFO 
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
 2019-02-28 04:46:25,549 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
 2019-02-28 04:46:26,132 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
 ```


was (Author: andyhoang):
[~dawidwys]Thanks for reply, I dont see any async commit failed message, whole 
list of log will look like below, I also update to flink 1.7.0 and kafka 
consumer to latest, still got the same problem.
```
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>
```
The warning about `MultipartUploadOutputStream` is quite ambigous

All log here:
Right after deploy the app:
```==> container_1551327455160_0002_01_000001/jobmanager.log <==
2019-02-28 04:42:34,492 INFO  org.apache.flink.yarn.YarnResourceManager         
            - Registering TaskManager with ResourceID 
container_1551327455160_0002_01_000002 
(akka.tcp://fl...@ip-10-16-1-215.ap-southeast-1.compute.internal:44931/user/taskmanager_0)
 at ResourceManager
2019-02-28 04:42:34,693 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom 
Source -> Map -> Sink: Print to Std. Out (1/1) 
(850716ec4421c9e70852a0eba5975f01) switched from SCHEDULED to DEPLOYING.
2019-02-28 04:42:34,694 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying 
Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) (attempt #0) to 
ip-10-16-1-215
2019-02-28 04:42:35,247 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom 
Source -> Map -> Sink: Print to Std. Out (1/1) 
(850716ec4421c9e70852a0eba5975f01) switched from DEPLOYING to RUNNING.
2019-02-28 04:42:35,592 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 1 @ 1551328955432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,662 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline 
checkpoint 1 by task 850716ec4421c9e70852a0eba5975f01 of job 
40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:35,663 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding 
checkpoint 1 of job 40dd9e3dba228623226fcf3bda0d1c0a.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
 Task Source: Custom Source -> Map -> Sink: Print to Std. Out (1/1) was not 
running
        at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-02-28 04:42:41,484 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 2 @ 1551328961432 for job 40dd9e3dba228623226fcf3bda0d1c0a.
2019-02-28 04:42:46,555 INFO  
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream   - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-2/_metadata
2019-02-28 04:42:46,588 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 2 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5153 ms).
```

Then a lot of log like this after running for a while:
```

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
2019-02-28 04:46:19,327 WARN  
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher  - Committing 
offsets to Kafka takes longer than the checkpoint interval. Skipping commit of 
previous offsets because newer complete checkpoint offsets are available. This 
does not compromise Flink's checkpoint integrity.
2019-02-28 04:46:19,329 INFO  activity                                          
            - ################## 
{"offset":1037251,"topic":"my_topic","partition":11} <====== my log to check 
offset if it is  reprocessed

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
2019-02-28 04:46:20,134 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 33 @ 1551329180078 for job 40dd9e3dba228623226fcf3bda0d1c0a.


==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
2019-02-28 04:46:22,330 INFO  activity                                          
            - ################## 
{"offset":1037252,"topic":"my_topic","partition":11}

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.out
 <==
Right((200,"MESSAGEEEEEE DETAIL"))

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000002/taskmanager.log
 <==
2019-02-28 04:46:25,335 INFO  activity                                          
            - ################## 
{"offset":1037253,"topic":"my_topic","partition":11}
2019-02-28 04:46:25,394 INFO  
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream   - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/70597bc7-728b-4c86-9488-2439f562fc98

==> 
application_1551327455160_0002/container_1551327455160_0002_01_000001/jobmanager.log
 <==
2019-02-28 04:46:25,443 INFO  
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream   - close 
closed:false 
s3://pp-andy-test/checkpoint/40dd9e3dba228623226fcf3bda0d1c0a/chk-33/_metadata
2019-02-28 04:46:25,549 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 33 for job 40dd9e3dba228623226fcf3bda0d1c0a (8091 bytes in 5397 ms).
2019-02-28 04:46:26,132 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 34 @ 1551329186078 for job 40dd9e3dba228623226fcf3bda0d1c0a.
```

> Kafka consumer can not commit offset at checkpoint
> --------------------------------------------------
>
>                 Key: FLINK-11335
>                 URL: https://issues.apache.org/jira/browse/FLINK-11335
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.2
>         Environment: AWS EMR 5.20: hadoop, flink plugin
> flink: 1.62
> run under yarn-cluster
> Kafka cluster: 1.0
>  
>            Reporter: andy hoang
>            Priority: Critical
>
> When trying to commit offset to kafka, I always get warning
> {noformat}
> 2019-01-15 11:18:55,405 WARN  
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  - 
> Committing offsets to Kafka takes longer than the checkpoint interval. 
> Skipping commit of previous offsets because newer complete checkpoint offsets 
> are available. This does not compromise Flink's checkpoint integrity.
> {noformat}
> The result is not commiting any message to kafka
> The code was simplified be remove business
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(new FsStateBackend("s3://pp-andy-test/checkpoint"))
>     env.enableCheckpointing(6000, CheckpointingMode.AT_LEAST_ONCE)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>     val properties = new Properties()
>     properties.setProperty("group.id", "my_groupid")
>     //properties.setProperty("enable.auto.commit", "false")
>     val consumer = new FlinkKafkaConsumer011[ObjectNode]("my_topic",
>       new JSONKeyValueDeserializationSchema(true),
>       
> properties).setStartFromGroupOffsets().setCommitOffsetsOnCheckpoints(true)
>     val stream = env.addSource(consumer)
>     
>     stream.map(new MapFunction[ObjectNode, Either[(Exception, ObjectNode), 
> (Int, ujson.Value)]] {
>       override def map(node:ObjectNode): scala.Either[(Exception, 
> ObjectNode), (Int, ujson.Value)] = {
>           logger.info("################## 
> %s".format(node.get("metadata").toString))
>           Thread.sleep(3000)
>           return Right(200, writeJs(node.toString))
>       }
>     }).print()
>     env.execute("pp_convoy_flink")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to