masteryhx commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1555175989


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,

Review Comment:
   The naming is a bit confusing.
   I may thinked it as accessing state synchronously if I missed above comment.
   How about `NOP` or other names at least says `without state access`.
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Executor for executing batch {@link StateRequest}s. */
+@Internal
+public interface StateExecutor {
+    /**
+     * Execute a batch of state requests.
+     *
+     * @param processingRequests the given batch of processing requests
+     * @return A future can determine whether execution has completed.
+     */
+    CompletableFuture<Boolean> executeBatchRequests(
+            Iterable<StateRequest<?, ?, ?>> processingRequests);

Review Comment:
   Different state types could be executed together, right ?
   How could we get specific state operations ? Current `RequestType` seems not 
enough.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Async Execution Controller (AEC) receives processing requests from 
operators, and put them
+ * into execution according to some strategies.
+ *
+ * <p>It is responsible for:
+ * <li>Preserving the sequence of elements bearing the same key by delaying 
subsequent requests
+ *     until the processing of preceding ones is finalized.
+ * <li>Tracking the in-flight data(records) and blocking the input if too much 
data in flight
+ *     (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause 
current operations,
+ *     allowing for the execution of callbacks (mails in Mailbox).
+ *
+ * @param <R> the type of the record
+ * @param <K> the type of the key
+ */
+public class AsyncExecutionController<R, K> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
+
+    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+
+    /** The max allow number of in-flight records. */
+    private final int maxInFlightRecordNum;
+
+    /** The key accounting unit which is used to detect the key conflict. */
+    final KeyAccountingUnit<R, K> keyAccountingUnit;
+
+    /**
+     * A factory to build {@link 
org.apache.flink.core.state.InternalStateFuture}, this will auto
+     * wire the created future with mailbox executor. Also conducting the 
context switch.
+     */
+    private final StateFutureFactory<R, K> stateFutureFactory;
+
+    /** The state executor where the {@link StateRequest} is actually 
executed. */
+    final StateExecutor stateExecutor;
+
+    /** The corresponding context that currently runs in task thread. */
+    RecordContext<R, K> currentContext;
+
+    public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
+        this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+    }
+
+    public AsyncExecutionController(
+            MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int 
maxInFlightRecords) {
+        this.keyAccountingUnit = new KeyAccountingUnit<>();
+        this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+        this.stateExecutor = stateExecutor;
+        this.maxInFlightRecordNum = maxInFlightRecords;
+        LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", 
maxInFlightRecords);
+    }
+
+    /**
+     * Build a new context based on record and key. Also wired with internal 
{@link
+     * KeyAccountingUnit}.
+     *
+     * @param record the given record.
+     * @param key the given key.
+     * @return the built record context.
+     */
+    public RecordContext<R, K> buildContext(R record, K key) {

Review Comment:
   Could we unify this method with `setCurrentContext` ?
   Of course we could provide `getCurrentContext` only for tests.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {

Review Comment:
   How about also adding type for `State`, e.g. `StateRequest<S extends State, 
K, IN, OUT>` ? 
   Then we could also avoid some explicit casting when `getState`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param <K> Type of partitioned key.
+ * @param <IN> Type of input of this request.
+ * @param <OUT> Type of value that request will return.
+ */
+public class StateRequest<K, IN, OUT> {
+
+    /** The type of processing request. */
+    public enum RequestType {
+        /** Process one record without state access. */
+        SYNC,
+        /** Get from one {@link State}. */
+        GET,
+        /** Put to one {@link State}. */
+        PUT,
+        /** Merge value to an exist key in {@link State}. Mainly used for 
listState. */
+        MERGE,
+        /** Delete from one {@link State}. */
+        DELETE
+    }
+
+    /** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+    @Nullable private final State state;
+
+    /** The type of this request. */
+    private final RequestType type;
+
+    /** The payload(input) of this request. */
+    @Nullable private final IN payload;
+
+    /** The future to collect the result of the request. */
+    private InternalStateFuture<OUT> stateFuture;
+
+    /** The record context of this request. */
+    private RecordContext<?, K> context;
+
+    StateRequest(@Nullable State state, RequestType type, @Nullable IN 
payload) {
+        this.state = state;
+        this.type = type;
+        this.payload = payload;
+    }
+
+    RequestType getRequestType() {
+        return type;
+    }
+
+    @Nullable
+    IN getPayload() {
+        return payload;
+    }
+
+    @Nullable
+    State getState() {
+        return state;
+    }
+
+    InternalStateFuture<OUT> getFuture() {
+        return stateFuture;
+    }
+
+    void setFuture(InternalStateFuture<OUT> future) {
+        stateFuture = future;
+    }
+
+    RecordContext<?, K> getRecordContext() {
+        return context;
+    }
+
+    void setRecordContext(RecordContext<?, K> context) {
+        this.context = context;
+    }

Review Comment:
   I am also thinking about whether we could avoid 'two-step setting'.
   How about extracting a new class e.g. `StateRequestContext` for AEC?
   Then State layer could only see `StateRequest`, AEC could also only set its 
Context.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Key accounting unit holds the current in-flight key and tracks the 
corresponding ongoing records,
+ * which is used to preserve the ordering of independent chained {@link
+ * org.apache.flink.api.common.state.v2.StateFuture}.
+ *
+ * @param <R> the type of record
+ * @param <K> the type of key
+ */
+public class KeyAccountingUnit<R, K> {

Review Comment:
   We may have other implementations or it may be a interface, right ?
   Just a minor suggestion, How about adding TODO here ?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.StateRequest.RequestType;
+import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link AsyncExecutionController}. */
+class AsyncExecutionControllerTest {
+
+    // TODO: this test is not well completed, cause buffering in AEC is not 
implemented.
+    // Yet, just for illustrating the interaction between AEC and Async state 
API.
+    @Test
+    void testBasicRun() {
+        TestAsyncExecutionController<String, String> aec =
+                new TestAsyncExecutionController<>(
+                        new SyncMailboxExecutor(), new TestStateExecutor());
+        TestUnderlyingState underlyingState = new TestUnderlyingState();
+        TestValueState valueState = new TestValueState(aec, underlyingState);
+        AtomicInteger output = new AtomicInteger();
+        Runnable userCode =
+                () -> {
+                    valueState
+                            .asyncValue()
+                            .thenCompose(
+                                    val -> {
+                                        int updated = (val == null ? 1 : (val 
+ 1));
+                                        return valueState
+                                                .asyncUpdate(updated)
+                                                .thenCompose(
+                                                        o ->
+                                                                
StateFutureUtils.completedFuture(
+                                                                        
updated));
+                                    })
+                            .thenAccept(val -> output.set(val));
+                };
+
+        // ============================ element1 ============================
+        String record1 = "key1-r1";
+        String key1 = "key1";
+        // Simulate the wrapping in {@link 
RecordProcessorUtils#getRecordProcessor()}, wrapping the
+        // record and key with RecordContext.
+        RecordContext<String, String> recordContext1 = 
aec.buildContext(record1, key1);
+        aec.setCurrentContext(recordContext1);
+        // execute user code
+        userCode.run();
+
+        // Single-step run.
+        // Firstly, the user code generates value get in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update is in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update finishes.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext1.getReferenceCount()).isEqualTo(0);
+
+        // ============================ element 2 & 3 
============================
+        String record2 = "key1-r2";
+        String key2 = "key1";
+        RecordContext<String, String> recordContext2 = 
aec.buildContext(record2, key2);
+        aec.setCurrentContext(recordContext2);
+        // execute user code
+        userCode.run();
+
+        String record3 = "key1-r3";
+        String key3 = "key1";
+        RecordContext<String, String> recordContext3 = 
aec.buildContext(record3, key3);
+        aec.setCurrentContext(recordContext3);
+        // execute user code
+        userCode.run();
+
+        // Single-step run.
+        // Firstly, the user code for record2 generates value get in active 
buffer,
+        // while user code for record3 generates value get in blocking buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.blockingBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update for record2 is in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.blockingBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update for record2 finishes. The value get for record3 is 
still in blocking status.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(2);
+        assertThat(recordContext2.getReferenceCount()).isEqualTo(0);
+        assertThat(aec.blockingBuffer.size()).isEqualTo(1);
+
+        aec.migrateBlockingToActive();
+        // Value get for record3 is ready for run.
+
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.blockingBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update for record3 is in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update for record3 finishes.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(3);
+        assertThat(recordContext3.getReferenceCount()).isEqualTo(0);
+
+        // ============================ element4 ============================
+        String record4 = "key3-r3";
+        String key4 = "key3";
+        RecordContext<String, String> recordContext4 = 
aec.buildContext(record4, key4);
+        aec.setCurrentContext(recordContext4);
+        // execute user code
+        userCode.run();
+
+        // Single-step run for another key.
+        // Firstly, the user code generates value get in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // After running, the value update is in active buffer.
+        assertThat(aec.activeBuffer.size()).isEqualTo(1);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
+        aec.triggerIfNeeded(true);
+        // Value update finishes.
+        assertThat(aec.activeBuffer.size()).isEqualTo(0);
+        assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
+        assertThat(output.get()).isEqualTo(1);
+        assertThat(recordContext4.getReferenceCount()).isEqualTo(0);
+    }
+
+    /**
+     * An AsyncExecutionController for testing purpose, which integrates with 
basic buffer
+     * mechanism.
+     */
+    static class TestAsyncExecutionController<R, K> extends 
AsyncExecutionController<R, K> {
+
+        LinkedList<StateRequest<K, ?, ?>> activeBuffer;
+
+        LinkedList<StateRequest<K, ?, ?>> blockingBuffer;
+
+        public TestAsyncExecutionController(
+                MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
+            super(mailboxExecutor, stateExecutor);
+            activeBuffer = new LinkedList<>();
+            blockingBuffer = new LinkedList<>();
+        }
+
+        @Override
+        <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
+            activeBuffer.push(request);
+        }
+
+        <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
+            blockingBuffer.push(request);
+        }
+
+        void triggerIfNeeded(boolean force) {
+            if (!force) {
+                // Disable normal trigger, to perform single-step debugging 
and check.
+                return;
+            }
+            LinkedList<StateRequest<?, ?, ?>> toRun = new 
LinkedList<>(activeBuffer);
+            activeBuffer.clear();
+            stateExecutor.executeBatchRequests(toRun);
+        }
+
+        void migrateBlockingToActive() {
+            Iterator<StateRequest<K, ?, ?>> blockingIter = 
blockingBuffer.iterator();
+            while (blockingIter.hasNext()) {
+                StateRequest<K, ?, ?> request = blockingIter.next();
+                if (request.getRecordContext().tryOccupyKey()) {
+                    insertActiveBuffer(request);
+                    blockingIter.remove();
+                }
+            }
+        }
+    }
+
+    /** Simulate the underlying state that is actually used to execute the 
request. */
+    static class TestUnderlyingState {
+
+        private HashMap<String, Integer> hashMap;
+
+        public TestUnderlyingState() {
+            this.hashMap = new HashMap<>();
+        }
+
+        public Integer get(String key) {
+            return hashMap.get(key);
+        }
+
+        public void update(String key, Integer val) {
+            hashMap.put(key, val);
+        }
+    }
+
+    static class TestValueState implements ValueState<Integer> {
+
+        private AsyncExecutionController<String, String> 
asyncExecutionController;
+
+        private TestUnderlyingState underlyingState;
+
+        public TestValueState(
+                AsyncExecutionController<String, String> aec, 
TestUnderlyingState underlyingState) {
+            this.asyncExecutionController = aec;
+            this.underlyingState = underlyingState;
+        }
+
+        @Override
+        public StateFuture<Void> asyncClear() {
+            StateRequest<String, Void, Void> request =

Review Comment:
   Generate `StateRequest` in State layer looks good to me.
   But how could we handle `SYNC` type which is invisible for `State` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to