Zakelly commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1805992325


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java:
##########
@@ -241,33 +242,40 @@ public StreamOperatorStateContext 
streamOperatorStateContext(
             
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
 
             // -------------- Internal Timer Service Manager --------------
+            // if the operator indicates that it is using custom raw keyed 
state,
+            // then whatever was written in the raw keyed state snapshot was 
NOT written
+            // by the internal timer services (because there is only ever one 
user of raw keyed
+            // state);
+            // in this case, timers should not attempt to restore timers from 
the raw keyed
+            // state.
+            final Iterable<KeyGroupStatePartitionStreamProvider> 
restoredRawKeyedStateTimers =
+                    (prioritizedOperatorSubtaskStates.isRestored() && 
!isUsingCustomRawKeyedState)
+                            ? rawKeyedStateInputs
+                            : Collections.emptyList();
             if (keyedStatedBackend != null) {
-
-                // if the operator indicates that it is using custom raw keyed 
state,
-                // then whatever was written in the raw keyed state snapshot 
was NOT written
-                // by the internal timer services (because there is only ever 
one user of raw keyed
-                // state);
-                // in this case, timers should not attempt to restore timers 
from the raw keyed
-                // state.
-                final Iterable<KeyGroupStatePartitionStreamProvider> 
restoredRawKeyedStateTimers =
-                        (prioritizedOperatorSubtaskStates.isRestored()
-                                        && !isUsingCustomRawKeyedState)
-                                ? rawKeyedStateInputs
-                                : Collections.emptyList();
-
                 timeServiceManager =
                         timeServiceManagerProvider.create(
                                 
environment.getMetricGroup().getIOMetricGroup(),
                                 keyedStatedBackend,
+                                keyedStatedBackend.getKeyGroupRange(),
+                                
environment.getUserCodeClassLoader().asClassLoader(),
+                                keyContext,
+                                processingTimeService,
+                                restoredRawKeyedStateTimers,
+                                cancellationContext);
+            }
+            if (asyncKeyedStateBackend != null) {

Review Comment:
   How about this:
   ```
   if (stateBackend.supportsAsyncKeyedStateBackend()) {
       asyncTimeServiceManager =
                           timeServiceManagerProvider.create(xxxxxxx);
   } else {
       asyncTimeServiceManager = timeServiceManager;
   }
   ```
   
   In case `AsyncKeyedStateBackendAdaptor` is used, there is no need to create 
another `timeServiceManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to