AlanConfluent commented on code in PR #26567: URL: https://github.com/apache/flink/pull/26567#discussion_r2132922150
########## docs/layouts/shortcodes/generated/execution_config_configuration.html: ########## @@ -62,6 +62,36 @@ <td>Boolean</td> <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td> </tr> + <tr> + <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td> Review Comment: Changed. I chose `max-concurrent-operations` since async is already in the name, if that works for you. ########## docs/layouts/shortcodes/generated/execution_config_configuration.html: ########## @@ -62,6 +62,36 @@ <td>Boolean</td> <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td> </tr> + <tr> + <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">10</td> + <td>Integer</td> + <td>The max number of async i/o operations that the async table function can trigger.</td> + </tr> + <tr> + <td><h5>table.exec.async-table.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td> Review Comment: You're right. It's not the total tries, but just the retries. Changing the name. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class< final Class<?> resultClass = result.toClass(); Review Comment: I agree with that. It seems a bit overly defensive. I think it's there to verify that it's validating an output template rather than a state template. Obviously, in the place where this is being called, it should never have state. Maybe a better `MethodVerification` interface could have methods to validate them separately. 
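The rename away from `table.exec.async-table.max-attempts` depends on whether the number counts total tries or only retries. Below is a minimal sketch of retry-budget semantics. It uses a hypothetical `maxRetries` parameter, not the Flink option itself. With N retries, the operation runs at most N + 1 times: the first attempt plus N retries.

```java
import java.util.concurrent.Callable;

// Illustrative only: RetryBudgetSketch and callWithRetries are hypothetical names, not Flink APIs.
final class RetryBudgetSketch {
    private RetryBudgetSketch() {}

    // Runs op once, then retries up to maxRetries more times on failure.
    static <T> T callWithRetries(Callable<T> op, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        // attempt 0 is the initial try; attempts 1..maxRetries are the retries
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if budget remains
            }
        }
        throw last; // budget exhausted: surface the last failure
    }
}
```

With `maxRetries = 3`, an operation that always fails is invoked four times before the last exception propagates.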
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -208,7 +209,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator * Verification that checks a method by parameters (arguments only) with mandatory {@link * CompletableFuture}. */ - static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) { + static MethodVerification createParameterAndCompletableFutureVerification( Review Comment: I don't have any plans beyond `Collection`. I may have made this more generic than needed. Changed it to be `verifyFutureContainsCollection`. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") val functionCallCode = s""" |${externalOperands.map(_.code).mkString("\n")} - |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) { + |if (${anyNull.mkString(" || ")}) { Review Comment: > Does it mean if all the input arguments are null, the AsyncTableFunction will return an empty result collection? 
> Not sure if this is an expected behavior. If any argument is null, the generated code completes the future with an empty collection, which results in no output. I think this is equivalent to a regular table function that simply emits nothing. If I don't complete the future, it is considered incomplete and the call stays pending indefinitely, which we don't want. What is the expected behavior here? > There is a skipIfArgsNull flag. It is honored in TableFunction and ProcessTableFunction. Is there any reason to not honor it here? BTW, it seems that flag is always set to false. Not sure how I missed `skipIfArgsNull`. Added support for that here too. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/AsyncCorrelateRunner.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.operators.correlate.async; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedFunction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Async function runner for {@link org.apache.flink.table.functions.AsyncTableFunction}. It invokes + * the UDF for each of the input rows, joining the responses with the input. + */ +public class AsyncCorrelateRunner extends RichAsyncFunction<RowData, RowData> { + + private static final long serialVersionUID = 7004349814516992850L; Review Comment: OK, sounds good. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoteCorrelateSplitRule.java: ########## @@ -137,7 +137,7 @@ public RexNode visitCall(RexCall call) { call.getOperands().stream() .map(x -> x.accept(this)) .collect(Collectors.toList()); - return rexBuilder.makeCall(call.getOperator(), newProjects); + return rexBuilder.makeCall(call.getType(), call.getOperator(), newProjects); Review Comment: It's part of fixing a bug when a cast is used with async correlates (and possibly with Python correlates as well). Hao reminded me [here](https://github.com/apache/flink/pull/26567#discussion_r2103426554), so I included it, though it could have been done as a follow-up.
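The null-argument discussion on `generateAsyncTableFunctionCall` is easier to follow with a small model of what the generated code does. The Java sketch below rests on simplifying assumptions:

- `nullCheckCondition` mirrors `externalOperands.map(_.nullTerm) ++ Seq("false")` joined with `" || "`.
- `invokeOrEmpty` models the runtime effect, with a `skipIfArgsNull` switch modeled on the flag discussed above.

The class name, method names, and `isNull$1`-style terms are hypothetical. This is not the actual code generator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical model of the generated null guard; not Flink's codegen.
final class NullGuardSketch {
    private NullGuardSketch() {}

    // Mirrors `externalOperands.map(_.nullTerm) ++ Seq("false")` followed by `mkString(" || ")`.
    static String nullCheckCondition(List<String> nullTerms) {
        List<String> terms = new ArrayList<>(nullTerms);
        // Without the "false" sentinel, zero operands would generate `if () { ... }`,
        // which is not valid Java.
        terms.add("false");
        return String.join(" || ", terms);
    }

    // Runtime analogue of the generated call. `eval` stands in for a UDF method
    // shaped like eval(CompletableFuture<Collection<T>> future, args...).
    static <T> CompletableFuture<Collection<T>> invokeOrEmpty(
            boolean skipIfArgsNull,
            BiConsumer<CompletableFuture<Collection<T>>, Object[]> eval,
            Object... args) {
        CompletableFuture<Collection<T>> future = new CompletableFuture<>();
        if (skipIfArgsNull && Arrays.stream(args).anyMatch(Objects::isNull)) {
            // Complete with no rows. Leaving the future uncompleted would keep the call pending.
            future.complete(Collections.emptyList());
        } else {
            eval.accept(future, args);
        }
        return future;
    }
}
```

The key design point is that the future is always completed, even with an empty result. Completing it is what stops the async operator from waiting on the call indefinitely.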
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.codegen; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.planner.calcite.SqlToRexConverter; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.PlannerMocks; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.types.logical.BigIntType; 
+import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.Row; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link AsyncCorrelateCodeGenerator}. */ +public class AsyncCorrelateCodeGeneratorTest { + + private static final RowType INPUT_TYPE = + RowType.of(new IntType(), new BigIntType(), new VarCharType()); + + private PlannerMocks plannerMocks; + private SqlToRexConverter converter; + + private RelDataType tableRowType; + + @BeforeEach + public void before() { + plannerMocks = PlannerMocks.create(); + tableRowType = + plannerMocks + .getPlannerContext() + .getTypeFactory() + .buildRelNodeRowType( + JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")), + JavaScalaConversionUtil.toScala( + Arrays.asList( + new IntType(), + new BigIntType(), + new VarCharType()))); + ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster()); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc", new AsyncFunc(), false); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc2", new AsyncRowFunc(), false); + plannerMocks + .getFunctionCatalog() + .registerTemporarySystemFunction("myfunc_error", new AsyncFuncError(), false); + + converter = + ShortcutUtils.unwrapContext( + plannerMocks.getPlanner().createToRelContext().getCluster()) + .getRexFactory() + .createSqlToRexConverter( + tableRowType, + 
plannerMocks + .getPlannerContext() + .getTypeFactory() + .createFieldTypeFromLogicalType( + RowType.of(VarCharType.STRING_TYPE))); + } + + @Test + public void testStringReturnType() throws Exception { + List<Object> objects = + execute( + "myFunc(f1, f2, f3)", + RowType.of(VarCharType.STRING_TYPE), + GenericRowData.of(2, 3L, StringData.fromString("foo"))); + assertThat(objects) + .containsExactly(Row.of("complete1 foo 4 6"), Row.of("complete2 foo 4 6")); + } + + @Test + public void testRowReturnType() throws Exception { Review Comment: Makes sense. Added one for RowData. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala: ########## @@ -79,7 +81,9 @@ class StreamPhysicalCorrelateRule(config: Config) extends ConverterRule(config) val newCalc = getMergedCalc(calc) convertToCorrelate( tableScan, - Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition))) + if (newCalc.getProgram.getCondition == null) None + else Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition)) Review Comment: While testing the async case with a calc that has no condition, I ran into what I believe was a NullPointerException, so I fixed it here.
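The Scala fix above guards against passing a `null` condition to `expandLocalRef`. Here is the same idea in Java, as a hedged sketch with hypothetical names:

- `expand` stands in for `newCalc.getProgram.expandLocalRef`.
- Any function that dereferences its argument reproduces the failure.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper names; illustrates the null guard, not the planner code.
final class OptionalConditionSketch {
    private OptionalConditionSketch() {}

    // Unguarded: like Some(expand(condition)), `expand` runs even when there is no condition.
    static <C, R> Optional<R> expandConditionUnguarded(C condition, Function<C, R> expand) {
        return Optional.ofNullable(expand.apply(condition));
    }

    // Guarded: like `if (condition == null) None else Some(expand(condition))`.
    // Optional.map only invokes `expand` when a condition is present.
    static <C, R> Optional<R> expandCondition(C condition, Function<C, R> expand) {
        return Optional.ofNullable(condition).map(expand);
    }
}
```

For example, `expandCondition((String) null, String::length)` returns an empty `Optional`. The unguarded version throws a `NullPointerException` for the same input.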
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java: ########## @@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class< final Class<?> resultClass = result.toClass(); Type genericType = method.getGenericParameterTypes()[0]; genericType = resolveVariableWithClassContext(baseClass, genericType); - if (!(genericType instanceof ParameterizedType)) { + Optional<ParameterizedType> parameterized = getParameterizedType(genericType); + if (!parameterized.isPresent()) { throw extractionError( "The method '%s' needs generic parameters for the CompletableFuture at position %d.", method.getName(), 0); } + // If nestedArgumentClass is given, it is assumed to be a generic parameters of + // argumentClass, also at the position genericPos + if (nestedArgumentClass != null) { + genericType = parameterized.get().getActualTypeArguments()[0]; Review Comment: You're right, I can avoid reusing the variables if I just move up `returnType`, which is better. 
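The extraction discussed above inspects the type argument nested inside `CompletableFuture`. Below is a minimal, self-contained sketch of that check using only `java.lang.reflect`. It assumes the first parameter's type is already concrete; the real code first calls `resolveVariableWithClassContext`, which this sketch skips. The class name, the method names, and the sample `eval*` methods are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a "future contains collection" check; not Flink's extractor.
final class FutureCollectionCheckSketch {
    private FutureCollectionCheckSketch() {}

    // Raw class of a Class or ParameterizedType; null for wildcards, type variables, etc.
    private static Class<?> rawClass(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            Type raw = ((ParameterizedType) type).getRawType();
            return raw instanceof Class ? (Class<?>) raw : null;
        }
        return null;
    }

    // True if the first parameter is CompletableFuture<X> and X is a Collection type.
    static boolean firstParamIsFutureOfCollection(Method method) {
        if (method.getParameterCount() < 1) {
            return false;
        }
        Type first = method.getGenericParameterTypes()[0];
        if (!(first instanceof ParameterizedType)) {
            return false; // raw CompletableFuture: no generic parameter to inspect
        }
        ParameterizedType future = (ParameterizedType) first;
        if (!CompletableFuture.class.equals(future.getRawType())) {
            return false;
        }
        Type[] typeArgs = future.getActualTypeArguments();
        if (typeArgs.length == 0) {
            return false;
        }
        Class<?> nested = rawClass(typeArgs[0]);
        return nested != null && Collection.class.isAssignableFrom(nested);
    }

    // Looks up one of the sample methods below by name and applies the check.
    static boolean checkDeclared(String name) {
        for (Method m : FutureCollectionCheckSketch.class.getDeclaredMethods()) {
            if (m.getName().equals(name)) {
                return firstParamIsFutureOfCollection(m);
            }
        }
        throw new IllegalArgumentException("no method " + name);
    }

    // Sample signatures used to exercise the check.
    static void evalList(CompletableFuture<List<String>> future, int key) {}

    static void evalString(CompletableFuture<String> future, int key) {}

    @SuppressWarnings("rawtypes")
    static void evalRaw(CompletableFuture future, int key) {}
}
```

`evalList` passes, while `evalString` (not a collection) and `evalRaw` (no generic parameter) fail.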
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java: ########## @@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod( if (method.getParameterCount() >= 1) { Type firstParam = method.getGenericParameterTypes()[0]; firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam); - if (CompletableFuture.class.equals(firstParam) - || firstParam instanceof ParameterizedType - && CompletableFuture.class.equals( - ((ParameterizedType) firstParam).getRawType())) { - foundParam = true; + if (isGenericOfClass(CompletableFuture.class, firstParam)) { + Optional<ParameterizedType> parameterized = getParameterizedType(firstParam); + if (!verifyFutureContainsCollection) { + foundParam = true; + } else if (parameterized.isPresent() + && parameterized.get().getActualTypeArguments().length > 0) { + firstParam = parameterized.get().getActualTypeArguments()[0]; Review Comment: Good call. Changed. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java: ########## @@ -68,27 +70,35 @@ public class AsyncCalcSplitRule { /** * An Async implementation of {@link RemoteCalcCallFinder} which finds uses of {@link - * org.apache.flink.table.functions.AsyncScalarFunction}. + * org.apache.flink.table.functions.AsyncScalarFunction} and {@link + * org.apache.flink.table.functions.AsyncTableFunction}. */ public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder { Review Comment: I do use AsyncRemoteCallFinder in a few places now, so it makes sense to move it to a more general place. Moved to `AsyncUtils` as you suggested. > The interface of RemoteCalcCallFinder should also be renamed to RemoteCallFinder. 
Done ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Optional; +import 
java.util.stream.Collectors; + +/** + * A physical rule for identifying logical correlates containing {@link + * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link + * StreamPhysicalAsyncCorrelate} RelNodes. + */ +public class StreamPhysicalAsyncCorrelateRule extends ConverterRule { + + public static final RelOptRule INSTANCE = + new StreamPhysicalAsyncCorrelateRule( + Config.INSTANCE.withConversion( + FlinkLogicalCorrelate.class, + FlinkConventions.LOGICAL(), + FlinkConventions.STREAM_PHYSICAL(), + "StreamPhysicalAsyncCorrelateRule")); + + protected StreamPhysicalAsyncCorrelateRule(Config config) { + super(config); + } + + // find only calc and table function + private boolean findAsyncTableFunction(FlinkLogicalCalc calc) { + RelNode child = ((RelSubset) calc.getInput()).getOriginal(); + if (child instanceof FlinkLogicalTableFunctionScan) { + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (child instanceof FlinkLogicalCalc) { + FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child; + return findAsyncTableFunction(childCalc); + } + return false; + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalCorrelate correlate = call.rel(0); + RelNode right = ((RelSubset) correlate.getRight()).getOriginal(); + if (right instanceof FlinkLogicalTableFunctionScan) { + // right node is a table function + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (right instanceof FlinkLogicalCalc) { + // a filter is pushed above the table function + return findAsyncTableFunction((FlinkLogicalCalc) right); + } + return false; + } + + @Override + public RelNode convert(RelNode rel) { + FlinkLogicalCorrelate correlate = (FlinkLogicalCorrelate) rel; + RelTraitSet traitSet = 
rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()); + RelNode convInput = + RelOptRule.convert(correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL()); + RelNode right = correlate.getInput(1); + return convertToCorrelate( + right, correlate, traitSet, convInput, Optional.empty(), Optional.empty()); + } + + public RelNode convertToCorrelate( + RelNode relNode, + FlinkLogicalCorrelate correlate, + RelTraitSet traitSet, + RelNode convInput, + Optional<List<RexNode>> projections, + Optional<RexNode> condition) { + if (relNode instanceof RelSubset) { + RelSubset rel = (RelSubset) relNode; + return convertToCorrelate( + rel.getRelList().get(0), + correlate, + traitSet, + convInput, + projections, + condition); + } else if (relNode instanceof FlinkLogicalCalc) { + FlinkLogicalCalc calc = (FlinkLogicalCalc) relNode; + RelNode tableScan = StreamPhysicalCorrelateRule.getTableScan(calc); + FlinkLogicalCalc newCalc = StreamPhysicalCorrelateRule.getMergedCalc(calc); + // The projections are not handled here or in the base version, so currently we match + // that functionality. + return convertToCorrelate( + tableScan, + correlate, + traitSet, + convInput, + Optional.ofNullable( + newCalc.getProgram().getProjectList() == null + ? null + : newCalc.getProgram().getProjectList().stream() + .map(newCalc.getProgram()::expandLocalRef) + .collect(Collectors.toList())), + Optional.ofNullable( + newCalc.getProgram().getCondition() == null + ? null + : newCalc.getProgram() Review Comment: Yes, correct. It will throw an error. This is tested in `AsyncCorrelateITCase`. 
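The `matches` and `findAsyncTableFunction` methods in the rule above follow the same path. Each descends through `FlinkLogicalCalc` nodes until it reaches a `FlinkLogicalTableFunctionScan`, then checks for an async table call. Below is a hedged sketch of a single shared helper. It uses hypothetical `Calc` and `TableFunctionScan` stand-ins rather than the Calcite/Flink classes, and skips the `RelSubset.getOriginal()` unwrapping.

```java
// Hypothetical node types; they only model the shape of the traversal.
final class AsyncScanFinderSketch {
    private AsyncScanFinderSketch() {}

    interface Node {}

    static final class Calc implements Node {
        final Node input;

        Calc(Node input) {
            this.input = input;
        }
    }

    static final class TableFunctionScan implements Node {
        final boolean asyncTableCall;

        TableFunctionScan(boolean asyncTableCall) {
            this.asyncTableCall = asyncTableCall;
        }
    }

    // Shared by both call sites: skip any Calc nodes (e.g. a filter pushed above the
    // table function), then check whether the first non-Calc node is an async scan.
    static boolean containsAsyncTableFunction(Node node) {
        Node current = node;
        while (current instanceof Calc) {
            current = ((Calc) current).input;
        }
        return current instanceof TableFunctionScan
                && ((TableFunctionScan) current).asyncTableCall;
    }
}
```

Writing the descent as a loop rather than recursion is only a stylistic choice. Either form gives both call sites one copy of the logic.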
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") val functionCallCode = s""" |${externalOperands.map(_.code).mkString("\n")} - |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) { + |if (${anyNull.mkString(" || ")}) { | $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList()); Review Comment: I think this comes from `FunctionCodeGenerator` where `collectorTerm` is used to mean any return variable. I would have to change some lookup join code, and possibly other async functions as well. I don't want to do too much of a refactoring here to keep things as simple as possible, if that works for you. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCorrelate.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.AsyncCorrelateCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import 
org.apache.flink.table.planner.plan.utils.AsyncUtil; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.operators.correlate.async.AsyncCorrelateRunner; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Base class for exec Async Correlate. */ +public class CommonExecAsyncCorrelate extends ExecNodeBase<RowData> + implements SingleTransformationTranslator<RowData> { + + public static final String ASYNC_CORRELATE_TRANSFORMATION = "async-correlate"; + + public static final String FIELD_NAME_JOIN_TYPE = "joinType"; + public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; + + @JsonProperty(FIELD_NAME_JOIN_TYPE) + private final FlinkJoinType joinType; + + @JsonProperty(FIELD_NAME_FUNCTION_CALL) + private final RexCall invocation; + + public CommonExecAsyncCorrelate( + int id, + ExecNodeContext context, + ReadableConfig persistedConfig, + FlinkJoinType joinType, + RexCall invocation, + List<InputProperty> inputProperties, + RowType outputType, + String description) { + super(id, context, persistedConfig, inputProperties, outputType, description); + checkArgument(inputProperties.size() == 1); + this.joinType = joinType; Review Comment: I have a follow-up where I use it. It's a small change, but I didn't want to fold everything into one big PR. Happy to remove it for now if you prefer.
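The `AsyncCorrelateRunner` Javadoc says it joins each UDF response with the input row, and `joinType` is kept for a follow-up. That follow-up is not shown here, so the sketch below only illustrates standard lateral-join semantics with hypothetical types:

- `INNER` (as in `CROSS JOIN LATERAL`) emits one row per result.
- `LEFT` (as in `LEFT JOIN LATERAL ... ON TRUE`) also emits an input row that has no results, padded with nulls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of correlate join semantics; rows are modeled as List<Object>.
final class CorrelateJoinSketch {
    private CorrelateJoinSketch() {}

    // Stand-in for FlinkJoinType, limited to the two lateral-join cases.
    enum JoinType { INNER, LEFT }

    // Joins one input row with the rows returned by a table function call.
    static List<List<Object>> join(
            List<Object> input,
            Collection<List<Object>> results,
            int resultArity,
            JoinType joinType) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> result : results) {
            List<Object> joined = new ArrayList<>(input); // input fields first
            joined.addAll(result); // then the table function's fields
            out.add(joined);
        }
        if (out.isEmpty() && joinType == JoinType.LEFT) {
            // LEFT keeps the input row; the function's columns become nulls.
            List<Object> padded = new ArrayList<>(input);
            padded.addAll(Arrays.asList(new Object[resultArity]));
            out.add(padded);
        }
        return out;
    }
}
```

With no results, `INNER` produces nothing and `LEFT` produces a single null-padded row.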
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala: ########## @@ -205,25 +206,36 @@ object BridgingFunctionGenUtil { private def generateAsyncTableFunctionCall( functionTerm: String, externalOperands: Seq[GeneratedExpression], - outputType: LogicalType): GeneratedExpression = { + returnType: LogicalType, + outputDataType: DataType): GeneratedExpression = { + + val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture] + val outputType = outputDataType.getLogicalType - val DELEGATE = className[DelegatingResultFuture[_]] + // If we need to wrap data in a row, it's done in the delegating class. + val needsWrapping = !isCompositeType(outputType) + val isInternal = DataTypeUtils.isInternal(outputDataType); + val arguments = Seq( + s""" + |delegates.getCompletableFuture() + |""".stripMargin + ) ++ externalOperands.map(_.resultTerm) + val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false") Review Comment: Without that, if there are no arguments, `${anyNull.mkString(" || ")}` is empty and the if statement below fails to compile. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate; +import org.apache.flink.table.planner.plan.utils.AsyncUtil; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A physical rule for identifying logical correlates containing {@link + * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link + * StreamPhysicalAsyncCorrelate} RelNodes. 
+ */ +public class StreamPhysicalAsyncCorrelateRule extends ConverterRule { + + public static final RelOptRule INSTANCE = + new StreamPhysicalAsyncCorrelateRule( + Config.INSTANCE.withConversion( + FlinkLogicalCorrelate.class, + FlinkConventions.LOGICAL(), + FlinkConventions.STREAM_PHYSICAL(), + "StreamPhysicalAsyncCorrelateRule")); + + protected StreamPhysicalAsyncCorrelateRule(Config config) { + super(config); + } + + // find only calc and table function + private boolean findAsyncTableFunction(FlinkLogicalCalc calc) { + RelNode child = ((RelSubset) calc.getInput()).getOriginal(); + if (child instanceof FlinkLogicalTableFunctionScan) { + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (child instanceof FlinkLogicalCalc) { + FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child; + return findAsyncTableFunction(childCalc); + } + return false; + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalCorrelate correlate = call.rel(0); + RelNode right = ((RelSubset) correlate.getRight()).getOriginal(); + if (right instanceof FlinkLogicalTableFunctionScan) { + // right node is a table function + FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right; + return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE); + } else if (right instanceof FlinkLogicalCalc) { + // a filter is pushed above the table function + return findAsyncTableFunction((FlinkLogicalCalc) right); + } + return false; Review Comment: You're right. I didn't notice they were effectively the same block of code. Done! ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCorrelateITCase.java: ########## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncSumScalarFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncTestTableFunction; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.SumScalarFunction; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT Case tests for correlate queries using {@link AsyncTableFunction}. */ +public class AsyncCorrelateITCase extends StreamingTestBase { Review Comment: I added tests for the maximum number of concurrent operations and the number of retries. The timeout and delay configurations are harder to test in an IT case and are probably better left to the `AsyncWaitOperator` tests, though I'm open to suggestions.
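A test of the maximum-concurrent-operations limit can track the peak number of in-flight calls and assert it never exceeds the cap. Below is a minimal sketch that uses a `Semaphore` as the limiter. It does not reproduce `AsyncWaitOperator`'s internals, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: measures peak concurrency under a semaphore-based cap.
final class ConcurrencyCapSketch {
    private ConcurrencyCapSketch() {}

    // Runs `tasks` (>= 1) simulated async calls, admitting at most `maxConcurrent`
    // at a time, and returns the highest number observed in flight simultaneously.
    static int maxObservedInFlight(int tasks, int maxConcurrent) throws InterruptedException {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        // Pool larger than the cap, so the semaphore (not the pool) is the real limiter.
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire(); // blocks the submitter once maxConcurrent calls are pending
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    try {
                        sleepQuietly(5); // simulate a slow external call
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return maxSeen.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A test would then check that the returned peak stays within the configured limit, e.g. `maxObservedInFlight(20, 3) <= 3`.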