Feifan Wang created FLINK-30561: ----------------------------------- Summary: ChangelogStreamHandleReaderWithCache cause FileNotFoundException Key: FLINK-30561 URL: https://issues.apache.org/jira/browse/FLINK-30561 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.16.0 Reporter: Feifan Wang
When a job with state changelog enabled continues to restart, the following exceptions may occur : {code:java} java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory) at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87) at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107) at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78) at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94) at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158) at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95) at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42) at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85) ... 21 more {code} *Problem causes:* # *_ChangelogStreamHandleReaderWithCache_* use RefCountedFile manager local cache file. The reference count is incremented when the input stream is opened from the cache file, and decremented by one when the input stream is closed. So the input stream must be closed and only once. # _*StateChangelogHandleStreamHandleReader#getChanges()*_ may cause the input stream to be closed twice. This happens when changeIterator.read(tuple2.f0, tuple2.f1) throws an exception (for example, when the task is canceled for other reasons during the restore process) the current state change iterator will be closed twice. {code:java} private void advance() { while (!current.hasNext() && handleIterator.hasNext()) { try { current.close(); Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0); current = changeIterator.read(tuple2.f0, tuple2.f1); } catch (Exception e) { ExceptionUtils.rethrow(e); } } } @Override public void close() throws Exception { current.close(); }{code} So show make sure current state change iterator only be closed once. I suggest to make the following changes to _StateChangelogHandleStreamHandleReader **_ : {code:java} private boolean currentClosed = false; private void advance() { while (!current.hasNext() && handleIterator.hasNext()) { try { current.close(); currentClosed = true; Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0); current = changeIterator.read(tuple2.f0, tuple2.f1); currentClosed = false; } catch (Exception e) { ExceptionUtils.rethrow(e); } } } @Override public void close() throws Exception { if (!currentClosed) { current.close(); } }{code} cc [~yuanmei] , [~roman] . -- This message was sent by Atlassian Jira (v8.20.10#820010)