Guojun Li created FLINK-33932: --------------------------------- Summary: Support retry mechanism for rocksdb uploader Key: FLINK-33932 URL: https://issues.apache.org/jira/browse/FLINK-33932 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Guojun Li
Rocksdb uploader will throw exception and decline the current checkpoint if there are errors when uploading to remote file system like hdfs. The exception is as below: 2023-12-19 08:46:00,197 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job ffffffffa025f19e0000000000000000 at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789). org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed. at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0. at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] ... 4 more Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?] at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?] at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] ... 3 more Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?] ... 3 more Caused by: org.apache.flink.util.SerializedThrowable: java.net.ConnectException: Connection timed out at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?] at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?] We can support retry mechanism for rocksdb uploader to decrease the failure rate of checkpointing in the async phase. -- This message was sent by Atlassian Jira (v8.20.10#820010)