becketqin commented on code in PR #26567: URL: https://github.com/apache/flink/pull/26567#discussion_r2114302806
########## docs/layouts/shortcodes/generated/execution_config_configuration.html: ########## @@ -62,6 +62,36 @@ <td>Boolean</td> <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td> </tr> + <tr> + <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td> Review Comment: I know this is an existing naming convention, but the name "buffer-capacity" is based on the internal implementation detail of having a buffer, and may be confusing to end users. A better name might be something like `max-concurrent-async-operations`. ########## docs/layouts/shortcodes/generated/execution_config_configuration.html: ########## @@ -62,6 +62,36 @@ <td>Boolean</td> <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td> </tr> + <tr> + <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">10</td> + <td>Integer</td> + <td>The max number of async i/o operations that the async table function can trigger.</td> + </tr> + <tr> + <td><h5>table.exec.async-table.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td> Review Comment: From the description, "attempts" here means "retry attempts", i.e. the first invocation is not counted. If we use the term "retry" in other configs, we might as well use `max-retries` here too. Otherwise, people might think this includes the first invocation.
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") Review Comment: Why do we need the `Seq(false)` here? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/AsyncCorrelateRunner.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.correlate.async; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedFunction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Async function runner for {@link org.apache.flink.table.functions.AsyncTableFunction}. It invokes + * the UDF for each of the input rows, joining the responses with the input. + */ +public class AsyncCorrelateRunner extends RichAsyncFunction<RowData, RowData> { + + private static final long serialVersionUID = 7004349814516992850L; Review Comment: I think the convention is to start from 1L. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A physical rule for identifying logical correlates containing {@link + * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link + * StreamPhysicalAsyncCorrelate} RelNodes. 
+ */ +public class StreamPhysicalAsyncCorrelateRule extends ConverterRule { + + public static final RelOptRule INSTANCE = + new StreamPhysicalAsyncCorrelateRule( + Config.INSTANCE.withConversion( + FlinkLogicalCorrelate.class, + FlinkConventions.LOGICAL(), + FlinkConventions.STREAM_PHYSICAL(), + "StreamPhysicalAsyncCorrelateRule")); + + protected StreamPhysicalAsyncCorrelateRule(Config config) { + super(config); + } + + // find only calc and table function + private boolean findAsyncTableFunction(FlinkLogicalCalc calc) { + RelNode child = ((RelSubset) calc.getInput()).getOriginal(); + if (child instanceof FlinkLogicalTableFunctionScan) { + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (child instanceof FlinkLogicalCalc) { + FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child; + return findAsyncTableFunction(childCalc); + } + return false; + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalCorrelate correlate = call.rel(0); + RelNode right = ((RelSubset) correlate.getRight()).getOriginal(); + if (right instanceof FlinkLogicalTableFunctionScan) { + // right node is a table function + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (right instanceof FlinkLogicalCalc) { + // a filter is pushed above the table function + return findAsyncTableFunction((FlinkLogicalCalc) right); + } + return false; Review Comment: If `findAsyncTableFunction` is changed to: ``` private boolean findAsyncTableFunction(RelNode node) { if (node instanceof FlinkLogicalTableFunctionScan) { FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) node; return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); } else if (node instanceof FlinkLogicalCalc) { RelNode child = ((RelSubset) 
node.getInput(0)).getOriginal(); return findAsyncTableFunction(child); } return false; } ``` This can be simplified to `return findAsyncTableFunction(right)`? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") val functionCallCode = s""" |${externalOperands.map(_.code).mkString("\n")} - |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) { + |if (${anyNull.mkString(" || ")}) { | $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList()); Review Comment: Existing code, but it seems we should use `$DEFAULT_DELEGATION_FUTURE_TERM` instead of `$DEFAULT_COLLECTOR_TERM` here. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -68,27 +70,35 @@ public class AsyncCalcSplitRule { /** * An Async implementation of {@link RemoteCalcCallFinder} which finds uses of {@link - * org.apache.flink.table.functions.AsyncScalarFunction}. 
+ * org.apache.flink.table.functions.AsyncScalarFunction} and {@link + * org.apache.flink.table.functions.AsyncTableFunction}. */ public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder { Review Comment: Maybe move this class into `AsyncUtil` and rename it to `AsyncRemoteCallFinder`. The `RemoteCalcCallFinder` interface should also be renamed to `RemoteCallFinder`. ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.planner.codegen; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.planner.calcite.SqlToRexConverter; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.PlannerMocks; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.Row; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link AsyncCorrelateCodeGenerator}. 
*/ +public class AsyncCorrelateCodeGeneratorTest { + + private static final RowType INPUT_TYPE = + RowType.of(new IntType(), new BigIntType(), new VarCharType()); + + private PlannerMocks plannerMocks; + private SqlToRexConverter converter; + + private RelDataType tableRowType; + + @BeforeEach + public void before() { + plannerMocks = PlannerMocks.create(); + tableRowType = + plannerMocks + .getPlannerContext() + .getTypeFactory() + .buildRelNodeRowType( + JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")), + JavaScalaConversionUtil.toScala( + Arrays.asList( + new IntType(), + new BigIntType(), + new VarCharType()))); + ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster()); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc", new AsyncFunc(), false); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc2", new AsyncRowFunc(), false); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc_error", new AsyncFuncError(), false); + + converter = + ShortcutUtils.unwrapContext( + plannerMocks.getPlanner().createToRelContext().getCluster()) + .getRexFactory() + .createSqlToRexConverter( + tableRowType, + plannerMocks + .getPlannerContext() + .getTypeFactory() + .createFieldTypeFromLogicalType( + RowType.of(VarCharType.STRING_TYPE))); + } + + @Test + public void testStringReturnType() throws Exception { + List<Object> objects = + execute( + "myFunc(f1, f2, f3)", + RowType.of(VarCharType.STRING_TYPE), + GenericRowData.of(2, 3L, StringData.fromString("foo"))); + assertThat(objects) + .containsExactly(Row.of("complete1 foo 4 6"), Row.of("complete2 foo 4 6")); + } + + @Test + public void testRowReturnType() throws Exception { Review Comment: May also worth adding a test for `RowData` for internal types. 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java: ########## @@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod( if (method.getParameterCount() >= 1) { Type firstParam = method.getGenericParameterTypes()[0]; firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam); - if (CompletableFuture.class.equals(firstParam) - || firstParam instanceof ParameterizedType - && CompletableFuture.class.equals( - ((ParameterizedType) firstParam).getRawType())) { - foundParam = true; + if (isGenericOfClass(CompletableFuture.class, firstParam)) { + Optional<ParameterizedType> parameterized = getParameterizedType(firstParam); + if (!verifyFutureContainsCollection) { + foundParam = true; + } else if (parameterized.isPresent() + && parameterized.get().getActualTypeArguments().length > 0) { + firstParam = parameterized.get().getActualTypeArguments()[0]; Review Comment: nit: From a readability perspective, it would be good to avoid reusing `firstParam` here. Maybe create a new variable like `firstTypeArgument`. ########## docs/layouts/shortcodes/generated/execution_config_configuration.html: ########## @@ -30,7 +30,7 @@ <td><h5>table.exec.async-scalar.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> - <td>The max number of async i/o operation that the async lookup join can trigger.</td> + <td>The max number of async i/o operation that the async scalar function can trigger.</td> Review Comment: scalar function => table function ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class< final Class<?> resultClass = result.toClass(); Review Comment: nit: Existing code.
I am curious why we put an assertion here, since this argument is passed in by our own code. If we put an assertion here, then the question is why not put it in other places as well. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class< final Class<?> resultClass = result.toClass(); Type genericType = method.getGenericParameterTypes()[0]; genericType = resolveVariableWithClassContext(baseClass, genericType); - if (!(genericType instanceof ParameterizedType)) { + Optional<ParameterizedType> parameterized = getParameterizedType(genericType); + if (!parameterized.isPresent()) { throw extractionError( "The method '%s' needs generic parameters for the CompletableFuture at position %d.", method.getName(), 0); } + // If nestedArgumentClass is given, it is assumed to be a generic parameters of + // argumentClass, also at the position genericPos + if (nestedArgumentClass != null) { + genericType = parameterized.get().getActualTypeArguments()[0]; Review Comment: Similar comment to the other one: it would be good not to repurpose a variable. A slight modification of the code would make it more readable.
``` // If nestedArgumentClass is given, it is assumed to be a generic parameter of // argumentClass, also at the position genericPos final Type returnType; if (nestedArgumentClass != null) { Type nestedGenericType = parameterized.get().getActualTypeArguments()[0]; Optional<ParameterizedType> nestedParameterized = getParameterizedType(nestedGenericType); if (!nestedParameterized.isPresent() || !nestedParameterized.get().getRawType().equals(nestedArgumentClass)) { throw extractionError( "The method '%s' expects nested generic type CompletableFuture<%s> for the %d arg.", method.getName(), nestedArgumentClass.getName(), 0); } returnType = nestedParameterized.get().getActualTypeArguments()[0]; } else { returnType = parameterized.get().getActualTypeArguments()[0]; } ``` (Note: the raw-type check must use `nestedParameterized`, not `parameterized`; the raw type of the outer `parameterized` is always `CompletableFuture`, so comparing it with `nestedArgumentClass` would always fail.) ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -208,7 +209,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator * Verification that checks a method by parameters (arguments only) with mandatory {@link * CompletableFuture}. */ - static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) { + static MethodVerification createParameterAndCompletableFutureVerification( Review Comment: Do we expect `nestedArgumentClass` to be anything other than `Collection`? The current logic seems to strongly assume that it is always `Collection`. If that assumption holds, we can change this to a boolean `verifyFutureContainsCollection`. This would also align it with the style in `UserDefinedFunctionHelper#validateAsyncImplementationMethod`.
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") val functionCallCode = s""" |${externalOperands.map(_.code).mkString("\n")} - |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) { + |if (${anyNull.mkString(" || ")}) { Review Comment: Does this mean that if any of the input arguments is null (the null terms are joined with `||`), the AsyncTableFunction will return an empty result collection? Not sure if this is the expected behavior. There is a `skipIfArgsNull` flag. It is honored in TableFunction and ProcessTableFunction. Is there any reason not to honor it here? BTW, it seems that flag is always set to false. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/AsyncUtil.java: ########## @@ -47,7 +48,14 @@ public class AsyncUtil { * @return true if it contains an async function call in the specified node. */ public static boolean containsAsyncCall(RexNode node) { Review Comment: The class Javadoc should be updated, since the class is now used for AsyncTableFunction as well.
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCorrelate.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.AsyncCorrelateCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; 
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.operators.correlate.async.AsyncCorrelateRunner; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Base class for exec Async Correlate. 
*/ public class CommonExecAsyncCorrelate extends ExecNodeBase<RowData> + implements SingleTransformationTranslator<RowData> { + + public static final String ASYNC_CORRELATE_TRANSFORMATION = "async-correlate"; + + public static final String FIELD_NAME_JOIN_TYPE = "joinType"; + public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; + + @JsonProperty(FIELD_NAME_JOIN_TYPE) + private final FlinkJoinType joinType; + + @JsonProperty(FIELD_NAME_FUNCTION_CALL) + private final RexCall invocation; + + public CommonExecAsyncCorrelate( + int id, + ExecNodeContext context, + ReadableConfig persistedConfig, + FlinkJoinType joinType, + RexCall invocation, + List<InputProperty> inputProperties, + RowType outputType, + String description) { + super(id, context, persistedConfig, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.joinType = joinType; Review Comment: Is `joinType` used anywhere? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoteCorrelateSplitRule.java: ########## @@ -137,7 +137,7 @@ public RexNode visitCall(RexCall call) { call.getOperands().stream() .map(x -> x.accept(this)) .collect(Collectors.toList()); - return rexBuilder.makeCall(call.getOperator(), newProjects); + return rexBuilder.makeCall(call.getType(), call.getOperator(), newProjects); Review Comment: Just curious, is this change orthogonal to FLIP-498?
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala: ########## @@ -79,7 +81,9 @@ class StreamPhysicalCorrelateRule(config: Config) extends ConverterRule(config) val newCalc = getMergedCalc(calc) convertToCorrelate( tableScan, - Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition))) + if (newCalc.getProgram.getCondition == null) None + else Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition)) Review Comment: Not sure I fully understand this change. Can you elaborate? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A physical rule for identifying logical correlates containing {@link + * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link + * StreamPhysicalAsyncCorrelate} RelNodes. 
+ */ +public class StreamPhysicalAsyncCorrelateRule extends ConverterRule { + + public static final RelOptRule INSTANCE = + new StreamPhysicalAsyncCorrelateRule( + Config.INSTANCE.withConversion( + FlinkLogicalCorrelate.class, + FlinkConventions.LOGICAL(), + FlinkConventions.STREAM_PHYSICAL(), + "StreamPhysicalAsyncCorrelateRule")); + + protected StreamPhysicalAsyncCorrelateRule(Config config) { + super(config); + } + + // find only calc and table function + private boolean findAsyncTableFunction(FlinkLogicalCalc calc) { + RelNode child = ((RelSubset) calc.getInput()).getOriginal(); + if (child instanceof FlinkLogicalTableFunctionScan) { + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (child instanceof FlinkLogicalCalc) { + FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child; + return findAsyncTableFunction(childCalc); + } + return false; + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalCorrelate correlate = call.rel(0); + RelNode right = ((RelSubset) correlate.getRight()).getOriginal(); + if (right instanceof FlinkLogicalTableFunctionScan) { + // right node is a table function + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (right instanceof FlinkLogicalCalc) { + // a filter is pushed above the table function + return findAsyncTableFunction((FlinkLogicalCalc) right); + } + return false; + } + + @Override + public RelNode convert(RelNode rel) { + FlinkLogicalCorrelate correlate = (FlinkLogicalCorrelate) rel; + RelTraitSet traitSet = rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()); + RelNode convInput = + RelOptRule.convert(correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL()); + RelNode right = correlate.getInput(1); + return convertToCorrelate( + right, correlate, traitSet, convInput, 
Optional.empty(), Optional.empty()); + } + + public RelNode convertToCorrelate( + RelNode relNode, + FlinkLogicalCorrelate correlate, + RelTraitSet traitSet, + RelNode convInput, + Optional<List<RexNode>> projections, + Optional<RexNode> condition) { + if (relNode instanceof RelSubset) { + RelSubset rel = (RelSubset) relNode; + return convertToCorrelate( + rel.getRelList().get(0), + correlate, + traitSet, + convInput, + projections, + condition); + } else if (relNode instanceof FlinkLogicalCalc) { + FlinkLogicalCalc calc = (FlinkLogicalCalc) relNode; + RelNode tableScan = StreamPhysicalCorrelateRule.getTableScan(calc); + FlinkLogicalCalc newCalc = StreamPhysicalCorrelateRule.getMergedCalc(calc); + // The projections are not handled here or in the base version, so currently we match + // that functionality. + return convertToCorrelate( + tableScan, + correlate, + traitSet, + convInput, + Optional.ofNullable( + newCalc.getProgram().getProjectList() == null + ? null + : newCalc.getProgram().getProjectList().stream() + .map(newCalc.getProgram()::expandLocalRef) + .collect(Collectors.toList())), + Optional.ofNullable( + newCalc.getProgram().getCondition() == null + ? null + : newCalc.getProgram() Review Comment: Will passing these projections and conditions cause an exception to be thrown in `StreamPhysicalAsyncCorrelate`, since it does not handle projections or conditions? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCorrelateITCase.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncSumScalarFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncTestTableFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.SumScalarFunction; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT Case tests for correlate queries using {@link AsyncTableFunction}. 
*/ +public class AsyncCorrelateITCase extends StreamingTestBase { Review Comment: In addition to testing the variations of invocation syntax, it would be good to also have tests for the configuration options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org