masteryhx commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1590604525
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ########## @@ -104,15 +105,21 @@ public class AsyncExecutionController<K> implements StateRequestHandler { public AsyncExecutionController( MailboxExecutor mailboxExecutor, - StateExecutor stateExecutor, + AsyncKeyedStateBackend asyncKeyedStateBackend, int maxParallelism, int batchSize, long bufferTimeout, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); - this.stateExecutor = stateExecutor; + if (asyncKeyedStateBackend != null) { Review Comment: There are some considerations: 1. AsyncKeyedStateBackend should be initlized/setup with AEC, it's better to put the logic within AEC. 2. I prefer to let AEC control the life cycle of StateExecutor then operator could not know much about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org