Zakelly commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1590642587
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -104,15 +105,21 @@ public class AsyncExecutionController<K> implements StateRequestHandler {
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
-            StateExecutor stateExecutor,
+            AsyncKeyedStateBackend asyncKeyedStateBackend,
             int maxParallelism,
             int batchSize,
             long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
-        this.stateExecutor = stateExecutor;
+        if (asyncKeyedStateBackend != null) {

Review Comment:
   Well, I have a different opinion. I suggest keeping `AEC` and `AKSB` simple by giving each only the knowledge it needs. They seem to work well without knowing about each other, so I'd prefer to do the binding within the operator. The UT is also easier to build if the `AEC` only takes a `StateExecutor` in its constructor. WDYT?

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java:
##########
@@ -176,15 +180,31 @@ public StreamOperatorStateContext streamOperatorStateContext(
         try {

             // -------------- Keyed State Backend --------------
-            keyedStatedBackend =
-                    keyedStatedBackend(
-                            keySerializer,
-                            operatorIdentifierText,
-                            prioritizedOperatorSubtaskStates,
-                            streamTaskCloseableRegistry,
-                            metricGroup,
-                            managedMemoryFraction,
-                            statsCollector);
+            // TODO: Only init async keyed state backend if supported, consider to init different
+            // state backend by the operator type.
+            if (stateBackend.supportsAsyncKeyedStateBackend()) {
+                asyncKeyedStateBackend =
+                        keyedStatedBackend(
+                                keySerializer,
+                                operatorIdentifierText,
+                                prioritizedOperatorSubtaskStates,
+                                streamTaskCloseableRegistry,
+                                metricGroup,
+                                managedMemoryFraction,
+                                statsCollector,
+                                StateBackend::createAsyncKeyedStateBackend);
+            } else {

Review Comment:
   Do you mean creating only one type of state backend (sync or async)? I'd suggest splitting this up, as https://github.com/apache/flink/pull/24697/files#diff-60f97b32726f78481f7852ea82daa2f3c9919933c93c234a049442195ccf0e2b does.
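
To make the suggestion in the comment on `AsyncExecutionController.java` concrete, here is a minimal sketch of doing the binding in the operator. Everything in it is an assumption for illustration: the interfaces are simplified stand-ins that only borrow the names from the review, the `createStateExecutor()` method, the `SketchOperator` class, and the string-based requests are hypothetical, and none of it is the actual Flink code or the code in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point first, so the file also runs in single-file source mode.
class BindingSketch {
    public static void main(String[] args) {
        // A unit test can hand the controller a lambda; no state backend is needed.
        List<List<String>> batches = new ArrayList<>();
        AsyncExecutionController aec = new AsyncExecutionController(batches::add, 2);
        aec.handleRequest("get(k1)");
        aec.handleRequest("put(k2)");
        System.out.println(batches);
    }
}

// Hypothetical, simplified stand-in: the real StateExecutor has a different signature.
interface StateExecutor {
    void executeBatch(List<String> requests);
}

// Hypothetical, simplified stand-in for the async backend; createStateExecutor()
// is an assumed method used only to show where the executor would come from.
interface AsyncKeyedStateBackend {
    StateExecutor createStateExecutor();
}

// The controller depends only on StateExecutor and never sees the backend.
class AsyncExecutionController {
    private final StateExecutor stateExecutor;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    AsyncExecutionController(StateExecutor stateExecutor, int batchSize) {
        this.stateExecutor = stateExecutor;
        this.batchSize = batchSize;
    }

    // Buffers requests and hands a full batch to the executor.
    void handleRequest(String request) {
        buffer.add(request);
        if (buffer.size() >= batchSize) {
            stateExecutor.executeBatch(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}

// The operator is the only place that knows both sides and wires them together.
class SketchOperator {
    final AsyncExecutionController controller;

    SketchOperator(AsyncKeyedStateBackend backend, int batchSize) {
        this.controller =
                new AsyncExecutionController(backend.createStateExecutor(), batchSize);
    }
}
```

With this shape, the controller and the backend stay unaware of each other, and the test in `main` builds the controller from a plain lambda, which is the testing convenience the comment refers to.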