AlanConfluent commented on code in PR #26567: URL: https://github.com/apache/flink/pull/26567#discussion_r2110383237
########## docs/layouts/shortcodes/generated/sink_configuration.html: ########## @@ -0,0 +1,18 @@ +<table class="configuration table table-bordered"> Review Comment: Yes, it is unrelated. Not sure why it was generated. Let me remove it. ########## flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java: ########## @@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + /** Valid table function. */ + public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + private static class PrivateAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** No implementation method. */ public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction { // nothing to do } + /** No implementation method. */ + public static class MissingImplementationAsyncTableFunction + extends AsyncTableFunction<Integer> { + // nothing to do + } + /** Implementation method is private. */ public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction { private void eval(CompletableFuture<Integer> future, int i) {} } + /** Implementation method is private. */ + public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> { + private void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** Implementation method isn't void. */ public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction { public String eval(CompletableFuture<Integer> future, int i) { return ""; } } + /** Implementation method isn't void. 
*/ + public static class NonVoidAsyncTableFunction extends AsyncScalarFunction { + public String eval(CompletableFuture<Collection<Integer>> future, int i) { + return ""; + } + } + /** Implementation method isn't void. */ public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction { public void eval(int i) {} } + /** Implementation method isn't void. */ Review Comment: Updated comment to be correct ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ########## @@ -443,6 +443,49 @@ public class ExecutionConfigOptions { "The max number of async retry attempts to make before task " + "execution is failed."); + // ------------------------------------------------------------------------ + // Async Table Function + // ------------------------------------------------------------------------ + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY = + key("table.exec.async-table.buffer-capacity") + .intType() + .defaultValue(10) + .withDescription( + "The max number of async i/o operations that the async table function can trigger."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) + public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT = + key("table.exec.async-table.timeout") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "The async timeout for the asynchronous operation to complete."); Review Comment: It's for all retries to complete. Let me update the description to be clearer. ########## flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java: ########## @@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + /** Valid table function. 
*/ + public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + private static class PrivateAsyncScalarFunction extends AsyncScalarFunction { public void eval(CompletableFuture<Integer> future, int i) {} } + private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** No implementation method. */ public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction { // nothing to do } + /** No implementation method. */ + public static class MissingImplementationAsyncTableFunction + extends AsyncTableFunction<Integer> { + // nothing to do + } + /** Implementation method is private. */ public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction { private void eval(CompletableFuture<Integer> future, int i) {} } + /** Implementation method is private. */ + public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> { + private void eval(CompletableFuture<Collection<Integer>> future, int i) {} + } + /** Implementation method isn't void. */ public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction { public String eval(CompletableFuture<Integer> future, int i) { return ""; } } + /** Implementation method isn't void. */ + public static class NonVoidAsyncTableFunction extends AsyncScalarFunction { + public String eval(CompletableFuture<Collection<Integer>> future, int i) { + return ""; + } + } + /** Implementation method isn't void. */ public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction { public void eval(int i) {} } + /** Implementation method isn't void. */ + public static class NoFutureAsyncTableFunction extends AsyncTableFunction<Integer> { + public void eval(int i) {} + } + + /** Implementation method isn't void. 
*/ Review Comment: Updated comment to be correct ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -71,24 +72,35 @@ public class AsyncCalcSplitRule { * org.apache.flink.table.functions.AsyncScalarFunction}. */ public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder { + + private final FunctionKind functionKind; + + public AsyncRemoteCalcCallFinder() { + this(FunctionKind.ASYNC_SCALAR); + } Review Comment: Good point. Removed the default constructor and replaced the existing usages with the explicit scalar constructor. ########## flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java: ########## @@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) { + "Otherwise the type has to be specified explicitly using type information."); } } + + /** + * Will return true if the type of the given generic class type. Review Comment: Done ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -71,24 +72,35 @@ public class AsyncCalcSplitRule { * org.apache.flink.table.functions.AsyncScalarFunction}. Review Comment: Good call. Updated. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAsyncCorrelate.java: ########## @@ -0,0 +1,93 @@ +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.FlinkVersion; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate.ASYNC_CORRELATE_TRANSFORMATION; + +/** + * Stream {@link ExecNode} which matches along with join a Java/Scala user defined table function. + */ +@ExecNodeMetadata( + name = "stream-exec-async-correlate", + version = 1, + producedTransformations = ASYNC_CORRELATE_TRANSFORMATION, + minPlanVersion = FlinkVersion.v1_19, + minStateVersion = FlinkVersion.v1_19) Review Comment: Ah, good catch. Updated to be V2_1, which I believe will include this change. 
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java: ########## @@ -82,4 +90,31 @@ public void testCorrelateIndirectOtherWay() { String sqlQuery = "select * FROM MyTable, LATERAL TABLE(tableFunc(func1(ABS(a))))"; util.verifyRelPlan(sqlQuery); } + + @Test + public void testCorrelateWithSystem() { + String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(ABS(a)))"; + util.verifyRelPlan(sqlQuery); + } + + @Test + public void testCorrelateWithScalar() { + String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(scalar(a)))"; + util.verifyRelPlan(sqlQuery); + } + Review Comment: Added that test and the fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org