curcur commented on a change in pull request #14943: URL: https://github.com/apache/flink/pull/14943#discussion_r584605952
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java ########## @@ -46,19 +46,24 @@ public static StateTtlConfig createTtlConfig(long retentionTime) { } } - public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) { + public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> keyedStateBackend) { // TODO: remove the hard code check once FLINK-21027 is supported // state key and value are immutable only when using rocksdb state backend and timer + KeyedStateBackend<?> rootKeyedStateBackend = + keyedStateBackend instanceof DelegateKeyedStateBackend + ? ((DelegateKeyedStateBackend<?>) keyedStateBackend) + .getDelegatedKeyedStateBackend() + : keyedStateBackend; + boolean isRocksDbState = - ROCKSDB_KEYED_STATE_BACKEND.equals(stateBackend.getClass().getCanonicalName()); - boolean isHeapTimer = false; - if (stateBackend instanceof AbstractKeyedStateBackend) { - // currently, requiresLegacySynchronousTimerSnapshots() - // indicates the underlying uses heap-based timer - isHeapTimer = - ((AbstractKeyedStateBackend<?>) stateBackend) - .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT); - } + ROCKSDB_KEYED_STATE_BACKEND.equals( + rootKeyedStateBackend.getClass().getCanonicalName()); Review comment: That logically makes more sense now ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org