masteryhx commented on code in PR #24614: URL: https://github.com/apache/flink/pull/24614#discussion_r1555175989
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param <K> Type of partitioned key. + * @param <IN> Type of input of this request. + * @param <OUT> Type of value that request will return. + */ +public class StateRequest<K, IN, OUT> { + + /** The type of processing request. */ + public enum RequestType { + /** Process one record without state access. */ + SYNC, Review Comment: The naming is a bit confusing. I might take it as accessing state synchronously if I missed the comment above. How about `NOP`, or some other name that at least says `without state access`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +import java.util.concurrent.CompletableFuture; + +/** Executor for executing batch {@link StateRequest}s. */ +@Internal +public interface StateExecutor { + /** + * Execute a batch of state requests. + * + * @param processingRequests the given batch of processing requests + * @return A future can determine whether execution has completed. + */ + CompletableFuture<Boolean> executeBatchRequests( + Iterable<StateRequest<?, ?, ?>> processingRequests); Review Comment: Different state types could be executed together, right? How could we get the specific state operations? The current `RequestType` does not seem to be enough.
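To make the concern concrete, here is a rough sketch of what an executor implementation might end up doing (purely illustrative, not part of this PR; `IllustrativeStateExecutor` is a made-up name, it assumes the sketch lives in `org.apache.flink.runtime.asyncprocessing` so it can use the package-private accessors of `StateRequest`, and the actual backend calls are elided):

```java
package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.ValueState;

import java.util.concurrent.CompletableFuture;

/** Hypothetical executor, only to illustrate the concern above. */
public class IllustrativeStateExecutor implements StateExecutor {

    @Override
    public CompletableFuture<Boolean> executeBatchRequests(
            Iterable<StateRequest<?, ?, ?>> processingRequests) {
        for (StateRequest<?, ?, ?> request : processingRequests) {
            // RequestType only says GET/PUT/MERGE/DELETE; the concrete operation still
            // has to be recovered from the State instance the request carries.
            State state = request.getState();
            switch (request.getRequestType()) {
                case GET:
                    if (state instanceof ValueState) {
                        // value-state read path of the underlying backend (elided)
                    }
                    // else if it is a list/map state: a different read path
                    break;
                case PUT:
                case MERGE:
                case DELETE:
                    // write paths, again keyed by the concrete State subtype (elided)
                    break;
                case SYNC:
                    // no state access; just complete the request's future
                    break;
            }
        }
        return CompletableFuture.completedFuture(true);
    }
}
```

If the request carried the concrete state type (or a finer-grained operation enum), this `instanceof` chain could largely go away.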
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.state.InternalStateFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Async Execution Controller (AEC) receives processing requests from operators, and put them + * into execution according to some strategies. + * + * <p>It is responsible for: + * <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests + * until the processing of preceding ones is finalized. + * <li>Tracking the in-flight data(records) and blocking the input if too much data in flight + * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, + * allowing for the execution of callbacks (mails in Mailbox). + * + * @param <R> the type of the record + * @param <K> the type of the key + */ +public class AsyncExecutionController<R, K> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); + + public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; + + /** The max allow number of in-flight records. */ + private final int maxInFlightRecordNum; + + /** The key accounting unit which is used to detect the key conflict. */ + final KeyAccountingUnit<R, K> keyAccountingUnit; + + /** + * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto + * wire the created future with mailbox executor. Also conducting the context switch. + */ + private final StateFutureFactory<R, K> stateFutureFactory; + + /** The state executor where the {@link StateRequest} is actually executed. */ + final StateExecutor stateExecutor; + + /** The corresponding context that currently runs in task thread. */ + RecordContext<R, K> currentContext; + + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { + this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); + } + + public AsyncExecutionController( + MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { + this.keyAccountingUnit = new KeyAccountingUnit<>(); + this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); + this.stateExecutor = stateExecutor; + this.maxInFlightRecordNum = maxInFlightRecords; + LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); + } + + /** + * Build a new context based on record and key. Also wired with internal {@link + * KeyAccountingUnit}. 
+ * + * @param record the given record. + * @param key the given key. + * @return the built record context. + */ + public RecordContext<R, K> buildContext(R record, K key) { Review Comment: Could we unify this method with `setCurrentContext`? Of course, we could still provide `getCurrentContext` for tests only. ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param <K> Type of partitioned key. + * @param <IN> Type of input of this request. + * @param <OUT> Type of value that request will return. + */ +public class StateRequest<K, IN, OUT> { Review Comment: How about also adding a type parameter for `State`, e.g. `StateRequest<S extends State, K, IN, OUT>`? Then we could also avoid some explicit casting in `getState`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param <K> Type of partitioned key. + * @param <IN> Type of input of this request. + * @param <OUT> Type of value that request will return. + */ +public class StateRequest<K, IN, OUT> { + + /** The type of processing request. */ + public enum RequestType { + /** Process one record without state access. */ + SYNC, + /** Get from one {@link State}. */ + GET, + /** Put to one {@link State}.
*/ + PUT, + /** Merge value to an exist key in {@link State}. Mainly used for listState. */ + MERGE, + /** Delete from one {@link State}. */ + DELETE + } + + /** The underlying state to be accessed, can be empty for {@link RequestType#SYNC}. */ + @Nullable private final State state; + + /** The type of this request. */ + private final RequestType type; + + /** The payload(input) of this request. */ + @Nullable private final IN payload; + + /** The future to collect the result of the request. */ + private InternalStateFuture<OUT> stateFuture; + + /** The record context of this request. */ + private RecordContext<?, K> context; + + StateRequest(@Nullable State state, RequestType type, @Nullable IN payload) { + this.state = state; + this.type = type; + this.payload = payload; + } + + RequestType getRequestType() { + return type; + } + + @Nullable + IN getPayload() { + return payload; + } + + @Nullable + State getState() { + return state; + } + + InternalStateFuture<OUT> getFuture() { + return stateFuture; + } + + void setFuture(InternalStateFuture<OUT> future) { + stateFuture = future; + } + + RecordContext<?, K> getRecordContext() { + return context; + } + + void setRecordContext(RecordContext<?, K> context) { + this.context = context; + } Review Comment: I am also thinking about whether we could avoid the 'two-step setting'. How about extracting a new class, e.g. `StateRequestContext`, for the AEC? Then the State layer would only see `StateRequest`, and the AEC would only set its context. ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.VisibleForTesting; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Key accounting unit holds the current in-flight key and tracks the corresponding ongoing records, + * which is used to preserve the ordering of independent chained {@link + * org.apache.flink.api.common.state.v2.StateFuture}. + * + * @param <R> the type of record + * @param <K> the type of key + */ +public class KeyAccountingUnit<R, K> { Review Comment: We may have other implementations, or it may become an interface, right? Just a minor suggestion: how about adding a TODO here? ########## flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java: ########## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.asyncprocessing.StateRequest.RequestType; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link AsyncExecutionController}. */ +class AsyncExecutionControllerTest { + + // TODO: this test is not well completed, cause buffering in AEC is not implemented. + // Yet, just for illustrating the interaction between AEC and Async state API. + @Test + void testBasicRun() { + TestAsyncExecutionController<String, String> aec = + new TestAsyncExecutionController<>( + new SyncMailboxExecutor(), new TestStateExecutor()); + TestUnderlyingState underlyingState = new TestUnderlyingState(); + TestValueState valueState = new TestValueState(aec, underlyingState); + AtomicInteger output = new AtomicInteger(); + Runnable userCode = + () -> { + valueState + .asyncValue() + .thenCompose( + val -> { + int updated = (val == null ? 1 : (val + 1)); + return valueState + .asyncUpdate(updated) + .thenCompose( + o -> + StateFutureUtils.completedFuture( + updated)); + }) + .thenAccept(val -> output.set(val)); + }; + + // ============================ element1 ============================ + String record1 = "key1-r1"; + String key1 = "key1"; + // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the + // record and key with RecordContext. + RecordContext<String, String> recordContext1 = aec.buildContext(record1, key1); + aec.setCurrentContext(recordContext1); + // execute user code + userCode.run(); + + // Single-step run. + // Firstly, the user code generates value get in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update finishes. 
+ assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext1.getReferenceCount()).isEqualTo(0); + + // ============================ element 2 & 3 ============================ + String record2 = "key1-r2"; + String key2 = "key1"; + RecordContext<String, String> recordContext2 = aec.buildContext(record2, key2); + aec.setCurrentContext(recordContext2); + // execute user code + userCode.run(); + + String record3 = "key1-r3"; + String key3 = "key1"; + RecordContext<String, String> recordContext3 = aec.buildContext(record3, key3); + aec.setCurrentContext(recordContext3); + // execute user code + userCode.run(); + + // Single-step run. + // Firstly, the user code for record2 generates value get in active buffer, + // while user code for record3 generates value get in blocking buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update for record2 is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update for record2 finishes. The value get for record3 is still in blocking status. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(2); + assertThat(recordContext2.getReferenceCount()).isEqualTo(0); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + + aec.migrateBlockingToActive(); + // Value get for record3 is ready for run. + + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update for record3 is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update for record3 finishes. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(3); + assertThat(recordContext3.getReferenceCount()).isEqualTo(0); + + // ============================ element4 ============================ + String record4 = "key3-r3"; + String key4 = "key3"; + RecordContext<String, String> recordContext4 = aec.buildContext(record4, key4); + aec.setCurrentContext(recordContext4); + // execute user code + userCode.run(); + + // Single-step run for another key. + // Firstly, the user code generates value get in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update finishes. 
+ assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext4.getReferenceCount()).isEqualTo(0); + } + + /** + * An AsyncExecutionController for testing purpose, which integrates with basic buffer + * mechanism. + */ + static class TestAsyncExecutionController<R, K> extends AsyncExecutionController<R, K> { + + LinkedList<StateRequest<K, ?, ?>> activeBuffer; + + LinkedList<StateRequest<K, ?, ?>> blockingBuffer; + + public TestAsyncExecutionController( + MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { + super(mailboxExecutor, stateExecutor); + activeBuffer = new LinkedList<>(); + blockingBuffer = new LinkedList<>(); + } + + @Override + <IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) { + activeBuffer.push(request); + } + + <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) { + blockingBuffer.push(request); + } + + void triggerIfNeeded(boolean force) { + if (!force) { + // Disable normal trigger, to perform single-step debugging and check. + return; + } + LinkedList<StateRequest<?, ?, ?>> toRun = new LinkedList<>(activeBuffer); + activeBuffer.clear(); + stateExecutor.executeBatchRequests(toRun); + } + + void migrateBlockingToActive() { + Iterator<StateRequest<K, ?, ?>> blockingIter = blockingBuffer.iterator(); + while (blockingIter.hasNext()) { + StateRequest<K, ?, ?> request = blockingIter.next(); + if (request.getRecordContext().tryOccupyKey()) { + insertActiveBuffer(request); + blockingIter.remove(); + } + } + } + } + + /** Simulate the underlying state that is actually used to execute the request. */ + static class TestUnderlyingState { + + private HashMap<String, Integer> hashMap; + + public TestUnderlyingState() { + this.hashMap = new HashMap<>(); + } + + public Integer get(String key) { + return hashMap.get(key); + } + + public void update(String key, Integer val) { + hashMap.put(key, val); + } + } + + static class TestValueState implements ValueState<Integer> { + + private AsyncExecutionController<String, String> asyncExecutionController; + + private TestUnderlyingState underlyingState; + + public TestValueState( + AsyncExecutionController<String, String> aec, TestUnderlyingState underlyingState) { + this.asyncExecutionController = aec; + this.underlyingState = underlyingState; + } + + @Override + public StateFuture<Void> asyncClear() { + StateRequest<String, Void, Void> request = Review Comment: Generating the `StateRequest` in the State layer looks good to me. But how could we handle the `SYNC` type, which is invisible to `State`?
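For example, something along these lines might work (just a sketch; the method name is made up, and the wiring of the `InternalStateFuture` and the buffers is elided), so that only the AEC ever creates `SYNC` requests and the State layer only produces GET/PUT/MERGE/DELETE:

```java
// Hypothetical method inside AsyncExecutionController (name made up for illustration):
// the SYNC request is created by the AEC itself when an operator only needs the
// ordering/callback semantics for a record, so State implementations never need to
// know that RequestType.SYNC exists.
void processWithoutStateAccess() {
    StateRequest<K, Void, Void> request =
            new StateRequest<>(null, StateRequest.RequestType.SYNC, null);
    // attach the current record context, exactly as the AEC already does for
    // requests coming from the State layer
    request.setRecordContext(currentContext);
    // creating the InternalStateFuture via the StateFutureFactory and enqueueing the
    // request into the active/blocking buffers is elided here
}
```

Then `SYNC` (or `NOP`) stays an implementation detail of the runtime, which would also address my naming concern above.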