[ https://issues.apache.org/jira/browse/FLINK-30561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680850#comment-17680850 ]
Feifan Wang commented on FLINK-30561:
-------------------------------------

Hi [~roman] , I submitted a PR to fix this problem, can you help me review it?

> ChangelogStreamHandleReaderWithCache cause FileNotFoundException
> ----------------------------------------------------------------
>
>                 Key: FLINK-30561
>                 URL: https://issues.apache.org/jira/browse/FLINK-30561
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.16.0
>            Reporter: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>
> When a job with state changelog enabled continues to restart, the following exception may occur:
> {code:java}
> java.lang.RuntimeException: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: /data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
>     at org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
>     at org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
>     at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
>     ... 21 more {code}
>
> *Problem causes:*
> # *_ChangelogStreamHandleReaderWithCache_* uses _RefCountedFile_ to manage the local cache file. The reference count is incremented when an input stream is opened from the cache file and decremented when that input stream is closed, so each input stream must be closed exactly once.
> # _*StateChangelogHandleStreamHandleReader#getChanges()*_ may close the current input stream twice. When changeIterator.read(tuple2.f0, tuple2.f1) throws an exception (for example, when the task is canceled for another reason during the restore process), advance() has already closed the current state change iterator, and close() then closes it a second time.
> {code:java}
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
>
> @Override
> public void close() throws Exception {
>     current.close();
> }{code}
> So we should make sure the current state change iterator is closed only once.
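As an aside, the double-close hazard can be sketched with a minimal, self-contained example. All names here (RefCountedCacheFile, retain, release) are hypothetical and this is not Flink's actual RefCountedFile API; the assumption that the cache itself holds one initial reference is also illustrative. The point is only that once the count reaches zero the cache file is deleted, so closing the same stream twice can remove the file while the cache still expects it to exist, and the next open fails like openAndSeek() in the stack trace.

```java
// Minimal sketch of the double-close hazard (hypothetical names; not
// Flink's actual RefCountedFile API). The cache file is deleted as soon
// as its reference count drops to zero.
class RefCountedCacheFile {
    private int refCount = 1;      // assumption: one reference held by the cache itself
    private boolean deleted = false;

    // Called when an input stream is opened from the cache file.
    synchronized void retain() {
        refCount++;
    }

    // Called when an input stream is closed.
    synchronized void release() {
        if (--refCount == 0) {
            deleted = true;        // file removed from disk; later opens fail
        }
    }

    synchronized boolean isDeleted() {
        return deleted;
    }

    public static void main(String[] args) {
        RefCountedCacheFile file = new RefCountedCacheFile();
        file.retain();             // a restore attempt opens a stream        -> count = 2
        file.release();            // advance() closes it after read() throws -> count = 1
        file.release();            // close() closes the same stream again    -> count = 0
        // The cache file is now gone even though the cache still references it,
        // so the next restore attempt hits a FileNotFoundException as reported.
        System.out.println("cache file deleted: " + file.isDeleted()); // prints "cache file deleted: true"
    }
}
```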
> I suggest making the following changes to _*StateChangelogHandleStreamHandleReader*_:
> {code:java}
> private boolean currentClosed = false;
>
> private void advance() {
>     while (!current.hasNext() && handleIterator.hasNext()) {
>         try {
>             current.close();
>             currentClosed = true;
>             Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
>             LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
>             current = changeIterator.read(tuple2.f0, tuple2.f1);
>             currentClosed = false;
>         } catch (Exception e) {
>             ExceptionUtils.rethrow(e);
>         }
>     }
> }
>
> @Override
> public void close() throws Exception {
>     if (!currentClosed) {
>         current.close();
>     }
> }{code}
>
> cc [~yuanmei] , [~roman] .

--
This message was sent by Atlassian Jira
(v8.20.10#820010)