curcur commented on a change in pull request #14943: URL: https://github.com/apache/flink/pull/14943#discussion_r583669077
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -46,19 +46,24 @@ public static StateTtlConfig createTtlConfig(long retentionTime) {
         }
     }
 
-    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) {
+    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> keyedStateBackend) {
         // TODO: remove the hard code check once FLINK-21027 is supported
         // state key and value is immutable only when using rocksdb state backend and timer
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof DelegateKeyedStateBackend
+                        ? ((DelegateKeyedStateBackend<?>) keyedStateBackend)
+                                .getDelegatedKeyedStateBackend()
+                        : keyedStateBackend;
+
         boolean isRocksDbState =
-                ROCKSDB_KEYED_STATE_BACKEND.equals(stateBackend.getClass().getCanonicalName());
-        boolean isHeapTimer = false;
-        if (stateBackend instanceof AbstractKeyedStateBackend) {
-            // currently, requiresLegacySynchronousTimerSnapshots()
-            // indicates the underlying uses heap-bsased timer
-            isHeapTimer =
-                    ((AbstractKeyedStateBackend<?>) stateBackend)
-                            .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
-        }
+                ROCKSDB_KEYED_STATE_BACKEND.equals(
+                        rootKeyedStateBackend.getClass().getCanonicalName());

Review comment:
`keyedStateBackend.requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT) == true` means the backend `uses RocksDB` && `uses heap timers`.

What needs to be returned is `uses RocksDB` && `does NOT use heap timers`.

These are not the same condition, so you cannot simply return `keyedStateBackend.requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
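
To make the distinction in the review comment concrete, here is a minimal, self-contained sketch of the truth table. It does not use Flink's classes: `StateImmutabilitySketch`, `TimerKind`, `legacyTimerFlag` and `isStateImmutable` are hypothetical names introduced only for illustration, and the meaning of the legacy-snapshot flag (true only for RocksDB with heap timers) is assumed from the comment above rather than checked against the Flink source.

```java
public class StateImmutabilitySketch {

    /** Hypothetical stand-in for where the timers are stored. */
    enum TimerKind { HEAP, ROCKSDB }

    /**
     * Models the flag as described in the review comment (an assumption):
     * it is true only when the backend is RocksDB AND the timers live on the heap.
     */
    static boolean legacyTimerFlag(boolean isRocksDb, TimerKind timers) {
        return isRocksDb && timers == TimerKind.HEAP;
    }

    /**
     * The condition the reviewer asks for: RocksDB state AND NOT heap timers.
     * The RocksDB check has to stay; the flag alone cannot express this.
     */
    static boolean isStateImmutable(boolean isRocksDb, TimerKind timers) {
        boolean isHeapTimer = legacyTimerFlag(isRocksDb, timers);
        return isRocksDb && !isHeapTimer;
    }

    public static void main(String[] args) {
        for (boolean rocks : new boolean[] {true, false}) {
            for (TimerKind timers : TimerKind.values()) {
                System.out.printf(
                        "rocksDb=%b timers=%s flag=%b immutable=%b%n",
                        rocks,
                        timers,
                        legacyTimerFlag(rocks, timers),
                        isStateImmutable(rocks, timers));
            }
        }
    }
}
```

Under these assumptions, returning the flag directly gives the wrong answer for both RocksDB cases, and returning its negation would wrongly report a heap state backend as immutable, which is why the explicit RocksDB check is combined with the negated flag.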