jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576541718


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,13 +46,18 @@ public class StateFutureImpl<T> implements 
InternalStateFuture<T> {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler 
exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+    public <U> StateFuture<U> thenApply(

Review Comment:
   `thenApply` can also be simplified with `thenCompose` by 
wrapping`CompletedStateFuture`



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##########
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
 
     private RecordContext currentProcessingContext;
 
+    private Environment environment;
+
     /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
     @Override
     public void setup(
             StreamTask<?, ?> containingTask,
             StreamConfig config,
             Output<StreamRecord<OUT>> output) {
         super.setup(containingTask, config, output);
-        // TODO: properly read config and setup
-        final MailboxExecutor mailboxExecutor =
-                containingTask.getEnvironment().getMainMailboxExecutor();
-        this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+        final Environment environment = containingTask.getEnvironment();
+        final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
+        final int inFlightRecordsLimit =
+                
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+        final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
+        final long asyncBufferTimeout =
+                environment.getExecutionConfig().getAsyncStateBufferTimeout();
+        // TODO: initial state executor and set state executor for aec
+        this.asyncExecutionController =
+                new AsyncExecutionController(
+                        mailboxExecutor,
+                        this::handleStateCallbackException,
+                        null,
+                        asyncBufferSize,
+                        asyncBufferTimeout,
+                        inFlightRecordsLimit);
+    }
+
+    private void handleStateCallbackException(String message, Throwable 
exception) {

Review Comment:
   shall we make it `protected` so the user could customize it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +109,83 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this 
task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, 
callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new 
ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) 
this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    
.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, 
maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout 
{}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {

Review Comment:
   If I understand it correctly, when `bufferTimeout <= 0`, some records may 
have higher latency if the streaming is slow, which needs long time to reach 
the limit of batchSize. Is the behaviou expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to