[ 
https://issues.apache.org/jira/browse/FLINK-36125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875516#comment-17875516
 ] 

Burak Ozakinci commented on FLINK-36125:
----------------------------------------

Hello [~zakelly], thanks for rapid response.

There is not many interesting logs before the issue actually. The sequence is 
like:

 
{code:java}
Triggering checkpoint 4694 (type=CheckpointType{name='Checkpoint', 
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1724110749502 for job 
6676b9fb581c449ea78d95efdd338ee1. {code}
{code:java}
Committing ../chk-4694/_metadata with MPU ID {code}
{code:java}
Reported span: 
SimpleSpan{scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, 
name=Checkpoint, startTsMillis=1724110749502, endTsMillis=1724110753198, 
attributes={jobId=6676b9fb581c449ea78d95efdd338ee1, checkpointId=4694, 
checkpointStatus=COMPLETED, fullSize=364882338, checkpointedSize=151129380}}
{code}
 
{code:java}
Completed checkpoint 4694 for job 6676b9fb581c449ea78d95efdd338ee1 (364882338 
bytes, checkpointDuration=3931 ms, finalizationTime=286 ms). {code}
And then ZK connection suspends with

 
{code:java}
Session
 0x1008e9071d30003 for server zk-cs/123.12.123.12:1212, Closing socket 
connection. Attempting reconnect except it is a SessionExpiredException.{code}
It is likely that this file is deleted at some point.

The file does not exist in the directory.

 

> File not found exception on restoring state handles with file merging
> ---------------------------------------------------------------------
>
>                 Key: FLINK-36125
>                 URL: https://issues.apache.org/jira/browse/FLINK-36125
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.20.0
>            Reporter: Burak Ozakinci
>            Priority: Critical
>             Fix For: 2.0.0, 1.20.1
>
>         Attachments: app_code.txt
>
>
> 1.20 app with file merging with across checkpoints option enabled.
> {*}execution.checkpointing.file-merging.enabled{*}: true
> {*}execution.checkpointing.file-merging.across-checkpoint-boundary{*}: true
> App uses Zookeeper for HA and S3 for HDFS with RocksDB state backend.
> Summary of events:
>  * App started to fail and restarted due to Zookeeper connection failure
>  * It tried to use the previous directory with the log below while restoring
>  * 
> {code:java}
> Reusing previous directory 
> s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.195.1-6122-34e093
>  for checkpoint file-merging. {code}
>  
>  * FileMergingSnapshotManager could not find the checkpoint file under 
> `taskowned` directory and the app started to failover.
>  
>  
> {code:java}
> java.io.FileNotFoundException: No such file or directory: 
> s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
>      at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:880)
>         at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)     
>   at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>         at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>   at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>   at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>   at 
> java.base/java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:411)
>       at 
> java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
>         at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>   at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>        at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>         at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>      at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>   at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)  
>     at 
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>         at 
> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>       at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>        at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>         at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>      at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>   at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>        at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>         at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>      at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>   at 
> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>         at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>    at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>      at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>    at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>      at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>      at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941) 
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)       at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:577) at 
> java.base/java.lang.Thread.run(Thread.java:829)Caused by: 
> java.io.FileNotFoundException: No such file or directory: 
> s3://0825968657d315d337418c6a010371c3320d5792/6676b9fb581c449ea78d95efdd338ee1-021367244083-1724062514899/checkpoints/6676b9fb581c449ea78d95efdd338ee1/taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
>       at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
>    at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
>       at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>      at 
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>  at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>  at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>  at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554) 
>        at 
> org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>  at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
>  at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
>         at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.getFileSize(FileMergingSnapshotManagerBase.java:910)
>   at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:878)
>  {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to