Feifan Wang created FLINK-30561:
-----------------------------------

             Summary: ChangelogStreamHandleReaderWithCache cause FileNotFoundException
                 Key: FLINK-30561
                 URL: https://issues.apache.org/jira/browse/FLINK-30561
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.16.0
            Reporter: Feifan Wang


When a job with the state changelog enabled restarts repeatedly, the following 
exception may occur:
{code:java}
java.lang.RuntimeException: java.io.FileNotFoundException: 
/data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp
 (No such file or directory)
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
    at 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
    at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:107)
    at 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
    at 
org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:94)
    at 
org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: 
/data1/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1671689962742_1333392/dstl-cache-file/dstl6215344559415829831.tmp
 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at 
org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:158)
    at 
org.apache.flink.changelog.fs.ChangelogStreamHandleReaderWithCache.openAndSeek(ChangelogStreamHandleReaderWithCache.java:95)
    at 
org.apache.flink.changelog.fs.StateChangeIteratorImpl.read(StateChangeIteratorImpl.java:42)
    at 
org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:85)
    ... 21 more {code}
*Problem causes:*
 # *_ChangelogStreamHandleReaderWithCache_* uses RefCountedFile to manage the 
local cache file. The reference count is incremented when an input stream is 
opened on the cache file and decremented when that input stream is closed, so 
each input stream must be closed exactly once.
 # _*StateChangelogHandleStreamHandleReader#getChanges()*_ may close the input 
stream twice. advance() closes the current state change iterator before it 
calls changeIterator.read(tuple2.f0, tuple2.f1). If that call throws an 
exception (for example, because the task is canceled for other reasons during 
the restore process), current still refers to the already closed iterator, and 
the later call to close() closes it a second time.

{code:java}
private void advance() {
    while (!current.hasNext() && handleIterator.hasNext()) {
        try {
            current.close();
            Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
            LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
            current = changeIterator.read(tuple2.f0, tuple2.f1);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}

@Override
public void close() throws Exception {
    current.close();
}{code}

So we should make sure the current state change iterator is closed only once. I 
suggest making the following changes to _StateChangelogHandleStreamHandleReader_:
{code:java}
private boolean currentClosed = false;

private void advance() {
    while (!current.hasNext() && handleIterator.hasNext()) {
        try {
            current.close();
            currentClosed = true;

            Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
            LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
            current = changeIterator.read(tuple2.f0, tuple2.f1);
            currentClosed = false;
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}

@Override
public void close() throws Exception {
    if (!currentClosed) {
        current.close();
    }
}{code}
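
To illustrate why the second close matters, here is a minimal, self-contained sketch that replays the failure path with and without the guard. It is not Flink code: CountedFile, CountedStream and replay are hypothetical stand-ins, and the sketch assumes the cache itself holds one reference, so a double release from a single stream drops the count to zero while the cache still points at the file.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleCloseSketch {

    /** Hypothetical stand-in for a ref-counted cache file; not Flink's RefCountedFile. */
    static final class CountedFile {
        // Assumption: the count starts at 1 for the reference held by the cache.
        private final AtomicInteger refs = new AtomicInteger(1);
        private boolean deleted = false;

        void retain() {
            refs.incrementAndGet();
        }

        void release() {
            if (refs.decrementAndGet() == 0) {
                deleted = true; // real code would delete the file on disk here
            }
        }

        int refs() {
            return refs.get();
        }

        boolean isDeleted() {
            return deleted;
        }
    }

    /** Stream that releases one reference on every close() call. */
    static final class CountedStream implements AutoCloseable {
        private final CountedFile file;

        CountedStream(CountedFile file) {
            this.file = file;
            file.retain();
        }

        @Override
        public void close() {
            file.release();
        }
    }

    /** Replays: advance() closes current, the next read fails, cleanup calls close(). */
    static CountedFile replay(boolean guardClose) {
        CountedFile file = new CountedFile();
        CountedStream current = new CountedStream(file); // refs: 2
        boolean currentClosed = false;
        try {
            current.close(); // advance() closes the exhausted iterator, refs: 1
            currentClosed = true;
            throw new IllegalStateException("simulated failure opening the next handle");
        } catch (IllegalStateException e) {
            // Cleanup path: corresponds to close() on the outer iterator.
            if (!guardClose || !currentClosed) {
                current.close(); // second release, refs: 0, file is deleted
            }
        }
        return file;
    }

    public static void main(String[] args) {
        CountedFile unguarded = replay(false);
        CountedFile guarded = replay(true);
        System.out.println("unguarded: refs=" + unguarded.refs()
                + ", deleted=" + unguarded.isDeleted());
        System.out.println("guarded:   refs=" + guarded.refs()
                + ", deleted=" + guarded.isDeleted());
    }
}
```

Without the guard the file is deleted even though the cache still refers to it, which would be consistent with the FileNotFoundException on the next restore attempt. With the guard the cache's reference survives.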
 

cc [~yuanmei] , [~roman] .



