jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577181524


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +109,83 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this 
task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, 
callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new 
ExecutorThreadFactory("AEC-timeout-scheduler"));

Review Comment:
   Is that possible to  share the `ScheduledThreadPoolExecutor` for all AEC to 
reduce the overhead of thread context switching? It only for scheduling and the 
task is non-blocking, which should run very fast. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to