lihaosky commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2103359338


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -443,6 +443,49 @@ public class ExecutionConfigOptions {
                             "The max number of async retry attempts to make before task "
                                     + "execution is failed.");
 
+    // ------------------------------------------------------------------------
+    //  Async Table Function
+    // ------------------------------------------------------------------------
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY =
+            key("table.exec.async-table.buffer-capacity")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The max number of async i/o operations that the async table function can trigger.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
+            key("table.exec.async-table.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(3))
+                    .withDescription(
+                            "The async timeout for the asynchronous operation to complete.");

Review Comment:
   Is this the timeout for each retry, or the total timeout across all retries? I suppose it's for each retry.
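
   To make the difference concrete, here is a rough sketch of the worst-case wait under each reading. It assumes a fixed delay between attempts and that the max-attempts setting counts the first try; the class `AsyncTimeoutBudget`, the attempt count, and the delay are made up for illustration and are not taken from the PR.

```java
import java.time.Duration;

/** Worst-case wait for one async call under two readings of the timeout option. */
public class AsyncTimeoutBudget {

    /**
     * Reading 1: the timeout applies to each attempt. Every attempt may run up to the
     * limit, and a fixed delay (an assumption) separates consecutive attempts.
     */
    public static Duration worstCasePerAttempt(
            Duration timeout, int maxAttempts, Duration retryDelay) {
        return timeout.multipliedBy(maxAttempts)
                .plus(retryDelay.multipliedBy(Math.max(0, maxAttempts - 1)));
    }

    /** Reading 2: the timeout covers the whole operation, retries included. */
    public static Duration worstCaseTotal(Duration timeout) {
        return timeout;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMinutes(3); // default proposed in the diff above
        int maxAttempts = 3; // illustrative value, not from the PR
        Duration retryDelay = Duration.ofSeconds(10); // illustrative value, not from the PR

        System.out.println(
                "per-attempt reading: "
                        + worstCasePerAttempt(timeout, maxAttempts, retryDelay));
        System.out.println("total reading: " + worstCaseTotal(timeout));
    }
}
```

   The two readings differ by roughly a factor of the attempt count, which is why the option description should probably state which one applies.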



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java:
##########
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
                             + "Otherwise the type has to be specified explicitly using type information.");
         }
     }
+
+    /**
+     * Will return true if the type of the given generic class type.
+     *
+     * @param clazz The generic class to check against
+     * @param type The type to be checked
+     */
+    public static boolean isGenericOfClass(Class<?> clazz, Type type) {
+        Optional<ParameterizedType> parameterized = getParameterizedType(type);
+        return clazz.equals(type)
+                || parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
+    }
+
+    /**
+     * Returns an optional of a ParameterizedType, if that's what the type is.
+     *
+     * @param type The type to check
+     * @return optional which is present if the type is a ParameterizedType
+     */
+    public static Optional<ParameterizedType> getParameterizedType(Type type) {

Review Comment:
   Can you add unit tests for these two functions?
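
   Something along these lines could serve as a starting point. It is a standalone sketch rather than a Flink test: `isGenericOfClass` is copied from the diff above, the body of `getParameterizedType` is an assumption (the diff only shows its signature), and the class name, fixture fields, and `typeOf`/`check` helpers are hypothetical.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class GenericOfClassCheck {

    // Fixture fields; their generic types are read via reflection.
    static List<String> parameterizedField;
    static String plainField;

    // Copied from the diff under review.
    public static boolean isGenericOfClass(Class<?> clazz, Type type) {
        Optional<ParameterizedType> parameterized = getParameterizedType(type);
        return clazz.equals(type)
                || parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
    }

    // Assumed body: the diff shows only the signature and Javadoc.
    public static Optional<ParameterizedType> getParameterizedType(Type type) {
        return type instanceof ParameterizedType
                ? Optional.of((ParameterizedType) type)
                : Optional.empty();
    }

    /** Returns the declared generic type of one of the fixture fields. */
    public static Type typeOf(String fieldName) {
        try {
            return GenericOfClassCheck.class.getDeclaredField(fieldName).getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        Type listOfString = typeOf("parameterizedField");
        Type plainString = typeOf("plainField");

        check(getParameterizedType(listOfString).isPresent(), "List<String> is parameterized");
        check(!getParameterizedType(plainString).isPresent(), "String is not parameterized");

        check(isGenericOfClass(List.class, listOfString), "raw type of List<String> is List");
        check(isGenericOfClass(String.class, plainString), "a plain class matches itself");
        check(!isGenericOfClass(Map.class, listOfString), "List<String> is not a Map");
        check(!isGenericOfClass(List.class, plainString), "String is not a List");
        System.out.println("all checks passed");
    }
}
```

   In the PR itself these cases would go into the existing test class for `TypeExtractionUtils` using the project's usual assertions.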



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java:
##########
@@ -82,4 +90,31 @@ public void testCorrelateIndirectOtherWay() {
         String sqlQuery = "select * FROM MyTable, LATERAL TABLE(tableFunc(func1(ABS(a))))";
         util.verifyRelPlan(sqlQuery);
     }
+
+    @Test
+    public void testCorrelateWithSystem() {
+        String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(ABS(a)))";
+        util.verifyRelPlan(sqlQuery);
+    }
+
+    @Test
+    public void testCorrelateWithScalar() {
+        String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(scalar(a)))";
+        util.verifyRelPlan(sqlQuery);
+    }
+

Review Comment:
   Can you add this test?
   ```
       @Test
       public void testCorrelateWithCast() {
           String sqlQuery =
                   "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(cast(cast(a as int) as int)))";
           util.verifyRelPlan(sqlQuery);
       }
   ```
   
   I made a fix internally around cast handling.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
      * org.apache.flink.table.functions.AsyncScalarFunction}.

Review Comment:
   as well as `AsyncTableFunction`?



##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java:
##########
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
                             + "Otherwise the type has to be specified explicitly using type information.");
         }
     }
+
+    /**
+     * Will return true if the type of the given generic class type.

Review Comment:
   ```suggestion
        * Will return true if the type of the given generic class type matches clazz.
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAsyncCorrelate.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate.ASYNC_CORRELATE_TRANSFORMATION;
+
+/**
+ * Stream {@link ExecNode} which matches along with join a Java/Scala user defined table function.
+ */
+@ExecNodeMetadata(
+        name = "stream-exec-async-correlate",
+        version = 1,
+        producedTransformations = ASYNC_CORRELATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_19,
+        minStateVersion = FlinkVersion.v1_19)

Review Comment:
   This node doesn't exist in 1.19 yet, does it?



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    /** Valid table function. */
+    public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> {
+        public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     private static class PrivateAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> {
+        public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     /** No implementation method. */
     public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction {
         // nothing to do
     }
 
+    /** No implementation method. */
+    public static class MissingImplementationAsyncTableFunction
+            extends AsyncTableFunction<Integer> {
+        // nothing to do
+    }
+
     /** Implementation method is private. */
     public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction {
         private void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    /** Implementation method is private. */
+    public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> {
+        private void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     /** Implementation method isn't void. */
     public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction {
         public String eval(CompletableFuture<Integer> future, int i) {
             return "";
         }
     }
 
+    /** Implementation method isn't void. */
+    public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+        public String eval(CompletableFuture<Collection<Integer>> future, int i) {
+            return "";
+        }
+    }
+
     /** Implementation method isn't void. */
     public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(int i) {}
     }
 
+    /** Implementation method isn't void. */

Review Comment:
   Should this comment say that the first param isn't `CompletableFuture<Collection>`?



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    /** Valid table function. */
+    public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> {
+        public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     private static class PrivateAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> {
+        public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     /** No implementation method. */
     public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction {
         // nothing to do
     }
 
+    /** No implementation method. */
+    public static class MissingImplementationAsyncTableFunction
+            extends AsyncTableFunction<Integer> {
+        // nothing to do
+    }
+
     /** Implementation method is private. */
     public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction {
         private void eval(CompletableFuture<Integer> future, int i) {}
     }
 
+    /** Implementation method is private. */
+    public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> {
+        private void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+    }
+
     /** Implementation method isn't void. */
     public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction {
         public String eval(CompletableFuture<Integer> future, int i) {
             return "";
         }
     }
 
+    /** Implementation method isn't void. */
+    public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+        public String eval(CompletableFuture<Collection<Integer>> future, int i) {
+            return "";
+        }
+    }
+
     /** Implementation method isn't void. */
     public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction {
         public void eval(int i) {}
     }
 
+    /** Implementation method isn't void. */
+    public static class NoFutureAsyncTableFunction extends AsyncTableFunction<Integer> {
+        public void eval(int i) {}
+    }
+
+    /** Implementation method isn't void. */

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
      * org.apache.flink.table.functions.AsyncScalarFunction}.
      */
     public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder {
+
+        private final FunctionKind functionKind;
+
+        public AsyncRemoteCalcCallFinder() {
+            this(FunctionKind.ASYNC_SCALAR);
+        }

Review Comment:
   nit: Not sure we want to default to the scalar function kind, given both are supported now. Requiring `functionKind` is probably fine.
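
   Roughly what I have in mind, as a self-contained sketch. The nested `FunctionKind` enum and `CallFinder` class are hypothetical stand-ins for Flink's `FunctionKind` and `AsyncRemoteCalcCallFinder`, reduced to what is needed to show the constructor shape.

```java
import java.util.Objects;

public class RequiredFunctionKindSketch {

    /** Hypothetical stand-in for Flink's FunctionKind, reduced to the two async kinds. */
    public enum FunctionKind {
        ASYNC_SCALAR,
        ASYNC_TABLE
    }

    /** Hypothetical stand-in for AsyncRemoteCalcCallFinder without a no-arg constructor. */
    public static final class CallFinder {
        private final FunctionKind functionKind;

        // Callers must state which kind they are looking for; there is no default.
        public CallFinder(FunctionKind functionKind) {
            this.functionKind = Objects.requireNonNull(functionKind, "functionKind");
        }

        public FunctionKind getFunctionKind() {
            return functionKind;
        }
    }

    public static void main(String[] args) {
        CallFinder scalarFinder = new CallFinder(FunctionKind.ASYNC_SCALAR);
        CallFinder tableFinder = new CallFinder(FunctionKind.ASYNC_TABLE);
        System.out.println(scalarFinder.getFunctionKind());
        System.out.println(tableFinder.getFunctionKind());
    }
}
```

   With the default removed, each call site names the kind explicitly, so a rule meant for table functions cannot silently pick up the scalar behavior.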



##########
docs/layouts/shortcodes/generated/sink_configuration.html:
##########
@@ -0,0 +1,18 @@
+<table class="configuration table table-bordered">

Review Comment:
   This is unrelated to the async table function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.