[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470 ]
Feifan Wang edited comment on FLINK-29913 at 5/23/23 3:45 PM:
--------------------------------------------------------------
Thanks for the clarification [~roman]!

{quote}Further, regarding the approach of using a unique registry key, I agree with Congxian Qiu: we can just choose a stable registry key generation method based on the remote file name (such as an MD5 digest of the remote file name), which can replace IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). The mapping of local SST file name to StreamStateHandle never changes, so the RocksDB recovery part does not need to be changed.{quote}

I mean that we still use the local file name as the key of the sharedState map in _*IncrementalRemoteKeyedStateHandle*_, and use the remote file path when generating the SharedStateRegistryKey. The changes in _*IncrementalRemoteKeyedStateHandle*_ would look like this:

{code:java}
...
// Still keyed by the local file name: this is the mapping that "never changes", as mentioned above.
private final Map<StateHandleID, StreamStateHandle> sharedState;
...
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
        // Changed line: the registry key is derived from the (remote) handle, not the local file name.
        SharedStateRegistryKey registryKey = generateRegisterKey(sharedStateHandle.getValue());
        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);
        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        // Remote file path of the uploaded file.
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // MD5 here; another digest algorithm would also work
}
{code}

We can also use only normal handles (not PlaceholderStreamStateHandle) in IncrementalRemoteKeyedStateHandle, so that IncrementalRemoteKeyedStateHandle#generateRegisterKey() never receives a PlaceholderStreamStateHandle.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
>                 Key: FLINK-29913
>                 URL: https://issues.apache.org/jira/browse/FLINK-29913
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0, 1.17.0
>            Reporter: Yanfei Lei
>            Assignee: Feifan Wang
>            Priority: Major
>             Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of the incremental RocksDB state
> backend would be discarded by mistake when a handle with the same name is registered. See
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]
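A minimal, self-contained Java sketch of the idea in the comment: derive the shared-state registry key from the remote location of the uploaded data (an MD5 digest of the remote path or handle name) instead of from the local SST file name, and refuse placeholder handles. The types RemoteFile, InlineBytes and Placeholder, the helpers md5Hex and registryKey, and the example paths are hypothetical stand-ins, not Flink's FileStateHandle, ByteStreamStateHandle, PlaceholderStreamStateHandle or SharedStateRegistry; the HashMap only approximates what happens when two concurrent checkpoints register under the same name-based key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

public class RegistryKeySketch {

    // Hypothetical, simplified stand-ins for Flink's StreamStateHandle hierarchy.
    public interface Handle {}
    public record RemoteFile(String path) implements Handle {}        // ~ FileStateHandle
    public record InlineBytes(String handleName) implements Handle {} // ~ ByteStreamStateHandle
    public record Placeholder(String localName) implements Handle {}  // ~ PlaceholderStreamStateHandle

    /** Hex-encoded MD5 digest of a string; any stable digest would do. */
    public static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // negative bytes are formatted as unsigned
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be supported by every Java platform", e);
        }
    }

    /** Registry key derived from where the data lives remotely, not from the local file name. */
    public static String registryKey(Handle handle) {
        if (handle instanceof Placeholder) {
            // The proposal keeps placeholders out of the handle, so seeing one here is a bug.
            throw new IllegalArgumentException("placeholder handles must not be registered");
        }
        String keyString;
        if (handle instanceof RemoteFile f) {
            keyString = f.path();
        } else if (handle instanceof InlineBytes b) {
            keyString = b.handleName();
        } else {
            keyString = Integer.toString(System.identityHashCode(handle));
        }
        return md5Hex(keyString);
    }

    public static void main(String[] args) {
        // Two concurrent checkpoints upload the same local SST file to different remote files.
        Handle fromChk1 = new RemoteFile("hdfs:///ckp/shared/uuid-1");
        Handle fromChk2 = new RemoteFile("hdfs:///ckp/shared/uuid-2");

        // Name-based key (simplified): the second registration displaces the first.
        Map<String, Handle> byLocalName = new HashMap<>();
        byLocalName.put("000042.sst", fromChk1);
        Handle displaced = byLocalName.put("000042.sst", fromChk2);
        System.out.println("local-name key collides: " + (displaced != null));

        // Remote-path-based keys stay distinct, so neither handle is displaced.
        System.out.println("remote-path keys differ: "
                + !registryKey(fromChk1).equals(registryKey(fromChk2)));
    }
}
```

Running main prints `local-name key collides: true` followed by `remote-path keys differ: true`. Note that the identityHashCode fallback, kept here to mirror the proposal, is only stable for one object within one JVM, so it would not yield the same key for a handle restored from a checkpoint.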