curcur commented on a change in pull request #14943:
URL: https://github.com/apache/flink/pull/14943#discussion_r584023097



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -46,19 +46,24 @@ public static StateTtlConfig createTtlConfig(long retentionTime) {
         }
     }
 
-    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) {
+    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> keyedStateBackend) {
         // TODO: remove the hard code check once FLINK-21027 is supported
         // state key and value is immutable only when using rocksdb state backend and timer
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof DelegateKeyedStateBackend
+                        ? ((DelegateKeyedStateBackend<?>) keyedStateBackend)
+                                .getDelegatedKeyedStateBackend()
+                        : keyedStateBackend;
+
         boolean isRocksDbState =
-                ROCKSDB_KEYED_STATE_BACKEND.equals(stateBackend.getClass().getCanonicalName());
-        boolean isHeapTimer = false;
-        if (stateBackend instanceof AbstractKeyedStateBackend) {
-            // currently, requiresLegacySynchronousTimerSnapshots()
-            // indicates the underlying uses heap-bsased timer
-            isHeapTimer =
-                    ((AbstractKeyedStateBackend<?>) stateBackend)
-                            .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
-        }
+                ROCKSDB_KEYED_STATE_BACKEND.equals(
+                        rootKeyedStateBackend.getClass().getCanonicalName());

Review comment:
   The result of isStateImmutableInStateBackend() is not the same as what requiresLegacySynchronousTimerSnapshots() returns; please check the highlights in my last comment.

   requiresLegacySynchronousTimerSnapshots() == false only indicates that the backend does not use a heap timer; it does not indicate that the backend is a RocksDB state backend.

   The required logic is: requiresLegacySynchronousTimerSnapshots() == false && the backend is RocksDB.
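
   To make the two-part condition concrete, here is a minimal, self-contained sketch. It does not use the real Flink classes: `Backend`, `SimpleBackend`, `DelegatingBackend`, and `isStateImmutable` are hypothetical stand-ins for KeyedStateBackend, a concrete backend, DelegateKeyedStateBackend, and isStateImmutableInStateBackend(). `usesHeapTimers()` stands in for what requiresLegacySynchronousTimerSnapshots() is said to indicate. It only illustrates that both checks are needed.

```java
public class ImmutabilityCheckSketch {

    /** Hypothetical stand-in for a keyed state backend; not Flink's interface. */
    interface Backend {
        boolean isRocksDb();

        // Stand-in for requiresLegacySynchronousTimerSnapshots(...):
        // true means the backend uses heap-based timers.
        boolean usesHeapTimers();
    }

    /** Hypothetical concrete backend with fixed answers. */
    static final class SimpleBackend implements Backend {
        private final boolean rocksDb;
        private final boolean heapTimers;

        SimpleBackend(boolean rocksDb, boolean heapTimers) {
            this.rocksDb = rocksDb;
            this.heapTimers = heapTimers;
        }

        @Override
        public boolean isRocksDb() {
            return rocksDb;
        }

        @Override
        public boolean usesHeapTimers() {
            return heapTimers;
        }
    }

    /** Hypothetical wrapper, mirroring the delegating backend in the diff. */
    static final class DelegatingBackend implements Backend {
        private final Backend delegate;

        DelegatingBackend(Backend delegate) {
            this.delegate = delegate;
        }

        Backend getDelegate() {
            return delegate;
        }

        @Override
        public boolean isRocksDb() {
            return false; // the wrapper itself is not RocksDB
        }

        @Override
        public boolean usesHeapTimers() {
            return delegate.usesHeapTimers();
        }
    }

    /** Unwraps one level of delegation, as the new code in the diff does. */
    static Backend unwrap(Backend backend) {
        return backend instanceof DelegatingBackend
                ? ((DelegatingBackend) backend).getDelegate()
                : backend;
    }

    /** Immutable only if the root backend is RocksDB AND timers are not on the heap. */
    static boolean isStateImmutable(Backend backend) {
        Backend root = unwrap(backend);
        boolean isRocksDbState = root.isRocksDb();
        boolean isHeapTimer = root.usesHeapTimers();
        return isRocksDbState && !isHeapTimer;
    }

    public static void main(String[] args) {
        System.out.println(isStateImmutable(new SimpleBackend(true, false)));
        System.out.println(isStateImmutable(new SimpleBackend(true, true)));
        System.out.println(isStateImmutable(new SimpleBackend(false, false)));
        System.out.println(isStateImmutable(
                new DelegatingBackend(new SimpleBackend(true, false))));
    }
}
```

   The third case is the one that matters here: a non-RocksDB backend without heap timers must still return false, which is why the timer check alone is not enough.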





