[ https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800806#comment-17800806 ]
Martijn Visser commented on FLINK-33932:
----------------------------------------

[~xiangyu0xf] If I'm understanding you correctly, you don't want to make this configurable but enable this by default, without the option to disable it?

> Support retry mechanism for rocksdb uploader
> --------------------------------------------
>
>                 Key: FLINK-33932
>                 URL: https://issues.apache.org/jira/browse/FLINK-33932
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Guojun Li
>            Priority: Major
>              Labels: pull-request-available
>
> Rocksdb uploader will throw exception and decline the current checkpoint if
> there are errors when uploading to remote file system like hdfs.
> The exception is as below:
>
> 2023-12-19 08:46:00,197 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 2 by task 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of job ffffffffa025f19e0000000000000000 at application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @ fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] -> Calc[133] (184/500)#0.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not flush to file and close the file system output stream to hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32) ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.net.ConnectException: Connection timed out
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777) ~[?:?]
>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652) ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
>
> We can support retry mechanism for rocksdb uploader to decrease the failure
> rate of checkpointing in the async phase.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
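
For illustration only, the retry idea proposed in the issue description could be sketched roughly as below. This is not the Flink implementation or the contents of the linked pull request: the class `RetryingUpload`, the method `callWithRetry`, and the parameters `maxAttempts` and `backoffMillis` are hypothetical names invented for this sketch. It assumes that only `IOException` (which includes the `java.net.ConnectException` seen in the trace) is worth retrying, and that the upload action can safely be re-run from scratch.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Flink): re-runs an upload a bounded number of times. */
final class RetryingUpload {

    private RetryingUpload() {}

    /**
     * Calls {@code upload} until it succeeds or {@code maxAttempts} is reached.
     * Only IOException is treated as retriable; any other exception propagates immediately.
     * Passing maxAttempts = 1 effectively disables retrying.
     */
    static <T> T callWithRetry(Callable<T> upload, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return upload.call();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Linear backoff before the next attempt (assumed policy).
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        // All attempts failed: surface the last I/O failure to the caller.
        throw lastFailure;
    }
}
```

Taking the attempt count as a parameter is one way to address the configurability question raised in the comment, since a value of 1 keeps the current fail-fast behaviour. A real uploader would presumably also need to discard any partially written remote file before each new attempt.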