lihaosky commented on code in PR #26567: URL: https://github.com/apache/flink/pull/26567#discussion_r2103359338
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ########## @@ -443,6 +443,49 @@ public class ExecutionConfigOptions { "The max number of async retry attempts to make before task " + "execution is failed."); + // ------------------------------------------------------------------------ + // Async Table Function + // ------------------------------------------------------------------------ + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY = + key("table.exec.async-table.buffer-capacity") + .intType() + .defaultValue(10) + .withDescription( + "The max number of async i/o operations that the async table function can trigger."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT = + key("table.exec.async-table.timeout") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "The async timeout for the asynchronous operation to complete."); Review Comment: Is this the timeout for each retry or total timeout for all retries? I suppose it's for each retry ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java: ########## @@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) { + "Otherwise the type has to be specified explicitly using type information."); } } + + /** + * Will return true if the type of the given generic class type. 
+ * + * @param clazz The generic class to check against + * @param type The type to be checked + */ + public static boolean isGenericOfClass(Class<?> clazz, Type type) { + Optional<ParameterizedType> parameterized = getParameterizedType(type); + return clazz.equals(type) + || parameterized.isPresent() && clazz.equals(parameterized.get().getRawType()); + } + + /** + * Returns an optional of a ParameterizedType, if that's what the type is. + * + * @param type The type to check + * @return optional which is present if the type is a ParameterizedType + */ + public static Optional<ParameterizedType> getParameterizedType(Type type) { Review Comment: Add unit tests for these two functions? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java: ########## @@ -82,4 +90,31 @@ public void testCorrelateIndirectOtherWay() { String sqlQuery = "select * FROM MyTable, LATERAL TABLE(tableFunc(func1(ABS(a))))"; util.verifyRelPlan(sqlQuery); } + + @Test + public void testCorrelateWithSystem() { + String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(ABS(a)))"; + util.verifyRelPlan(sqlQuery); + } + + @Test + public void testCorrelateWithScalar() { + String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(scalar(a)))"; + util.verifyRelPlan(sqlQuery); + } + Review Comment: Can you add ``` @Test public void testCorrelateWithCast() { String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(cast(cast(a as int) as int)))"; util.verifyRelPlan(sqlQuery); } ``` There was a fix I made internally around cast ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -71,24 +72,35 @@ public class AsyncCalcSplitRule { * org.apache.flink.table.functions.AsyncScalarFunction}. Review Comment: as well as `AsyncTableFunction`?
########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java: ########## @@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) { + "Otherwise the type has to be specified explicitly using type information."); } } + + /** + * Will return true if the type of the given generic class type. Review Comment: ```suggestion * Will return true if the type of the given generic class type matches clazz. ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAsyncCorrelate.java: ########## @@ -0,0 +1,93 @@ +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.flink.FlinkVersion; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate.ASYNC_CORRELATE_TRANSFORMATION; + +/** + * Stream {@link ExecNode} which matches along with join a Java/Scala user defined table function. + */ +@ExecNodeMetadata( + name = "stream-exec-async-correlate", + version = 1, + producedTransformations = ASYNC_CORRELATE_TRANSFORMATION, + minPlanVersion = FlinkVersion.v1_19, + minStateVersion = FlinkVersion.v1_19) Review Comment: This does not exist yet in 1.19? ########## flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java: ########## @@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + /** Valid table function.
*/ + public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + private static class PrivateAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** No implementation method. */ public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction { // nothing to do } + /** No implementation method. */ + public static class MissingImplementationAsyncTableFunction + extends AsyncTableFunction<Integer> { + // nothing to do + } + /** Implementation method is private. */ public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction { private void eval(CompletableFuture<Integer> future, int i) {} } + /** Implementation method is private. */ + public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> { + private void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** Implementation method isn't void. */ public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction { public String eval(CompletableFuture<Integer> future, int i) { return ""; } } + /** Implementation method isn't void. */ + public static class NonVoidAsyncTableFunction extends AsyncScalarFunction { + public String eval(CompletableFuture<Collection<Integer>> future, int i) { + return ""; + } + } + /** Implementation method isn't void. */ public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction { public void eval(int i) {} } + /** Implementation method isn't void. */ Review Comment: first param isn't `CompletableFuture<Collection>`? 
########## flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java: ########## @@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + /** Valid table function. */ + public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + private static class PrivateAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** No implementation method. */ public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction { // nothing to do } + /** No implementation method. */ + public static class MissingImplementationAsyncTableFunction + extends AsyncTableFunction<Integer> { + // nothing to do + } + /** Implementation method is private. */ public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction { private void eval(CompletableFuture<Integer> future, int i) {} } + /** Implementation method is private. */ + public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> { + private void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** Implementation method isn't void. */ public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction { public String eval(CompletableFuture<Integer> future, int i) { return ""; } } + /** Implementation method isn't void. */ + public static class NonVoidAsyncTableFunction extends AsyncScalarFunction { + public String eval(CompletableFuture<Collection<Integer>> future, int i) { + return ""; + } + } + /** Implementation method isn't void. 
*/ public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction { public void eval(int i) {} } + /** Implementation method isn't void. */ + public static class NoFutureAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(int i) {} + } + + /** Implementation method isn't void. */ Review Comment: ditto ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -71,24 +72,35 @@ public class AsyncCalcSplitRule { * org.apache.flink.table.functions.AsyncScalarFunction}. */ public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder { + + private final FunctionKind functionKind; + + public AsyncRemoteCalcCallFinder() { + this(FunctionKind.ASYNC_SCALAR); + } Review Comment: nit: Not sure if we want to default to scalar function given both are supported now. Requiring functionKind is probably fine ########## docs/layouts/shortcodes/generated/sink_configuration.html: ########## @@ -0,0 +1,18 @@ +<table class="configuration table table-bordered"> Review Comment: This is unrelated to the async table function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org