AlanConfluent commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2132922150


##########
docs/layouts/shortcodes/generated/execution_config_configuration.html:
##########
@@ -62,6 +62,36 @@
             <td>Boolean</td>
             <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>

Review Comment:
   Changed.  I chose `max-concurrent-operations` since async is already in the 
name, if that works for you.
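   
   For reference, setting it would look something like this (a minimal sketch; the full key name is my assumption of where the rename lands):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
   // Assumed final key after the rename; not yet reflected in the generated docs above.
   tEnv.getConfig().set("table.exec.async-table.max-concurrent-operations", "10");
   ```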



##########
docs/layouts/shortcodes/generated/execution_config_configuration.html:
##########
@@ -62,6 +62,36 @@
             <td>Boolean</td>
             <td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.async-table.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The max number of async i/o operations that the async table function can trigger.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.async-table.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>

Review Comment:
   You're right.  It's not the total number of attempts, just the retries.  Changing the name.
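   
   To make the retry semantics concrete (my understanding; the key name here is hypothetical until the rename is final):
   
   ```java
   // max-retries = 2 would mean 1 original invocation plus up to 2 retries,
   // i.e. at most 3 invocations total.
   tEnv.getConfig().set("table.exec.async-table.max-retries", "2");
   ```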



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class<
             final Class<?> resultClass = result.toClass();

Review Comment:
   I agree with that.  It seems a bit overly defensive. I think it's there to verify that it's validating an output template rather than a state template.  Obviously, at this call site it should never have state.  Maybe a better `MethodVerification` interface could have separate methods to validate them, something like the sketch below.
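   
   ```java
   import java.lang.reflect.Method;
   
   // Rough sketch only; names and signatures are made up, just to illustrate
   // separating the two validations.
   interface MethodVerification {
       // Validates the method against an output template,
       // e.g. eval(CompletableFuture<Collection<T>>, args...).
       void verifyOutputTemplate(Method method, Class<?> output);
   
       // Validates the method against a state template, kept separate so call
       // sites that can never carry state don't need defensive checks.
       void verifyStateTemplate(Method method, Class<?> state);
   }
   ```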



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -208,7 +209,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator
      * Verification that checks a method by parameters (arguments only) with mandatory {@link
      * CompletableFuture}.
      */
-    static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) {
+    static MethodVerification createParameterAndCompletableFutureVerification(

Review Comment:
   I don't have any plans beyond `Collection`.  I may have made this more 
generic than needed.  Changed it to be `verifyFutureContainsCollection`.
   



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala:
##########
@@ -205,25 +206,36 @@ object BridgingFunctionGenUtil {
   private def generateAsyncTableFunctionCall(
       functionTerm: String,
       externalOperands: Seq[GeneratedExpression],
-      outputType: LogicalType): GeneratedExpression = {
+      returnType: LogicalType,
+      outputDataType: DataType): GeneratedExpression = {
+
+    val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture]
+    val outputType = outputDataType.getLogicalType
 
-    val DELEGATE = className[DelegatingResultFuture[_]]
+    // If we need to wrap data in a row, it's done in the delegating class.
+    val needsWrapping = !isCompositeType(outputType)
+    val isInternal = DataTypeUtils.isInternal(outputDataType);
+    val arguments = Seq(
+      s"""
+         |delegates.getCompletableFuture()
+         |""".stripMargin
+    ) ++ externalOperands.map(_.resultTerm)
+    val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false")
 
     val functionCallCode =
       s"""
          |${externalOperands.map(_.code).mkString("\n")}
-         |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) {
+         |if (${anyNull.mkString(" || ")}) {

Review Comment:
   > Does it mean if all the input arguments are null, the AsyncTableFunction will return an empty result collection? Not sure if this is an expected behavior.
   
   If any are null, the generated code returns an empty collection, which results in no output.  I think this is equivalent to what a table function does by doing nothing.  If the future is never completed, the operator considers the call pending indefinitely, which we don't want.  What is the expected behavior here?
   
   > There is a skipIfArgsNull flag. It is honored in TableFunction and ProcessTableFunction. Is there any reason to not honor it here? BTW, it seems that flag is always set to false.
   
   Not sure how I missed `skipIfArgsNull`.  Added support for that here too.
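   
   Roughly, the generated wrapper now behaves like this (hand-written approximation with made-up variable names, not the actual codegen output):
   
   ```java
   // Completing the future with an empty collection yields no output rows,
   // instead of leaving the async call pending forever.
   if (arg0IsNull || arg1IsNull) {
       resultFuture.complete(java.util.Collections.emptyList());
   } else {
       function.eval(delegatingFuture, arg0, arg1);
   }
   ```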



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/correlate/async/AsyncCorrelateRunner.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.correlate.async;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Async function runner for {@link org.apache.flink.table.functions.AsyncTableFunction}. It invokes
+ * the UDF for each of the input rows, joining the responses with the input.
+ */
+public class AsyncCorrelateRunner extends RichAsyncFunction<RowData, RowData> {
+
+    private static final long serialVersionUID = 7004349814516992850L;

Review Comment:
   Ok, sounds good.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoteCorrelateSplitRule.java:
##########
@@ -137,7 +137,7 @@ public RexNode visitCall(RexCall call) {
                                 call.getOperands().stream()
                                         .map(x -> x.accept(this))
                                         .collect(Collectors.toList());
-                        return rexBuilder.makeCall(call.getOperator(), newProjects);
+                        return rexBuilder.makeCall(call.getType(), call.getOperator(), newProjects);

Review Comment:
   It's related to fixing a bug with using a cast with async correlates (and possibly Python as well).  Hao reminded me [here](https://github.com/apache/flink/pull/26567#discussion_r2103426554), so I included it, though it could have been done as a followup.
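   
   For context, the change is just which `makeCall` overload gets used; my understanding of why it matters:
   
   ```java
   // makeCall(op, operands) re-derives the return type from the operator,
   // which can lose the result type of an explicit CAST; passing the
   // original call.getType() preserves it.
   RexNode before = rexBuilder.makeCall(call.getOperator(), newProjects);
   RexNode after = rexBuilder.makeCall(call.getType(), call.getOperator(), newProjects);
   ```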



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCorrelateCodeGeneratorTest.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.planner.calcite.SqlToRexConverter;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link AsyncCorrelateCodeGenerator}. */
+public class AsyncCorrelateCodeGeneratorTest {
+
+    private static final RowType INPUT_TYPE =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType());
+
+    private PlannerMocks plannerMocks;
+    private SqlToRexConverter converter;
+
+    private RelDataType tableRowType;
+
+    @BeforeEach
+    public void before() {
+        plannerMocks = PlannerMocks.create();
+        tableRowType =
+                plannerMocks
+                        .getPlannerContext()
+                        .getTypeFactory()
+                        .buildRelNodeRowType(
+                                JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
+                                JavaScalaConversionUtil.toScala(
+                                        Arrays.asList(
+                                                new IntType(),
+                                                new BigIntType(),
+                                                new VarCharType())));
+        ShortcutUtils.unwrapContext(plannerMocks.getPlanner().createToRelContext().getCluster());
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporarySystemFunction("myfunc", new AsyncFunc(), 
false);
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporarySystemFunction("myfunc2", new 
AsyncRowFunc(), false);
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporarySystemFunction("myfunc_error", new 
AsyncFuncError(), false);
+
+        converter =
+                ShortcutUtils.unwrapContext(
+                                plannerMocks.getPlanner().createToRelContext().getCluster())
+                        .getRexFactory()
+                        .createSqlToRexConverter(
+                                tableRowType,
+                                plannerMocks
+                                        .getPlannerContext()
+                                        .getTypeFactory()
+                                        .createFieldTypeFromLogicalType(
+                                                RowType.of(VarCharType.STRING_TYPE)));
+    }
+
+    @Test
+    public void testStringReturnType() throws Exception {
+        List<Object> objects =
+                execute(
+                        "myFunc(f1, f2, f3)",
+                        RowType.of(VarCharType.STRING_TYPE),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(objects)
+                .containsExactly(Row.of("complete1 foo 4 6"), Row.of("complete2 foo 4 6"));
+    }
+
+    @Test
+    public void testRowReturnType() throws Exception {

Review Comment:
   Makes sense.  Added one for RowData.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala:
##########
@@ -79,7 +81,9 @@ class StreamPhysicalCorrelateRule(config: Config) extends ConverterRule(config)
           val newCalc = getMergedCalc(calc)
           convertToCorrelate(
             tableScan,
-            Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition)))
+            if (newCalc.getProgram.getCondition == null) None
+            else Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition))

Review Comment:
   While testing the async case, I found that a calc with no condition results in a NullPointerException, I believe, so I just fixed it here.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java:
##########
@@ -220,11 +222,24 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class<
             final Class<?> resultClass = result.toClass();
             Type genericType = method.getGenericParameterTypes()[0];
             genericType = resolveVariableWithClassContext(baseClass, genericType);
-            if (!(genericType instanceof ParameterizedType)) {
+            Optional<ParameterizedType> parameterized = getParameterizedType(genericType);
+            if (!parameterized.isPresent()) {
                 throw extractionError(
                         "The method '%s' needs generic parameters for the CompletableFuture at position %d.",
                         method.getName(), 0);
             }
+            // If nestedArgumentClass is given, it is assumed to be a generic parameters of
+            // argumentClass, also at the position genericPos
+            if (nestedArgumentClass != null) {
+                genericType = parameterized.get().getActualTypeArguments()[0];

Review Comment:
   You're right, I can avoid reusing the variables if I just move `returnType` up, which is better.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod(
             if (method.getParameterCount() >= 1) {
                 Type firstParam = method.getGenericParameterTypes()[0];
                 firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
-                if (CompletableFuture.class.equals(firstParam)
-                        || firstParam instanceof ParameterizedType
-                                && CompletableFuture.class.equals(
-                                        ((ParameterizedType) firstParam).getRawType())) {
-                    foundParam = true;
+                if (isGenericOfClass(CompletableFuture.class, firstParam)) {
+                    Optional<ParameterizedType> parameterized = getParameterizedType(firstParam);
+                    if (!verifyFutureContainsCollection) {
+                        foundParam = true;
+                    } else if (parameterized.isPresent()
+                            && parameterized.get().getActualTypeArguments().length > 0) {
+                        firstParam = parameterized.get().getActualTypeArguments()[0];

Review Comment:
   Good call.  Changed.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -68,27 +70,35 @@ public class AsyncCalcSplitRule {
 
     /**
      * An Async implementation of {@link RemoteCalcCallFinder} which finds uses of {@link
-     * org.apache.flink.table.functions.AsyncScalarFunction}.
+     * org.apache.flink.table.functions.AsyncScalarFunction} and {@link
+     * org.apache.flink.table.functions.AsyncTableFunction}.
      */
     public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder {

Review Comment:
   I do use AsyncRemoteCallFinder in a few places now, so it makes sense to move it to a more general place.  Moved it to `AsyncUtil` as you suggested.
   
   > The interface of RemoteCalcCallFinder should also be renamed to 
RemoteCallFinder.
   
   Done



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate;
+import org.apache.flink.table.planner.plan.utils.AsyncUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A physical rule for identifying logical correlates containing {@link
+ * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link
+ * StreamPhysicalAsyncCorrelate} RelNodes.
+ */
+public class StreamPhysicalAsyncCorrelateRule extends ConverterRule {
+
+    public static final RelOptRule INSTANCE =
+            new StreamPhysicalAsyncCorrelateRule(
+                    Config.INSTANCE.withConversion(
+                            FlinkLogicalCorrelate.class,
+                            FlinkConventions.LOGICAL(),
+                            FlinkConventions.STREAM_PHYSICAL(),
+                            "StreamPhysicalAsyncCorrelateRule"));
+
+    protected StreamPhysicalAsyncCorrelateRule(Config config) {
+        super(config);
+    }
+
+    // find only calc and table function
+    private boolean findAsyncTableFunction(FlinkLogicalCalc calc) {
+        RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+        if (child instanceof FlinkLogicalTableFunctionScan) {
+            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child;
+            return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE);
+        } else if (child instanceof FlinkLogicalCalc) {
+            FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+            return findAsyncTableFunction(childCalc);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalCorrelate correlate = call.rel(0);
+        RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
+        if (right instanceof FlinkLogicalTableFunctionScan) {
+            // right node is a table function
+            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+            return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE);
+        } else if (right instanceof FlinkLogicalCalc) {
+            // a filter is pushed above the table function
+            return findAsyncTableFunction((FlinkLogicalCalc) right);
+        }
+        return false;
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        FlinkLogicalCorrelate correlate = (FlinkLogicalCorrelate) rel;
+        RelTraitSet traitSet = rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+        RelNode convInput =
+                RelOptRule.convert(correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL());
+        RelNode right = correlate.getInput(1);
+        return convertToCorrelate(
+                right, correlate, traitSet, convInput, Optional.empty(), Optional.empty());
+    }
+
+    public RelNode convertToCorrelate(
+            RelNode relNode,
+            FlinkLogicalCorrelate correlate,
+            RelTraitSet traitSet,
+            RelNode convInput,
+            Optional<List<RexNode>> projections,
+            Optional<RexNode> condition) {
+        if (relNode instanceof RelSubset) {
+            RelSubset rel = (RelSubset) relNode;
+            return convertToCorrelate(
+                    rel.getRelList().get(0),
+                    correlate,
+                    traitSet,
+                    convInput,
+                    projections,
+                    condition);
+        } else if (relNode instanceof FlinkLogicalCalc) {
+            FlinkLogicalCalc calc = (FlinkLogicalCalc) relNode;
+            RelNode tableScan = StreamPhysicalCorrelateRule.getTableScan(calc);
+            FlinkLogicalCalc newCalc = StreamPhysicalCorrelateRule.getMergedCalc(calc);
+            // The projections are not handled here or in the base version, so currently we match
+            // that functionality.
+            return convertToCorrelate(
+                    tableScan,
+                    correlate,
+                    traitSet,
+                    convInput,
+                    Optional.ofNullable(
+                            newCalc.getProgram().getProjectList() == null
+                                    ? null
+                                    : newCalc.getProgram().getProjectList().stream()
+                                            .map(newCalc.getProgram()::expandLocalRef)
+                                            .collect(Collectors.toList())),
+                    Optional.ofNullable(
+                            newCalc.getProgram().getCondition() == null
+                                    ? null
+                                    : newCalc.getProgram()

Review Comment:
   Yes, correct.  It will throw an error.  This is tested in 
`AsyncCorrelateITCase`.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala:
##########
@@ -205,25 +206,36 @@ object BridgingFunctionGenUtil {
   private def generateAsyncTableFunctionCall(
       functionTerm: String,
       externalOperands: Seq[GeneratedExpression],
-      outputType: LogicalType): GeneratedExpression = {
+      returnType: LogicalType,
+      outputDataType: DataType): GeneratedExpression = {
+
+    val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture]
+    val outputType = outputDataType.getLogicalType
 
-    val DELEGATE = className[DelegatingResultFuture[_]]
+    // If we need to wrap data in a row, it's done in the delegating class.
+    val needsWrapping = !isCompositeType(outputType)
+    val isInternal = DataTypeUtils.isInternal(outputDataType);
+    val arguments = Seq(
+      s"""
+         |delegates.getCompletableFuture()
+         |""".stripMargin
+    ) ++ externalOperands.map(_.resultTerm)
+    val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false")
 
     val functionCallCode =
       s"""
          |${externalOperands.map(_.code).mkString("\n")}
-         |if (${externalOperands.map(_.nullTerm).mkString(" || ")}) {
+         |if (${anyNull.mkString(" || ")}) {
          |  $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList());

Review Comment:
   I think this comes from `FunctionCodeGenerator`, where `collectorTerm` is used to mean any return variable.  I would have to change some lookup join code, and possibly other async functions as well.  I don't want to do too much refactoring here, to keep things as simple as possible, if that works for you.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecAsyncCorrelate.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.AsyncCorrelateCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.AsyncUtil;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.correlate.async.AsyncCorrelateRunner;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Base class for exec Async Correlate. */
+public class CommonExecAsyncCorrelate extends ExecNodeBase<RowData>
+        implements SingleTransformationTranslator<RowData> {
+
+    public static final String ASYNC_CORRELATE_TRANSFORMATION = "async-correlate";
+
+    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
+
+    @JsonProperty(FIELD_NAME_JOIN_TYPE)
+    private final FlinkJoinType joinType;
+
+    @JsonProperty(FIELD_NAME_FUNCTION_CALL)
+    private final RexCall invocation;
+
+    public CommonExecAsyncCorrelate(
+            int id,
+            ExecNodeContext context,
+            ReadableConfig persistedConfig,
+            FlinkJoinType joinType,
+            RexCall invocation,
+            List<InputProperty> inputProperties,
+            RowType outputType,
+            String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, description);
+        checkArgument(inputProperties.size() == 1);
+        this.joinType = joinType;

Review Comment:
   I have a followup where I do use it.  It's a small fix, but I didn't want to fold everything into one big PR.  Happy to remove it for now if you prefer.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingFunctionGenUtil.scala:
##########
@@ -205,25 +206,36 @@ object BridgingFunctionGenUtil {
   private def generateAsyncTableFunctionCall(
       functionTerm: String,
       externalOperands: Seq[GeneratedExpression],
-      outputType: LogicalType): GeneratedExpression = {
+      returnType: LogicalType,
+      outputDataType: DataType): GeneratedExpression = {
+
+    val DELEGATE_ASYNC_TABLE = className[DelegatingAsyncTableResultFuture]
+    val outputType = outputDataType.getLogicalType
 
-    val DELEGATE = className[DelegatingResultFuture[_]]
+    // If we need to wrap data in a row, it's done in the delegating class.
+    val needsWrapping = !isCompositeType(outputType)
+    val isInternal = DataTypeUtils.isInternal(outputDataType);
+    val arguments = Seq(
+      s"""
+         |delegates.getCompletableFuture()
+         |""".stripMargin
+    ) ++ externalOperands.map(_.resultTerm)
+    val anyNull = externalOperands.map(_.nullTerm) ++ Seq("false")

Review Comment:
   Without that, if there are no arguments, `${anyNull.mkString(" || ")}` is 
empty and the if statement below fails to compile.
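   
   A quick Java stand-in for the failure mode, to illustrate:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // With no operands, joining the null terms yields an empty string, which
   // would generate "if () {" and fail to compile. Appending a constant
   // "false" keeps the condition non-empty and a no-op.
   List<String> nullTerms = new ArrayList<>(); // no operands in this case
   nullTerms.add("false");
   String condition = String.join(" || ", nullTerms); // "false", never ""
   ```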



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalAsyncCorrelateRule.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalAsyncCorrelate;
+import org.apache.flink.table.planner.plan.utils.AsyncUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A physical rule for identifying logical correlates containing {@link
+ * org.apache.flink.table.functions.AsyncTableFunction} calls and converting them to physical {@link
+ * StreamPhysicalAsyncCorrelate} RelNodes.
+ */
+public class StreamPhysicalAsyncCorrelateRule extends ConverterRule {
+
+    public static final RelOptRule INSTANCE =
+            new StreamPhysicalAsyncCorrelateRule(
+                    Config.INSTANCE.withConversion(
+                            FlinkLogicalCorrelate.class,
+                            FlinkConventions.LOGICAL(),
+                            FlinkConventions.STREAM_PHYSICAL(),
+                            "StreamPhysicalAsyncCorrelateRule"));
+
+    protected StreamPhysicalAsyncCorrelateRule(Config config) {
+        super(config);
+    }
+
+    // find only calc and table function
+    private boolean findAsyncTableFunction(FlinkLogicalCalc calc) {
+        RelNode child = ((RelSubset) calc.getInput()).getOriginal();
+        if (child instanceof FlinkLogicalTableFunctionScan) {
+            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) child;
+            return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE);
+        } else if (child instanceof FlinkLogicalCalc) {
+            FlinkLogicalCalc childCalc = (FlinkLogicalCalc) child;
+            return findAsyncTableFunction(childCalc);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalCorrelate correlate = call.rel(0);
+        RelNode right = ((RelSubset) correlate.getRight()).getOriginal();
+        if (right instanceof FlinkLogicalTableFunctionScan) {
+            // right node is a table function
+            FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) right;
+            return AsyncUtil.isAsyncCall(scan.getCall(), FunctionKind.ASYNC_TABLE);
+        } else if (right instanceof FlinkLogicalCalc) {
+            // a filter is pushed above the table function
+            return findAsyncTableFunction((FlinkLogicalCalc) right);
+        }
+        return false;

Review Comment:
   You're right.  I didn't notice they were effectively the same block of code. 
 Done!



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCorrelateITCase.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncSumScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.AsyncTestTableFunction;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.SumScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT Case tests for correlate queries using {@link AsyncTableFunction}. */
+public class AsyncCorrelateITCase extends StreamingTestBase {

Review Comment:
   I added tests for the max concurrent operations and the number of retries (along the lines of the sketch below).  The timeout and delay configurations are a bit harder to test in an IT case and are probably better left to the `AsyncWaitOperator` tests, though I'm open to suggestions.


