LadyForest commented on code in PR #23751: URL: https://github.com/apache/flink/pull/23751#discussion_r1399931555
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala: ########## @@ -22,14 +22,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1} import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Types +import org.apache.flink.table.api.{DataTypes, Types} Review Comment: Nit: I think we can remove unused imports here ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala: ########## @@ -63,7 +63,7 @@ class TableScanTest extends TableTestBase { @Test def testDDLWithComputedColumn(): Unit = { // Create table with field as atom expression. - util.tableEnv.registerFunction("my_udf", Func0) + util.tableEnv.createTemporarySystemFunction("my_udf", Func0) Review Comment: Nit: I think `util.tableEnv.createTemporarySystemFunction` can be replaced with `util.addTemporarySystemFunction` ########## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml: ########## @@ -321,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN <![CDATA[ LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)]) ++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)]) Review Comment: I think the schema should remain unchanged ##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala: ########## @@ -275,21 +296,44 @@ object UserDefinedFunctionTestUtils { object MyPojoFunc extends ScalarFunction { def eval(s: MyPojo): Int = s.f2 - override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = - Array(MyToPojoFunc.getResultType(signature)) + override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = { + TypeInference.newBuilder + .typedArguments( + DataTypes.STRUCTURED( + classOf[MyPojo], + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.INT()))) + .outputTypeStrategy((call: CallContext) => Optional.of(DataTypes.INT().notNull())) + .build + } } @SerialVersionUID(1L) object MyToPojoFunc extends ScalarFunction { - def eval(s: Int): MyPojo = new MyPojo(s, s) - override def getResultType(signature: Array[Class[_]]): PojoTypeInfo[MyPojo] = { + def eval(s: Int) = new MyPojo(s, s) + + /*override def getResultType(signature: Array[Class[_]]): PojoTypeInfo[MyPojo] = { Review Comment: Nit: I think we can remove the commented-out `getResultType`. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala: ########## @@ -235,7 +235,7 @@ class DagOptimizationTest extends TableTestBase { RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, Boolean.box(true)) // test with non-deterministic udf - util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf()) + util.tableEnv.createTemporarySystemFunction("random_udf", new NonDeterministicUdf()) Review Comment: Nit: I noticed that `util.addTemporarySystemFunction` and `util.tableEnv.createTemporarySystemFunction` are used interchangeably. What do you think about unifying the calling method?
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/SumAggFunction.scala: ########## @@ -20,7 +20,13 @@ package org.apache.flink.table.planner.utils import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint} Review Comment: Nit: the unused imports can be removed ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/CountAggFunction.scala: ########## @@ -20,8 +20,11 @@ package org.apache.flink.table.planner.utils import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1} import org.apache.flink.api.java.typeutils.TupleTypeInfo -import org.apache.flink.table.api.Types +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.api.{DataTypes, Types} Review Comment: Nit: the unused imports can be removed ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala: ########## @@ -23,8 +23,12 @@ import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.configuration.Configuration +import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint, HintFlag} Review Comment: Nit: remove unused imports ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala: ########## @@ -429,12 +431,24 @@ abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String @SerialVersionUID(1L) class PojoTableFunc extends TableFunction[PojoUser] { - def eval(user: String) { + def eval(user: String): Unit = { if 
(user.contains("#")) { val splits = user.split("#") collect(new PojoUser(splits(0), splits(1).toInt)) } } + + override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = { + TypeInference.newBuilder + .typedArguments(DataTypes.STRING()) + .outputTypeStrategy( + TypeStrategies.explicit( + DataTypes.STRUCTURED( + classOf[PojoUser], + DataTypes.FIELD("age", DataTypes.INT()), Review Comment: I think the output fields should be ordered ["name", "age"], based on the `PojoUser` constructor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org