jectpro7 commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1577181524
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ########## @@ -94,29 +109,83 @@ public class AsyncExecutionController<K> { */ final AtomicInteger inFlightRecordNum; - public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { - this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); - } + /** The executor service that schedules and calls the triggers of this task. */ + ScheduledExecutorService scheduledExecutor; + + ScheduledFuture<Void> currentScheduledFuture; + + /** + * The current trigger sequence number, used to distinguish different triggers. Every time a + * trigger occurs, {@code currentTriggerSeq} increases by one. + */ + AtomicLong currentTriggerSeq; public AsyncExecutionController( MailboxExecutor mailboxExecutor, + CallbackExceptionHandler exceptionHandler, StateExecutor stateExecutor, int batchSize, + long bufferTimeout, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; - this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); + this.callbackExceptionHandler = exceptionHandler; + this.stateFutureFactory = + new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler); this.stateExecutor = stateExecutor; this.batchSize = batchSize; + this.bufferTimeout = bufferTimeout; this.maxInFlightRecordNum = maxInFlightRecords; this.stateRequestsBuffer = new StateRequestBuffer<>(); this.inFlightRecordNum = new AtomicInteger(0); + this.currentTriggerSeq = new AtomicLong(0); + + // ----------------- initialize buffer timeout ------------------- + this.currentScheduledFuture = null; + if (bufferTimeout > 0) { + this.scheduledExecutor = + new ScheduledThreadPoolExecutor( + 1, new ExecutorThreadFactory("AEC-timeout-scheduler")); Review Comment: Is that possible to share the `ScheduledThreadPoolExecutor` for all AEC to reduce the overhead of thread context switching? It only for scheduling and the task is non-blocking, which should run very fast. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org