masteryhx commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1590911242
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java: ########## @@ -176,15 +180,31 @@ public StreamOperatorStateContext streamOperatorStateContext( try { // -------------- Keyed State Backend -------------- - keyedStatedBackend = - keyedStatedBackend( - keySerializer, - operatorIdentifierText, - prioritizedOperatorSubtaskStates, - streamTaskCloseableRegistry, - metricGroup, - managedMemoryFraction, - statsCollector); + // TODO: Only init async keyed state backend if supported, consider to init different + // state backend by the operator type. + if (stateBackend.supportsAsyncKeyedStateBackend()) { + asyncKeyedStateBackend = + keyedStatedBackend( + keySerializer, + operatorIdentifierText, + prioritizedOperatorSubtaskStates, + streamTaskCloseableRegistry, + metricGroup, + managedMemoryFraction, + statsCollector, + StateBackend::createAsyncKeyedStateBackend); + } else { Review Comment: As discussed offline, Since current creation of `KeyedStateBackend` contains some allocation of resources, e.g. managed memory, we remain creating one type here. Also add a TODO to support `KeyedStateBackend` for `AsyncKeyedStateBackend` in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org