twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1444587492


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala:
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, RexDefaultVisitor}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexCorrelVariable, RexFieldAccess, RexInputRef, RexLocalRef, RexNode, RexProgram}
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+
+import java.util.function.Function
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Base rule that splits [[FlinkLogicalCalc]] into multiple [[FlinkLogicalCalc]]s. It is mainly to
+ * ensure that each [[FlinkLogicalCalc]] only contains Java/Scala [[ScalarFunction]]s or Python

Review Comment:
   Update comments in this class to be generic (i.e. not just Python)



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml:
##########
@@ -0,0 +1,536 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+       <TestCase name="testSingleCall">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(a) FROM MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[func1(a) AS EXPR$0])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testLiteralPlusTableSelect">
+               <Resource name="sql">
+                       <![CDATA[SELECT 'foo', func1(a) FROM MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[_UTF-16LE'foo'], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=['foo' AS EXPR$0, f0 AS EXPR$1])
++- AsyncCalc(select=[func1(a) AS f0])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testFieldPlusTableSelect">
+               <Resource name="sql">
+                       <![CDATA[SELECT a, func1(a) from MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[a, func1(a) AS EXPR$1])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testTwoCalls">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(a), func1(a) from MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[f0 AS EXPR$0, func1(a) AS EXPR$1])
++- AsyncCalc(select=[a, func1(a) AS f0])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testNestedCalls">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(func1(func1(a))) from MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[func1(func1(func1($0)))])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[func1(f0) AS EXPR$0])
++- AsyncCalc(select=[func1(f0) AS f0])
+   +- AsyncCalc(select=[func1(a) AS f0])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testThreeNestedCalls">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(func1(a)), func1(func1(func1(a))), func1(a) from MyTable]]>

Review Comment:
   most tests just exercise a single field, can we test something like:
   ```
   SELECT func1(a), func2(b), func1(b), func2(a) from MyTable
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Test for {@link AsyncCalcSplitRule}. */
+public class AsyncCalcSplitRuleTest extends TableTestBase {
+
+    private TableTestUtil util = streamTestUtil(TableConfig.getDefault());
+
+    @BeforeEach
+    public void setup() {
+        FlinkChainedProgram programs = new FlinkChainedProgram<BatchOptimizeContext>();
+        programs.addLast(
+                "logical_rewrite",
+                FlinkHepRuleSetProgramBuilder.newBuilder()
+                        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                        .add(FlinkStreamRuleSets.LOGICAL_REWRITE())
+                        .build());
+
+        util.addTableSource(

Review Comment:
   is there an alternative that doesn't use the ancient type system with TypeInformation?
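   
   For example (a rough sketch, not verified against this test base: it assumes the util exposes its TableEnvironment, e.g. via a `getTableEnv()` accessor, that the `values` test connector is usable here, and the column types are placeholders):
   ```java
   // Register the test table via DDL with the DataTypes-based type system
   // instead of addTableSource(TypeInformation...).
   util.getTableEnv()
           .executeSql(
                   "CREATE TABLE MyTable (\n"
                           + "  a INT,\n"
                           + "  b BIGINT,\n"
                           + "  c STRING,\n"
                           + "  d ARRAY<INT>\n"
                           + ") WITH ('connector' = 'values')");
   ```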



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.SqlToRexConverter;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AsyncCodeGenerator}. */
+public class AsyncCodeGeneratorTest {
+
+    private static final RowType INPUT_TYPE =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType());
+
+    private PlannerMocks plannerMocks;
+    private SqlToRexConverter converter;
+
+    private RelDataType tableRowType;
+
+    @BeforeEach
+    public void before() {
+        plannerMocks = PlannerMocks.create();
+        tableRowType =
+                plannerMocks
+                        .getPlannerContext()
+                        .getTypeFactory()
+                        .buildRelNodeRowType(
+                                JavaScalaConversionUtil.toScala(ImmutableList.of("f1", "f2", "f3")),
+                                JavaScalaConversionUtil.toScala(
+                                        ImmutableList.of(
+                                                new IntType(),
+                                                new BigIntType(),
+                                                new VarCharType())));
+        converter =
+                plannerMocks
+                        .getPlanner()
+                        .createToRelContext()
+                        .getCluster()
+                        .getPlanner()
+                        .getContext()
+                        .unwrap(FlinkContext.class)
+                        .getRexFactory()
+                        .createSqlToRexConverter(tableRowType, null);
+
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporaryCatalogFunction(
+                        UnresolvedIdentifier.of(
+                                CatalogManagerMocks.DEFAULT_CATALOG,
+                                CatalogManagerMocks.DEFAULT_DATABASE,
+                                "myFunc"),
+                        new AsyncFunc(),
+                        false);
+    }
+
+    @Test
+    public void testStringReturnType() throws Exception {
+        RowData rowData =
+                execute(
+                        "myFunc(f1, f2, f3)",
+                        RowType.of(new VarCharType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData).isEqualTo(GenericRowData.of(StringData.fromString("complete foo 4 6")));
+    }
+
+    @Test
+    public void testTimestampReturnType() throws Exception {

Review Comment:
   We don't need tests for different types. This is ensured already through the data structure converter.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java:
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case tests for {@link AsyncScalarFunction}. */
+public class AsyncCalcITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    public void before() throws Exception {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setMaxParallelism(1);
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 1);
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, Duration.ofMinutes(1));
+    }
+
+    @Test
+    public void testSimpleTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1"), Row.of("val 2"), Row.of("val 
3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLiteralPlusTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select 'foo', func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of("foo", "val 1"), Row.of("foo", "val 2"), 
Row.of("foo", "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testFieldPlusTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select f1, func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(1, "val 1"), Row.of(2, "val 2"), 
Row.of(3, "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testTwoCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1), func(f1) from 
t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of("val 1", "val 1"),
+                        Row.of("val 2", "val 2"),
+                        Row.of("val 3", "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testNestedCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results = executeSql("select func(func(func(f1))) from 
t1");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(31), 
Row.of(32), Row.of(33));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testThreeNestedCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results =
+                executeSql("select func(func(f1)), func(func(func(f1))), 
func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(21, 31, 11), Row.of(22, 32, 12), 
Row.of(23, 33, 13));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testPassedToOtherUDF() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select Concat(func(f1), 'foo') 
from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1foo"), Row.of("val 2foo"), 
Row.of("val 3foo"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testJustCall() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(1)");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 1"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereCondition() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1 from t1 where REGEXP(func(f1), 'val 
(2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionAndProjection() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select func(f1) from t1 where REGEXP(func(f1), 
'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), 
Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionWithInts() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results = executeSql("select f1 from t1 where func(f1) 
>= 12");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testAggregate() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 1, 3, 4).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10Long());
+        final List<Row> results = executeSql("select f1, func(count(*)) from 
t1 group by f1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of(1, 11L),
+                        Row.of(2, 11L),
+                        Row.of(3, 11L),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, 11L),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, 12L),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 3, 11L),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 3, 12L),
+                        Row.of(4, 11L));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testSelectCallWithIntArray() {
+        Table t1 = tEnv.fromValues(new int[] {1, 2}, new int[] {3, 4}).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10IntArray());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of(new Object[] {new Integer[] {11, 12}}),
+                        Row.of(new Object[] {new Integer[] {13, 14}}));
+        // When run here, the plan is a union of the two AsyncCalcs so order is
+        // not retained!
+        assertThat(results).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1 from t1 INNER JOIN t2 ON func(f1) = 
func(f2) AND REGEXP(func(f1), 'val (2|4)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(4));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncProjection() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1 INNER 
JOIN t2 ON f1 = f2");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), 
Row.of("val 4"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncInWhere() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1 from t1 INNER JOIN t2 ON f1 = f2 WHERE 
REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLeftJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1, f2 from t1 LEFT JOIN t2 ON func(f1) = 
func(f2)");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(1, null), Row.of(2, 2), Row.of(3, null), Row.of(4, 4));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLeftJoinWithFuncInWhere() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1, f2 from t1 LEFT JOIN t2 ON f1 = f2 WHERE 
REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2, 2), 
Row.of(3, null));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testRightJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1, f2 from t1 FULL OUTER JOIN t2 ON 
func(f1) = func(f2)");
+        assertThat(results).hasSize(8);
+    }
+
+    @Test
+    public void testSelectWithConfigs() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY.key(), "10");
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT.key(), "1m");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1"), Row.of("val 2"), Row.of("val 
3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testProjectCallInSubquery() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select blah FROM (select func(f1) as blah from t1) "
+                                + "WHERE REGEXP(blah, 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), 
Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionCallInSubquery() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select blah FROM (select f1 as blah from t1 "
+                                + "WHERE REGEXP(func(f1), 'val (2|3)'))");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereNotInSubquery() {

Review Comment:
   I counted 29 tests here. Let's reduce this significantly to at most 15 tests, if not fewer.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/calc/async/DelegatingAsyncResultFuture.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.calc.async;
+
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * Inspired by {@link org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture}
+ * for {@link org.apache.flink.table.functions.AsyncScalarFunction}.
+ */
+public class DelegatingAsyncResultFuture implements BiConsumer<Object, Throwable> {
+
+    private final ResultFuture<Object> delegatedResultFuture;
+    private final List<Object> synchronousResults = new ArrayList<>();
+    private Function<Object, RowData> outputFactory;
+    private CompletableFuture<Object> future;
+    private CompletableFuture<Object> convertedFuture;
+
+    public DelegatingAsyncResultFuture(ResultFuture<Object> delegatedResultFuture) {
+        this.delegatedResultFuture = delegatedResultFuture;
+    }
+
+    public synchronized void addSynchronousResult(Object object) {
+        synchronousResults.add(object);
+    }
+
+    public synchronized Object getSynchronousResult(int index) {
+        return synchronousResults.get(index);
+    }
+
+    public synchronized void setOutputFactory(Function<Object, RowData> outputFactory) {

Review Comment:
   Aren't there too many synchronized methods in this class? Ideally, only access to `DelegatingResultFuture` needs to be guarded, no?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.SqlToRexConverter;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AsyncCodeGenerator}. */
+public class AsyncCodeGeneratorTest {
+
+    private static final RowType INPUT_TYPE =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType());
+
+    private PlannerMocks plannerMocks;
+    private SqlToRexConverter converter;
+
+    private RelDataType tableRowType;
+
+    @BeforeEach
+    public void before() {
+        plannerMocks = PlannerMocks.create();
+        tableRowType =
+                plannerMocks
+                        .getPlannerContext()
+                        .getTypeFactory()
+                        .buildRelNodeRowType(
+                                JavaScalaConversionUtil.toScala(ImmutableList.of("f1", "f2", "f3")),
+                                JavaScalaConversionUtil.toScala(
+                                        ImmutableList.of(
+                                                new IntType(),
+                                                new BigIntType(),
+                                                new VarCharType())));
+        converter =
+                plannerMocks
+                        .getPlanner()
+                        .createToRelContext()
+                        .getCluster()
+                        .getPlanner()
+                        .getContext()
+                        .unwrap(FlinkContext.class)
+                        .getRexFactory()
+                        .createSqlToRexConverter(tableRowType, null);
+
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporaryCatalogFunction(

Review Comment:
   can be a system function
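   
   For example (a sketch, assuming `FunctionCatalog#registerTemporarySystemFunction` accepts the function instance directly):
   ```java
   plannerMocks
           .getFunctionCatalog()
           .registerTemporarySystemFunction("myFunc", new AsyncFunc(), false);
   ```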



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRule.scala:
##########
@@ -293,222 +65,39 @@ object PythonCalcSplitPandasInProjectionRule
   }
 }
 
-/**
- * Rule that expands the RexFieldAccess inputs of Python functions contained in the projection of
- * [[FlinkLogicalCalc]]s.
- */
-object PythonCalcExpandProjectRule extends PythonCalcSplitRuleBase("PythonCalcExpandProjectRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
-    val projects = calc.getProgram.getProjectList.map(calc.getProgram.expandLocalRef)
-
-    projects.exists(containsPythonCall(_)) && projects.exists(containsFieldAccessInputs)
+class PythonRemoteCalcCallFinder extends RemoteCalcCallFinder {
+  override def containsRemoteCall(node: RexNode): Boolean = {
+    PythonUtil.containsPythonCall(node)
   }
 
-  override def needConvert(program: RexProgram, node: RexNode): Boolean =
-    node.isInstanceOf[RexFieldAccess]
-
-  override def split(
-      program: RexProgram,
-      splitter: ScalarFunctionSplitter): (Option[RexNode], Option[RexNode], Seq[RexNode]) = {
-    (
-      Option(program.getCondition).map(program.expandLocalRef),
-      None,
-      program.getProjectList.map(program.expandLocalRef(_).accept(splitter)))
-  }
-
-  private def containsFieldAccessInputs(node: RexNode): Boolean = {
-    node match {
-      case call: RexCall => call.getOperands.exists(containsFieldAccessInputs)
-      case _: RexFieldAccess => true
-      case _ => false
-    }
+  override def containsNonRemoteCall(node: RexNode): Boolean = {
+    PythonUtil.containsNonPythonCall(node)
   }
-}
-
-/**
- * Rule that pushes the condition of [[FlinkLogicalCalc]]s before it for the [[FlinkLogicalCalc]]s
- * which contain Python functions in the projection.
- */
-object PythonCalcPushConditionRule extends PythonCalcSplitRuleBase("PythonCalcPushConditionRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
-    val projects = calc.getProgram.getProjectList.map(calc.getProgram.expandLocalRef)
 
-    // matches if all the following conditions hold true:
-    // 1) the condition is not null
-    // 2) it contains Python functions in the projection
-    calc.getProgram.getCondition != null && projects.exists(containsPythonCall(_))
+  override def isRemoteCall(node: RexNode): Boolean = {
+    PythonUtil.isPythonCall(node)
   }
 
-  override def needConvert(program: RexProgram, node: RexNode): Boolean = isNonPythonCall(node)
-
-  override def split(
-      program: RexProgram,
-      splitter: ScalarFunctionSplitter): (Option[RexNode], Option[RexNode], Seq[RexNode]) = {
-    (
-      Option(program.getCondition).map(program.expandLocalRef),
-      None,
-      program.getProjectList.map(program.expandLocalRef))
+  override def isNonRemoteCall(node: RexNode): Boolean = {
+    PythonUtil.isNonPythonCall(node)
   }
 }
 
-/**
- * Rule that ensures that it only contains [[RexInputRef]]s at the beginning of the project list and
- * [[RexCall]]s at the end of the project list for [[FlinkLogicalCalc]]s which contain Python
- * functions in the projection. This rule exists to keep DataStreamPythonCalc as simple as possible
- * and ensures that it only needs to handle the Python function execution.
- */
-object PythonCalcRewriteProjectionRule
-  extends PythonCalcSplitRuleBase("PythonCalcRewriteProjectionRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
-    val projects = calc.getProgram.getProjectList.map(calc.getProgram.expandLocalRef)
-
-    // matches if all the following conditions hold true:
-    // 1) it contains Python functions in the projection
-    // 2) it contains RexNodes besides RexInputRef and RexCall or
-    //    not all the RexCalls lying at the end of the project list
-    projects.exists(containsPythonCall(_)) &&
-    (projects.exists(expr => !expr.isInstanceOf[RexCall] && !expr.isInstanceOf[RexInputRef]) ||
-      projects.indexWhere(_.isInstanceOf[RexCall]) <
-      projects.lastIndexWhere(_.isInstanceOf[RexInputRef]))
-  }
-
-  override def needConvert(program: RexProgram, node: RexNode): Boolean = isPythonCall(node)
-
-  override def split(
-      program: RexProgram,
-      splitter: ScalarFunctionSplitter): (Option[RexNode], Option[RexNode], Seq[RexNode]) = {
-    (None, None, program.getProjectList.map(program.expandLocalRef(_).accept(splitter)))
-  }
-}
-
-private class ScalarFunctionSplitter(
-    program: RexProgram,
-    rexBuilder: RexBuilder,
-    extractedFunctionOffset: Int,
-    extractedRexNodes: mutable.ArrayBuffer[RexNode],
-    needConvert: Function[RexNode, Boolean])
-  extends RexDefaultVisitor[RexNode] {
-
-  private var fieldsRexCall: Map[Int, Int] = Map[Int, Int]()
-
-  override def visitCall(call: RexCall): RexNode = {
-    if (needConvert(call)) {
-      getExtractedRexNode(call)
-    } else {
-      call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
-    }
-  }
-
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-    if (needConvert(fieldAccess)) {
-      val expr = fieldAccess.getReferenceExpr
-      expr match {
-        case localRef: RexLocalRef if containsPythonCall(program.expandLocalRef(localRef)) =>
-          getExtractedRexFieldAccess(fieldAccess, localRef.getIndex)
-        case _: RexCorrelVariable =>
-          val field = fieldAccess.getField
-          new RexInputRef(field.getIndex, field.getType)
-        case _ =>
-          val newFieldAccess =
-            rexBuilder.makeFieldAccess(expr.accept(this), fieldAccess.getField.getIndex)
-          getExtractedRexNode(newFieldAccess)
-      }
-    } else {
-      fieldAccess
-    }
-  }
-
-  override def visitLocalRef(localRef: RexLocalRef): RexNode = {
-    program.getExprList.get(localRef.getIndex).accept(this)
-  }
-
-  override def visitNode(rexNode: RexNode): RexNode = rexNode
-
-  private def getExtractedRexNode(node: RexNode): RexNode = {
-    val newNode = new RexInputRef(extractedFunctionOffset + extractedRexNodes.length, node.getType)
-    extractedRexNodes.append(node)
-    newNode
-  }
-
-  private def getExtractedRexFieldAccess(node: RexFieldAccess, rexCallIndex: Int): RexNode = {
-    val pythonCall: RexCall =
-      program.expandLocalRef(node.getReferenceExpr.asInstanceOf[RexLocalRef]).asInstanceOf[RexCall]
-    if (!fieldsRexCall.contains(rexCallIndex)) {
-      extractedRexNodes.append(pythonCall)
-      fieldsRexCall += rexCallIndex -> (extractedFunctionOffset + extractedRexNodes.length - 1)
-    }
-    rexBuilder.makeFieldAccess(
-      new RexInputRef(fieldsRexCall(rexCallIndex), pythonCall.getType),
-      node.getField.getIndex)
-  }
-}
-
-/**
- * Rewrite field accesses of a RexNode as not all the fields from the original input are forwarded:
- * 1) Fields of index greater than or equal to extractedFunctionOffset refer to the extracted
- * function. 2) Fields of index less than extractedFunctionOffset refer to the original input field.
- *
- * @param rexBuilder
- *   the RexBuilder
- * @param extractedFunctionOffset
- *   the original start offset of the extracted functions
- * @param accessedFields
- *   the accessed fields which will be forwarded
- */
-private class ExtractedFunctionInputRewriter(
-    rexBuilder: RexBuilder,
-    extractedFunctionOffset: Int,
-    accessedFields: Array[Int])
-  extends RexDefaultVisitor[RexNode] {
-
-  /** old input fields ref index -> new input fields ref index mappings */
-  private val fieldMap: Map[Int, Int] = accessedFields.zipWithIndex.toMap
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode = {
-    if (inputRef.getIndex >= extractedFunctionOffset) {
-      new RexInputRef(
-        inputRef.getIndex - extractedFunctionOffset + accessedFields.length,
-        inputRef.getType)
-    } else {
-      new RexInputRef(
-        fieldMap.getOrElse(
-          inputRef.getIndex,
-          throw new IllegalArgumentException("input field contains invalid 
index")),
-        inputRef.getType)
-    }
-  }
-
-  override def visitCall(call: RexCall): RexNode = {
-    call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
-  }
-
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-    rexBuilder.makeFieldAccess(
-      fieldAccess.getReferenceExpr.accept(this),
-      fieldAccess.getField.getIndex)
-  }
-
-  override def visitNode(rexNode: RexNode): RexNode = rexNode
-}
-
 object PythonCalcSplitRule {
 
   /**
   * These rules should be applied sequentially in the order of SPLIT_CONDITION, SPLIT_PROJECT,
   * SPLIT_PANDAS_IN_PROJECT, EXPAND_PROJECT, PUSH_CONDITION and REWRITE_PROJECT.
    */
-  val SPLIT_CONDITION: RelOptRule = PythonCalcSplitConditionRule
-  val SPLIT_PROJECT: RelOptRule = PythonCalcSplitProjectionRule
-  val SPLIT_PANDAS_IN_PROJECT: RelOptRule = PythonCalcSplitPandasInProjectionRule
-  val SPLIT_PROJECTION_REX_FIELD: RelOptRule = PythonCalcSplitProjectionRexFieldRule
-  val SPLIT_CONDITION_REX_FIELD: RelOptRule = PythonCalcSplitConditionRexFieldRule
-  val EXPAND_PROJECT: RelOptRule = PythonCalcExpandProjectRule
-  val PUSH_CONDITION: RelOptRule = PythonCalcPushConditionRule
-  val REWRITE_PROJECT: RelOptRule = PythonCalcRewriteProjectionRule
+  private val callFinder = new PythonRemoteCalcCallFinder()
+  val SPLIT_CONDITION: RelOptRule = new RemoteCalcSplitConditionRule(callFinder) {}

Review Comment:
   Why the `{}` at the end? Couldn't `RemoteCalcSplitConditionRule` also be non-abstract?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml:
##########
@@ -0,0 +1,536 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+       <TestCase name="testSingleCall">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(a) FROM MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[func1(a) AS EXPR$0])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testLiteralPlusTableSelect">
+               <Resource name="sql">
+                       <![CDATA[SELECT 'foo', func1(a) FROM MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[_UTF-16LE'foo'], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=['foo' AS EXPR$0, f0 AS EXPR$1])
++- AsyncCalc(select=[func1(a) AS f0])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testFieldPlusTableSelect">
+               <Resource name="sql">
+                       <![CDATA[SELECT a, func1(a) from MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[a, func1(a) AS EXPR$1])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testTwoCalls">
+               <Resource name="sql">
+                       <![CDATA[SELECT func1(a), func1(a) from MyTable]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+AsyncCalc(select=[f0 AS EXPR$0, func1(a) AS EXPR$1])

Review Comment:
   As a followup we should discuss whether to run multiple async calls as part of one Async operator. We don't leverage the full potential of async calls if in the end single calls get evaluated sequentially. I suggest opening a followup ticket under the FLIP Jira ticket. This PR should link to a subtask under the FLIP ticket instead.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java:
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case tests for {@link AsyncScalarFunction}. */
+public class AsyncCalcITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    public void before() throws Exception {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setMaxParallelism(1);
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 1);
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, Duration.ofMinutes(1));
+    }
+
+    @Test
+    public void testSimpleTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());

Review Comment:
   can be a system function
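   
   For example (sketch):
   ```java
   tEnv.createTemporarySystemFunction("func", new AsyncFunc());
   ```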



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/AsyncCodeGeneratorTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.SqlToRexConverter;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AsyncCodeGenerator}. */
+public class AsyncCodeGeneratorTest {
+
+    private static final RowType INPUT_TYPE =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType());
+
+    private PlannerMocks plannerMocks;
+    private SqlToRexConverter converter;
+
+    private RelDataType tableRowType;
+
+    @BeforeEach
+    public void before() {
+        plannerMocks = PlannerMocks.create();
+        tableRowType =
+                plannerMocks
+                        .getPlannerContext()
+                        .getTypeFactory()
+                        .buildRelNodeRowType(
+                                JavaScalaConversionUtil.toScala(ImmutableList.of("f1", "f2", "f3")),
+                                JavaScalaConversionUtil.toScala(
+                                        ImmutableList.of(
+                                                new IntType(),
+                                                new BigIntType(),
+                                                new VarCharType())));
+        converter =
+                plannerMocks
+                        .getPlanner()
+                        .createToRelContext()
+                        .getCluster()
+                        .getPlanner()
+                        .getContext()
+                        .unwrap(FlinkContext.class)
+                        .getRexFactory()
+                        .createSqlToRexConverter(tableRowType, null);
+
+        plannerMocks
+                .getFunctionCatalog()
+                .registerTemporaryCatalogFunction(
+                        UnresolvedIdentifier.of(
+                                CatalogManagerMocks.DEFAULT_CATALOG,
+                                CatalogManagerMocks.DEFAULT_DATABASE,
+                                "myFunc"),
+                        new AsyncFunc(),
+                        false);
+    }
+
+    @Test
+    public void testStringReturnType() throws Exception {
+        RowData rowData =
+                execute(
+                        "myFunc(f1, f2, f3)",
+                        RowType.of(new VarCharType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData).isEqualTo(GenericRowData.of(StringData.fromString("complete foo 4 6")));
+    }
+
+    @Test
+    public void testTimestampReturnType() throws Exception {
+        RowData rowData =
+                execute(
+                        "myFunc(f2)",
+                        RowType.of(new TimestampType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData).isEqualTo(GenericRowData.of(TimestampData.fromEpochMillis(3)));
+    }
+
+    @Test
+    public void testDecimalReturnType() throws Exception {
+        RowData rowData =
+                execute(
+                        "myFunc(f1)",
+                        RowType.of(new DecimalType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData)
+                .isEqualTo(
+                        GenericRowData.of(
+                                DecimalData.fromBigDecimal(BigDecimal.valueOf(1024), 12, 3)));
+    }
+
+    @Test
+    public void testTwoReturnTypes_passThroughFirst() throws Exception {
+        RowData rowData =
+                execute(
+                        ImmutableList.of("f2", "myFunc(f1, f2, f3)"),
+                        RowType.of(new VarCharType(), new BigIntType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData)
+                .isEqualTo(GenericRowData.of(3L, StringData.fromString("complete foo 4 6")));
+    }
+
+    @Test
+    public void testTwoReturnTypes_passThroughSecond() throws Exception {
+        RowData rowData =
+                execute(
+                        ImmutableList.of("myFunc(f1, f2, f3)", "f2"),
+                        RowType.of(new VarCharType(), new BigIntType()),
+                        GenericRowData.of(2, 3L, StringData.fromString("foo")));
+        assertThat(rowData)
+                .isEqualTo(GenericRowData.of(StringData.fromString("complete foo 4 6"), 3L));
+    }
+
+    private RowData execute(String sqlExpression, RowType resultType, RowData input)
+            throws Exception {
+        return execute(ImmutableList.of(sqlExpression), resultType, input);
+    }
+
+    private RowData execute(List<String> sqlExpressions, RowType resultType, RowData input)
+            throws Exception {
+        List<RexNode> nodes =
+                sqlExpressions.stream()
+                        .map(sql -> converter.convertToRexNode(sql))
+                        .collect(Collectors.toList());
+        GeneratedFunction<AsyncFunction<RowData, RowData>> function =
+                AsyncCodeGenerator.generateFunction(
+                        "name",
+                        INPUT_TYPE,
+                        resultType,
+                        GenericRowData.class,
+                        nodes,
+                        true,
+                        new Configuration(),
+                        Thread.currentThread().getContextClassLoader());
+        AsyncFunction<RowData, RowData> asyncFunction =
+                function.newInstance(Thread.currentThread().getContextClassLoader());
+        TestResultFuture resultFuture = new TestResultFuture();
+        asyncFunction.asyncInvoke(input, resultFuture);
+        Collection<RowData> result = resultFuture.getResult().get();
+        assertThat(result).hasSize(1);
+        return result.iterator().next();
+    }
+
+    /** Test function. */
+    public static final class AsyncFunc extends AsyncScalarFunction {
+        public void eval(CompletableFuture<String> f, Integer i, Long l, String s) {
+            f.complete("complete " + s + " " + (i * i) + " " + (2 * l));
+        }
+
+        public void eval(CompletableFuture<Instant> f, Long l) {
+            f.complete(Instant.ofEpochMilli(l));
+        }
+
+        public void eval(
+                @DataTypeHint("DECIMAL(12, 3)") CompletableFuture<BigDecimal> f, Integer i) {
+            f.complete(BigDecimal.valueOf(i).pow(10));
+        }
+    }
+
+    /** Test result future. */
+    public static final class TestResultFuture implements ResultFuture<RowData> {
+
+        CompletableFuture<Collection<RowData>> data = new CompletableFuture<>();
+
+        @Override
+        public void complete(Collection<RowData> result) {
+            data.complete(result);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {

Review Comment:
   Is there a test for the error path, i.e. one that exercises `completeExceptionally`?
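   A sketch of what such a test could look like; the AsyncErrorFunc class is hypothetical, its registration and invocation would mirror the existing execute(...) plumbing, and it assumes the generated wrapper forwards the failure to ResultFuture#completeExceptionally:
   ```java
   /** Hypothetical test function whose future always fails. */
   public static final class AsyncErrorFunc extends AsyncScalarFunction {
       public void eval(CompletableFuture<String> f, Integer i) {
           f.completeExceptionally(new RuntimeException("expected failure"));
       }
   }

   // Invoke the generated AsyncFunction as execute(...) does, then assert that
   // the failure surfaces through the TestResultFuture. get() wraps the cause
   // in an ExecutionException; assertThatThrownBy is AssertJ.
   TestResultFuture resultFuture = new TestResultFuture();
   asyncFunction.asyncInvoke(input, resultFuture);
   assertThatThrownBy(() -> resultFuture.getResult().get())
           .isInstanceOf(ExecutionException.class)
           .hasCauseInstanceOf(RuntimeException.class);
   ```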



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java:
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case tests for {@link AsyncScalarFunction}. */
+public class AsyncCalcITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    public void before() throws Exception {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setMaxParallelism(1);
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 1);
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, Duration.ofMinutes(1));
+    }
+
+    @Test
+    public void testSimpleTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1"), Row.of("val 2"), Row.of("val 3"));

Review Comment:
   Use `Arrays.asList` instead of adding a dependency on Guava.
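   i.e. something like:
   ```java
   import java.util.Arrays;

   final List<Row> expectedRows =
           Arrays.asList(Row.of("val 1"), Row.of("val 2"), Row.of("val 3"));
   ```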



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java:
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case tests for {@link AsyncScalarFunction}. */
+public class AsyncCalcITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    public void before() throws Exception {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);

Review Comment:
   Don't we want to test a higher parallelism as well?
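   For instance (the value 4 is arbitrary; it only needs to be greater than 1):
   ```java
   // Exercise the async operator with several concurrent subtasks.
   env.setParallelism(4);
   env.setMaxParallelism(4);
   ```
   The containsSequence assertions would then likely need to become order-insensitive, e.g. containsExactlyInAnyOrderElementsOf.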



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncCalcITCase.java:
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case tests for {@link AsyncScalarFunction}. */
+public class AsyncCalcITCase extends AbstractTestBase {
+
+    private StreamExecutionEnvironment env;
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    public void before() throws Exception {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setMaxParallelism(1);
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 1);
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT, Duration.ofMinutes(1));
+    }
+
+    @Test
+    public void testSimpleTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1"), Row.of("val 2"), Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLiteralPlusTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select 'foo', func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of("foo", "val 1"), Row.of("foo", "val 2"), Row.of("foo", "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testFieldPlusTableSelect() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select f1, func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(1, "val 1"), Row.of(2, "val 2"), Row.of(3, "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testTwoCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1), func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of("val 1", "val 1"),
+                        Row.of("val 2", "val 2"),
+                        Row.of("val 3", "val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testNestedCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results = executeSql("select func(func(func(f1))) from t1");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(31), Row.of(32), Row.of(33));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testThreeNestedCalls() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results =
+                executeSql("select func(func(f1)), func(func(func(f1))), func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(21, 31, 11), Row.of(22, 32, 12), Row.of(23, 33, 13));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testPassedToOtherUDF() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select Concat(func(f1), 'foo') from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1foo"), Row.of("val 2foo"), Row.of("val 3foo"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testJustCall() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(1)");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 1"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereCondition() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1 from t1 where REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionAndProjection() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select func(f1) from t1 where REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionWithInts() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10());
+        final List<Row> results = executeSql("select f1 from t1 where func(f1) >= 12");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testAggregate() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 1, 3, 4).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10Long());
+        final List<Row> results = executeSql("select f1, func(count(*)) from t1 group by f1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of(1, 11L),
+                        Row.of(2, 11L),
+                        Row.of(3, 11L),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, 11L),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, 12L),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 3, 11L),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 3, 12L),
+                        Row.of(4, 11L));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testSelectCallWithIntArray() {
+        Table t1 = tEnv.fromValues(new int[] {1, 2}, new int[] {3, 4}).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFuncAdd10IntArray());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(
+                        Row.of(new Object[] {new Integer[] {11, 12}}),
+                        Row.of(new Object[] {new Integer[] {13, 14}}));
+        // When run here, the plan is a union of the two AsyncCalcs so order is
+        // not retained!
+        assertThat(results).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1 from t1 INNER JOIN t2 ON func(f1) = func(f2) AND REGEXP(func(f1), 'val (2|4)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(4));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncProjection() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1 INNER JOIN t2 ON f1 = f2");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), Row.of("val 4"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testInnerJoinWithFuncInWhere() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1 from t1 INNER JOIN t2 ON f1 = f2 WHERE REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLeftJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1, f2 from t1 LEFT JOIN t2 ON func(f1) = func(f2)");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of(1, null), Row.of(2, 2), Row.of(3, null), Row.of(4, 4));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testLeftJoinWithFuncInWhere() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select f1, f2 from t1 LEFT JOIN t2 ON f1 = f2 WHERE REGEXP(func(f1), 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2, 2), Row.of(3, null));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testRightJoinWithFuncInOn() {
+        Table t1 = tEnv.fromValues(1, 2, 3, 4).as("f1");
+        Table t2 = tEnv.fromValues(2, 4).as("f2");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryView("t2", t2);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql("select f1, f2 from t1 FULL OUTER JOIN t2 ON func(f1) = func(f2)");
+        assertThat(results).hasSize(8);
+    }
+
+    @Test
+    public void testSelectWithConfigs() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY.key(), "10");
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_TIMEOUT.key(), "1m");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results = executeSql("select func(f1) from t1");
+        final List<Row> expectedRows =
+                ImmutableList.of(Row.of("val 1"), Row.of("val 2"), Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testProjectCallInSubquery() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select blah FROM (select func(f1) as blah from t1) "
+                                + "WHERE REGEXP(blah, 'val (2|3)')");
+        final List<Row> expectedRows = ImmutableList.of(Row.of("val 2"), Row.of("val 3"));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereConditionCallInSubquery() {
+        Table t1 = tEnv.fromValues(1, 2, 3).as("f1");
+        tEnv.createTemporaryView("t1", t1);
+        tEnv.createTemporaryFunction("func", new AsyncFunc());
+        final List<Row> results =
+                executeSql(
+                        "select blah FROM (select f1 as blah from t1 "
+                                + "WHERE REGEXP(func(f1), 'val (2|3)'))");
+        final List<Row> expectedRows = ImmutableList.of(Row.of(2), Row.of(3));
+        assertThat(results).containsSequence(expectedRows);
+    }
+
+    @Test
+    public void testWhereNotInSubquery() {

Review Comment:
   I feel there are too many tests in this ITCase; we need to watch out for the overall Flink build time. Only test what you implemented. I don't think a subquery directly affects the code that you have written?
   Having one or two complex tests is fine, but subquery and join tests exercise the optimizer, and those paths should already be covered by the plan tests.
   
   If the plan tests and code-gen tests work correctly, you actually only need 2-3 ITCase tests in the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
