[ https://issues.apache.org/jira/browse/FLINK-25478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-25478:
-----------------------------
    Description: 
Currently, changelog materialization calls the RocksDB state backend's snapshot method to generate an {{IncrementalRemoteKeyedStateHandle}} as ChangelogStateBackendHandleImpl's materialized artifact. Until the next materialization, every checkpoint reports the same {{IncrementalRemoteKeyedStateHandle}} as before.

Registering this handle for the first time is fine. However, when the same {{IncrementalRemoteKeyedStateHandle}} is registered a second time (via {{ChangelogStateBackendHandleImpl#registerSharedStates}}), its private state artifacts are discarded without checking the registration reference:

IncrementalRemoteKeyedStateHandle:
{code:java}
public void discardState() throws Exception {
    try {
        StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
    } catch (Exception e) {
        LOG.warn("Could not properly discard misc file states.", e);
    }
}
{code}

As a result, the private state (such as RocksDB's MANIFEST file) is deleted, and on restore the job fails with a FileNotFoundException:

{noformat}
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:403)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:477)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
	at org.apache.flink.state.changelog.ChangelogStateBackend.lambda$createKeyedStateBackend$1(ChangelogStateBackend.java:153)
	at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:68)
	at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:221)
	at org.apache.flink.state.changelog.ChangelogStateBackend.createKeyedStateBackend(ChangelogStateBackend.java:145)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
	... 10 more
Caused by: java.io.FileNotFoundException: xxxxx
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
	at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
	at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) ~[?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) ~[?:1.8.0_102]
	... 1 more
{noformat}
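For illustration only, here is a minimal, self-contained sketch of the pattern described above. None of these classes are Flink's ({{MaterializedHandle}}, {{ToyRegistry}} and the file names are invented); it only models how re-reporting the same materialized handle and discarding private state on the second registration loses the files that a live checkpoint still needs:

{code:java}
import java.util.HashSet;
import java.util.Set;

public class DoubleRegistrationSketch {

    // Stand-in for a materialized handle with shared artifacts (sst files)
    // and private artifacts (MANIFEST, CURRENT, ...).
    static class MaterializedHandle {
        final Set<String> privateFiles;
        final Set<String> sharedFiles;

        MaterializedHandle(Set<String> privateFiles, Set<String> sharedFiles) {
            this.privateFiles = privateFiles;
            this.sharedFiles = sharedFiles;
        }

        // Mirrors the unconditional discard shown above: private files are
        // deleted without asking whether the handle is still referenced.
        void discardPrivateState(Set<String> remoteStorage) {
            remoteStorage.removeAll(privateFiles);
        }
    }

    // Toy registry: when a shared entry is registered a second time, the
    // incoming handle is treated as redundant and its private state discarded.
    static class ToyRegistry {
        private final Set<String> registered = new HashSet<>();

        void register(MaterializedHandle handle, Set<String> remoteStorage) {
            boolean duplicate = false;
            for (String shared : handle.sharedFiles) {
                duplicate |= !registered.add(shared);
            }
            if (duplicate) {
                handle.discardPrivateState(remoteStorage);
            }
        }
    }

    public static void main(String[] args) {
        Set<String> remoteStorage =
                new HashSet<>(Set.of("MANIFEST-000001", "000042.sst"));
        MaterializedHandle handle =
                new MaterializedHandle(Set.of("MANIFEST-000001"), Set.of("000042.sst"));

        ToyRegistry registry = new ToyRegistry();
        registry.register(handle, remoteStorage); // checkpoint N: fine
        registry.register(handle, remoteStorage); // checkpoint N+1 re-reports the same handle

        // The MANIFEST is now gone from remote storage although checkpoint N+1
        // still needs it, so a later restore fails with FileNotFoundException.
        System.out.println(remoteStorage); // prints [000042.sst]
    }
}
{code}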
> Changelog materialization with incremental checkpoint could cause checkpointed data loss
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-25478
>                 URL: https://issues.apache.org/jira/browse/FLINK-25478
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Yun Tang
>            Priority: Critical
>             Fix For: 1.15.0
>
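A possible direction, purely illustrative and not Flink's actual fix: only discard private artifacts once the registry confirms that no checkpoint still references the handle. The names {{ReferenceTrackingRegistry}} and {{discardIfUnreferenced}} below are invented for this sketch and do not exist in Flink:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class GuardedDiscardSketch {

    // Toy registry that counts how many checkpoints reference a materialized
    // handle. This is not Flink's SharedStateRegistry.
    static class ReferenceTrackingRegistry {
        private final Map<String, Integer> refCounts = new HashMap<>();

        void register(String handleId) {
            refCounts.merge(handleId, 1, Integer::sum);
        }

        void unregister(String handleId) {
            refCounts.merge(handleId, -1, Integer::sum);
        }

        boolean isStillReferenced(String handleId) {
            return refCounts.getOrDefault(handleId, 0) > 0;
        }
    }

    // Discard private state only if no live checkpoint still references the handle.
    static void discardIfUnreferenced(
            String handleId, ReferenceTrackingRegistry registry, Runnable discardPrivateState) {
        if (registry.isStillReferenced(handleId)) {
            return; // e.g. the MANIFEST is still needed by a newer checkpoint
        }
        discardPrivateState.run();
    }

    public static void main(String[] args) {
        ReferenceTrackingRegistry registry = new ReferenceTrackingRegistry();
        registry.register("materialized-handle-1"); // checkpoint N
        registry.register("materialized-handle-1"); // checkpoint N+1 re-reports the same handle

        registry.unregister("materialized-handle-1"); // checkpoint N is subsumed
        discardIfUnreferenced(
                "materialized-handle-1", registry,
                () -> System.out.println("private state deleted"));
        // Prints nothing: checkpoint N+1 still holds a reference, so the private
        // files survive and a later restore can still download them.
    }
}
{code}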
--
This message was sent by Atlassian Jira
(v8.20.1#820001)