pnowojski commented on a change in pull request #13761:
URL: https://github.com/apache/flink/pull/13761#discussion_r511986549



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
##########
@@ -206,13 +207,36 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana
                                        ManagedMemoryUseCase.STATE_BACKEND,
                                        runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                                        runtimeContext.getUserCodeClassLoader()),
-                               false);
+                               isUsingCustomRawKeyedState());
 
                stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables);
                timeServiceManager = context.internalTimerServiceManager();
                stateHandler.initializeOperatorState(this);
        }
 
+       /**
+        * Indicates whether or not implementations of this class is writing to the raw keyed state streams
+        * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should
+        * override this method to return {@code true}.
+        *
+        * <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally,
+        * the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and

Review comment:
       nit: this `{@link}` should point to `AbstractStreamOperatorV2`, not `AbstractStreamOperator`.
   
   The same applies two lines below.
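
   For readers following the Javadoc under review, here is a minimal, Flink-free sketch of the opt-in hook it describes. Every class name below is a hypothetical stand-in, not the real Flink API, and the rule that the base class skips its own read of raw keyed state when the hook returns `true` is an assumption drawn from the (truncated) Javadoc, not something this diff confirms.

```java
// Simplified sketch of the override hook; not the actual Flink classes.
public class RawKeyedStateHookSketch {

    /** Hypothetical stand-in for the base operator class. */
    abstract static class BaseOperator {
        private boolean baseReadRawKeyedState;

        /** Final, like initializeState in the diff: subclasses influence it only via the hook. */
        final void initializeState() {
            // The hook's answer replaces the previously hard-coded `false`.
            boolean customRawKeyedState = isUsingCustomRawKeyedState();
            // Assumption: the base class reads raw keyed state itself only
            // when no subclass has claimed those streams.
            baseReadRawKeyedState = !customRawKeyedState;
        }

        /** Default: the subclass does not write raw keyed state. */
        protected boolean isUsingCustomRawKeyedState() {
            return false;
        }

        boolean didBaseReadRawKeyedState() {
            return baseReadRawKeyedState;
        }
    }

    /** Operator that leaves raw keyed state to the base class. */
    static class PlainOperator extends BaseOperator {}

    /** Operator that writes its own raw keyed state and says so. */
    static class CustomRawStateOperator extends BaseOperator {
        @Override
        protected boolean isUsingCustomRawKeyedState() {
            return true;
        }
    }

    public static void main(String[] args) {
        BaseOperator plain = new PlainOperator();
        plain.initializeState();
        BaseOperator custom = new CustomRawStateOperator();
        custom.initializeState();
        System.out.println("plain: base reads raw keyed state = " + plain.didBaseReadRawKeyedState());
        System.out.println("custom: base reads raw keyed state = " + custom.didBaseReadRawKeyedState());
    }
}
```

   The point of the hook is that the default stays `false`, so existing subclasses keep their behavior and only operators that write raw keyed state need to override it.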






