[ https://issues.apache.org/jira/browse/FLINK-36125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zakelly Lan reassigned FLINK-36125:
-----------------------------------

    Assignee: Zakelly Lan

File not found exception on restoring state handles with file merging
----------------------------------------------------------------------

Key: FLINK-36125
URL: https://issues.apache.org/jira/browse/FLINK-36125
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.20.0
Reporter: Burak Ozakinci
Assignee: Zakelly Lan
Priority: Critical
Fix For: 2.0.0, 1.20.1
Attachments: app_code.txt

A Flink 1.20 application with checkpoint file merging enabled, including merging across checkpoint boundaries:

{*}execution.checkpointing.file-merging.enabled{*}: true
{*}execution.checkpointing.file-merging.across-checkpoint-boundary{*}: true

The app uses ZooKeeper for HA and S3 (via the Hadoop s3a filesystem) as checkpoint storage, with the RocksDB state backend.
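For reference, a minimal sketch of setting these two options programmatically when building the job. The option keys are the ones listed above; the surrounding code (environment setup, checkpoint interval) is purely illustrative and not taken from the attached app_code.txt.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileMergingConfigSketch {
    public static void main(String[] args) {
        // Option keys as in the description above; file merging must be
        // configured before the execution environment is created.
        Configuration conf = new Configuration();
        conf.setString("execution.checkpointing.file-merging.enabled", "true");
        conf.setString("execution.checkpointing.file-merging.across-checkpoint-boundary", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Checkpoint interval (ms) is illustrative only.
        env.enableCheckpointing(60_000L);

        // ... build the pipeline and call env.execute(...) as usual.
    }
}
{code}

The same two keys can equally be set in the Flink configuration file instead of programmatically.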
Summary of events:
* The app started to fail and restarted due to a ZooKeeper connection failure.
* While restoring, it tried to reuse the previous file-merging directory, logging:

{code:java}
Reusing previous directory s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.195.1-6122-34e093 for checkpoint file-merging.
{code}

* FileMergingSnapshotManager could not find the checkpoint file under the `taskowned` directory and the app started to failover:

{code:java}
java.io.FileNotFoundException: No such file or directory: s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
	at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:880)
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
	at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
	at java.base/java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:411)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
	at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://0825968657d315d337418c6a010371c3320d5792/6676b9fb581c449ea78d95efdd338ee1-021367244083-1724062514899/checkpoints/6676b9fb581c449ea78d95efdd338ee1/taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
	at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.getFileSize(FileMergingSnapshotManagerBase.java:910)
	at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:878)
{code}
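For context, the bottom frames show that the restore path (FileMergingSnapshotManagerBase.getFileSize) ends in a FileSystem#getFileStatus lookup on the physical file under the previous TaskManager's `taskowned` sub-directory, and that lookup throws when the file is gone. A minimal, stand-alone sketch of that kind of probe using Flink's filesystem API; the class and helper names are hypothetical and this is not Flink's actual restore code:

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class PhysicalFileProbe {

    /**
     * Hypothetical helper, not Flink code: returns the length of the physical
     * file behind a merged state handle, or -1 if it no longer exists.
     */
    static long physicalFileSize(Path physicalFile) throws IOException {
        FileSystem fs = physicalFile.getFileSystem();
        try {
            // This is the kind of lookup that surfaces as the
            // FileNotFoundException in the trace above.
            return fs.getFileStatus(physicalFile).getLen();
        } catch (FileNotFoundException e) {
            return -1L;
        }
    }
}
{code}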
--
This message was sent by Atlassian Jira
(v8.20.10#820010)