pnowojski commented on a change in pull request #13761: URL: https://github.com/apache/flink/pull/13761#discussion_r511986549
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ########## @@ -206,13 +207,36 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), - false); + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); timeServiceManager = context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); } + /** + * Indicates whether or not implementations of this class is writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. If yes, subclasses should + * override this method to return {@code true}. + * + * <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and Review comment: This Javadoc link should reference `AbstractStreamOperatorV2` rather than `AbstractStreamOperator`; ditto two lines below. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ########## @@ -206,13 +207,36 @@ public final void initializeState(StreamTaskStateInitializer streamTaskStateMana ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), - false); + isUsingCustomRawKeyedState()); stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); timeServiceManager = context.internalTimerServiceManager(); stateHandler.initializeOperatorState(this); } + /** + * Indicates whether or not implementations of this class is writing to the raw keyed state streams + * on snapshots, using {@link #snapshotState(StateSnapshotContext)}. 
If yes, subclasses should + * override this method to return {@code true}. + * + * <p>Subclasses need to explicitly indicate the use of raw keyed state because, internally, + * the {@link AbstractStreamOperator} may attempt to read from it as well to restore heap-based timers and Review comment: nit: this Javadoc link should reference `AbstractStreamOperatorV2` rather than `AbstractStreamOperator`; ditto two lines below. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org