[ 
https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801162#comment-17801162
 ] 

xiangyu feng commented on FLINK-33932:
--------------------------------------

[~martijnvisser] AFAIK, this retry is not about retrying the checkpoint itself, but about 
ensuring that the asynchronous phase can successfully upload the local files to remote 
storage. Files that were partially uploaded but ultimately failed are not considered 
part of the checkpoint, because no file path is returned for them. This retry mechanism 
has worked well in our production environment for the past two years without any 
correctness issues.
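
For illustration, here is a minimal sketch of the retry idea in plain Java; all names 
(UploadAction, uploadWithRetry) and the fixed attempt count are assumptions, not the 
actual FLINK-33932 change:

    import java.io.IOException;

    // Illustrative sketch only: a bounded retry wrapper around the asynchronous-phase
    // per-file upload. The interface and method names here are hypothetical.
    public final class RetryingUpload {

        @FunctionalInterface
        interface UploadAction<T> {
            T run() throws IOException;
        }

        // Retries the upload up to maxAttempts times. A failed attempt returns no state
        // handle, so a partially uploaded file never becomes part of the checkpoint; only
        // when every attempt fails is the exception propagated and the checkpoint
        // declined, exactly as before.
        static <T> T uploadWithRetry(UploadAction<T> upload, int maxAttempts) throws IOException {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be >= 1");
            }
            IOException lastFailure = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return upload.run();
                } catch (IOException e) {
                    // e.g. a transient HDFS "Connection timed out" as in the trace below
                    lastFailure = e;
                }
            }
            throw lastFailure;
        }
    }

Wrapping the existing per-file upload call (e.g. the one invoked from 
RocksDBStateUploader) in such a helper would keep the decline-on-failure behavior when 
all attempts fail, while tolerating transient connection timeouts like the one in the 
stack trace below.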

> Support retry mechanism for rocksdb uploader
> --------------------------------------------
>
>                 Key: FLINK-33932
>                 URL: https://issues.apache.org/jira/browse/FLINK-33932
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Guojun Li
>            Priority: Major
>              Labels: pull-request-available
>
> The RocksDB uploader will throw an exception and decline the current checkpoint if 
> there are errors when uploading to a remote file system such as HDFS.
> The exception is as follows:
> 2023-12-19 08:46:00,197 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline 
> checkpoint 2 by task 
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of 
> job ffffffffa025f19e0000000000000000 at 
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ 
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task 
> checkpoint failed.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: 
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> 
> Calc[133] (184/500)#0.
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush 
> to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at 
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: 
> Could not flush to file and close the file system output stream to 
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the 
> stream state handle
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
>  ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: 
> java.net.ConnectException: Connection timed out
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) 
> ~[?:?]
>     at 
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
>  ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) 
> ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at 
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)
>  ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>  
> We can add a retry mechanism to the RocksDB uploader to decrease the checkpoint 
> failure rate in the asynchronous phase.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)