[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
 ] 

Jun Qin edited comment on FLINK-14942 at 8/12/20, 1:10 PM:
-----------------------------------------------------------

I have double-checked: when creating a new savepoint from an existing 
savepoint, if the state of an operator in the existing savepoint is not 
touched (i.e., neither removed nor modified), the SavepointMetadata of the new 
savepoint will contain the same path as the original metadata, which is a 
relative path in Flink 1.11. When you try to restore a job from the new 
savepoint and load the state of those untouched operators, it fails with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
	at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more
{code}
For those untouched operators, I think a potential solution is to use the 
read/write API of the State Processor API library to re-create their state in 
the new savepoint. [~sjwiesman], what do you think? 
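
To illustrate the failure mode, here is a minimal sketch in plain Java that uses no Flink classes. It assumes that a relative state file name is resolved against the directory of the savepoint being restored, which is what the path in the FileNotFoundException suggests. The class and method names (RelativeStatePathSketch, resolveStateFile, isSelfContained, deepCopy) are hypothetical and are not part of Flink; this only models the idea of a shallow versus a deep copy, not how the State Processor API is implemented.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class RelativeStatePathSketch {

    /** Assumption: a relative state file name is resolved against the savepoint being restored. */
    static Path resolveStateFile(Path savepointDir, String relativeStateFile) {
        return savepointDir.resolve(relativeStateFile);
    }

    /** Returns true if every referenced state file exists inside the given savepoint directory. */
    static boolean isSelfContained(Path savepointDir, List<String> relativeStateFiles) {
        return relativeStateFiles.stream()
                .allMatch(name -> Files.isRegularFile(resolveStateFile(savepointDir, name)));
    }

    /** Deep copy: physically copies each referenced state file into the new savepoint directory. */
    static void deepCopy(Path sourceDir, Path newDir, List<String> relativeStateFiles)
            throws IOException {
        Files.createDirectories(newDir);
        for (String name : relativeStateFiles) {
            Files.copy(
                    resolveStateFile(sourceDir, name),
                    resolveStateFile(newDir, name),
                    StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("savepoints");
        Path source = Files.createDirectories(root.resolve("source"));
        Path target = Files.createDirectories(root.resolve("new"));

        // The untouched operator's state file lives only in the source savepoint.
        List<String> stateFiles = List.of("a623a314-9dd1-4d60-bc8f-d56816f55f03");
        Files.write(resolveStateFile(source, stateFiles.get(0)), new byte[] {1, 2, 3});

        // Shallow copy: only the relative name is carried over, so the file cannot be found.
        System.out.println("after shallow copy: " + isSelfContained(target, stateFiles));

        // Deep copy: the file is copied as well, so the new savepoint is self-contained.
        deepCopy(source, target, stateFiles);
        System.out.println("after deep copy: " + isSelfContained(target, stateFiles));
    }
}
```

In this model the new savepoint only becomes restorable once the referenced files exist under its own directory, which is the effect that re-creating the state through the read/write API would have.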


> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
>                 Key: FLINK-14942
>                 URL: https://issues.apache.org/jira/browse/FLINK-14942
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Jun Qin
>            Assignee: Jun Qin
>            Priority: Major
>              Labels: usability
>             Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the 
> new savepoint contains references to the source savepoint. Here is what the 
> [State Processing API 
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
>  says: 
> bq. Note: When basing a new savepoint on existing state, the state processor 
> api makes a shallow copy of the pointers to the existing operators. This 
> means that both savepoints share state and one cannot be deleted without 
> corrupting the other!
> This JIRA requests an option to make a deep copy (instead of a shallow 
> copy) so that the new savepoint is self-contained. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
