LadyForest commented on code in PR #23751:
URL: https://github.com/apache/flink/pull/23751#discussion_r1399931555


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala:
##########
@@ -22,14 +22,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{DataTypes, Types}

Review Comment:
   Nit: I think we can remove unused imports here



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala:
##########
@@ -63,7 +63,7 @@ class TableScanTest extends TableTestBase {
   @Test
   def testDDLWithComputedColumn(): Unit = {
     // Create table with field as atom expression.
-    util.tableEnv.registerFunction("my_udf", Func0)
+    util.tableEnv.createTemporarySystemFunction("my_udf", Func0)

Review Comment:
   Nit: I think `util.tableEnv.createTemporarySystemFunction` can be replaced with `util.addTemporarySystemFunction`.
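   e.g. (a sketch, assuming the `TableTestUtil` shorthand registers the function the same way):
   ```scala
   util.addTemporarySystemFunction("my_udf", Func0)
   ```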



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml:
##########
@@ -321,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN
      <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)])

Review Comment:
   I think the schema should remain unchanged, i.e. `(name, age)` rather than `(age, name)`.
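   The expected plan line would then stay as:
   ```
   rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)]
   ```
   This ties back to the field order declared in `PojoTableFunc#getTypeInference` (see the comment on `UserDefinedTableFunctions.scala` below).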



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala:
##########
@@ -275,21 +296,44 @@ object UserDefinedFunctionTestUtils {
   object MyPojoFunc extends ScalarFunction {
     def eval(s: MyPojo): Int = s.f2
 
-    override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
-      Array(MyToPojoFunc.getResultType(signature))
+    override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
+      TypeInference.newBuilder
+        .typedArguments(
+          DataTypes.STRUCTURED(
+            classOf[MyPojo],
+            DataTypes.FIELD("f1", DataTypes.INT()),
+            DataTypes.FIELD("f2", DataTypes.INT())))
+        .outputTypeStrategy((call: CallContext) => Optional.of(DataTypes.INT().notNull()))
+        .build
+    }
   }
 
   @SerialVersionUID(1L)
   object MyToPojoFunc extends ScalarFunction {
-    def eval(s: Int): MyPojo = new MyPojo(s, s)
 
-    override def getResultType(signature: Array[Class[_]]): PojoTypeInfo[MyPojo] = {
+    def eval(s: Int) = new MyPojo(s, s)
+
+    /*override def getResultType(signature: Array[Class[_]]): PojoTypeInfo[MyPojo] = {

Review Comment:
   Nit: I think we can remove the commented-out `getResultType` entirely instead of keeping it.
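   If `MyToPojoFunc` still needs a type hint, something like the following untested sketch could replace the commented-out block (mirroring the `getTypeInference` pattern used for `MyPojoFunc` above; the structured output declaration is an assumption on my side):
   ```scala
   import org.apache.flink.table.api.DataTypes
   import org.apache.flink.table.catalog.DataTypeFactory
   import org.apache.flink.table.functions.ScalarFunction
   import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}

   @SerialVersionUID(1L)
   object MyToPojoFunc extends ScalarFunction {
     // MyPojo is defined elsewhere in UserDefinedFunctionTestUtils
     def eval(s: Int): MyPojo = new MyPojo(s, s)

     override def getTypeInference(typeFactory: DataTypeFactory): TypeInference =
       TypeInference.newBuilder
         .typedArguments(DataTypes.INT())
         // assumption: declare the POJO result the same way MyPojoFunc declares its argument
         .outputTypeStrategy(
           TypeStrategies.explicit(
             DataTypes.STRUCTURED(
               classOf[MyPojo],
               DataTypes.FIELD("f1", DataTypes.INT()),
               DataTypes.FIELD("f2", DataTypes.INT()))))
         .build
   }
   ```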



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala:
##########
@@ -235,7 +235,7 @@ class DagOptimizationTest extends TableTestBase {
       RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
       Boolean.box(true))
     // test with non-deterministic udf
-    util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
+    util.tableEnv.createTemporarySystemFunction("random_udf", new NonDeterministicUdf())

Review Comment:
   Nit: I noticed that `util.addTemporarySystemFunction` and `util.tableEnv.createTemporarySystemFunction` are used interchangeably. What do you think about unifying the calls on one of the two?
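   e.g. using the shorthand consistently (a sketch, assuming `util.addTemporarySystemFunction` delegates to the same registration):
   ```scala
   util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf())
   ```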



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/SumAggFunction.scala:
##########
@@ -20,7 +20,13 @@ package org.apache.flink.table.planner.utils
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}

Review Comment:
   Nit: the unused imports can be removed



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/CountAggFunction.scala:
##########
@@ -20,8 +20,11 @@ package org.apache.flink.table.planner.utils
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.api.{DataTypes, Types}

Review Comment:
   Nit: the unused imports can be removed



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CorrelateITCase.scala:
##########
@@ -23,8 +23,12 @@ import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.Types
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint, HintFlag}

Review Comment:
   Nit: remove unused imports



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala:
##########
@@ -429,12 +431,24 @@ abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String
 
 @SerialVersionUID(1L)
 class PojoTableFunc extends TableFunction[PojoUser] {
-  def eval(user: String) {
+  def eval(user: String): Unit = {
     if (user.contains("#")) {
       val splits = user.split("#")
       collect(new PojoUser(splits(0), splits(1).toInt))
     }
   }
+
+  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
+    TypeInference.newBuilder
+      .typedArguments(DataTypes.STRING())
+      .outputTypeStrategy(
+        TypeStrategies.explicit(
+          DataTypes.STRUCTURED(
+            classOf[PojoUser],
+            DataTypes.FIELD("age", DataTypes.INT()),

Review Comment:
   I think the output fields should be ["name", "age"], based on the `PojoUser` constructor.
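   i.e. something like this (the field types follow from `new PojoUser(splits(0), splits(1).toInt)` above):
   ```scala
   DataTypes.STRUCTURED(
     classOf[PojoUser],
     DataTypes.FIELD("name", DataTypes.STRING()),
     DataTypes.FIELD("age", DataTypes.INT()))
   ```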


