godfreyhe commented on a change in pull request #17118:
URL: https://github.com/apache/flink/pull/17118#discussion_r704247305



##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
##########
@@ -21,16 +21,14 @@ package org.apache.flink.table.planner.plan.nodes.physical.common
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.utils.JoinUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinUtil}
 import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
-

Review comment:
       please revert these changes and check your Scala checkstyle settings in IntelliJ IDEA

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
##########
@@ -119,4 +122,22 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context)
                             tableSource.getClass().getName()));
         }
     }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        final List<String> expressionStrs = new ArrayList<>();
+        final RowType souorceRowType = context.getSourceRowType();

Review comment:
       typo: `souorce` should be `source`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
##########
@@ -60,4 +60,12 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context)
                             tableSource.getClass().getName()));
         }
     }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        return "partitions=["
+                + String.join(
+                        ", ", this.partitions.stream().map(Object::toString).toArray(String[]::new))

Review comment:
       nit: we can use `Collectors.joining()` to simplify this line
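
       For illustration, a minimal sketch of that suggestion; the class name and the `List<?>` element type are placeholders, since the diff does not show what `partitions` holds:

```java
import java.util.List;
import java.util.stream.Collectors;

class PartitionDigestSketch {
    // Collectors.joining folds the map/String.join/toArray round-trip
    // into a single terminal collector on the stream.
    static String digest(List<?> partitions) {
        return "partitions=["
                + partitions.stream()
                        .map(Object::toString)
                        .collect(Collectors.joining(", "))
                + "]";
    }
}
```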

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
             }
         }
     }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {

Review comment:
       nit: it's better to move this method close to the `apply` method

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
             }
         }
     }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        final String expressionStr =
+                FlinkRexUtil.getExpressionString(
+                        watermarkExpr,
+                        JavaScalaConversionUtil.toScala(
+                                context.getSourceRowType().getFieldNames()));
+
+        //        final String expressionStr =
+        //                FlinkRexUtil.getExpressionString(
+        //                        watermarkExpr,
+        //
+        // JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames()));
+
+        if (idleTimeoutMillis == -1L) {

Review comment:
       the condition should be `idleTimeoutMillis > 0`
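
       A hedged sketch of the revised branch; `expressionStr` comes from the diff above, while the digest strings and the idle-timeout format here are assumptions, not the PR's actual output:

```java
class WatermarkDigestSketch {
    // A positive idleTimeoutMillis means a source-idle timeout was configured,
    // so only then does it belong in the digest; -1 means "not set".
    static String digest(String expressionStr, long idleTimeoutMillis) {
        if (idleTimeoutMillis > 0) {
            return String.format(
                    "watermark=[%s], idletimeout=[%d]", expressionStr, idleTimeoutMillis);
        }
        return String.format("watermark=[%s]", expressionStr);
    }
}
```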

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -65,8 +68,28 @@ class TableSourceTable(
 
   override def getQualifiedName: util.List[String] = {
     val builder = ImmutableList.builder[String]()
-        .addAll(super.getQualifiedName)
-    extraDigests.foreach(builder.add)
+      .addAll(super.getQualifiedName)
+
+    if(abilitySpecs != null && abilitySpecs.length != 0){
+//      var newProducedType = catalogTable
+//        .getResolvedSchema
+//        .toSourceRowDataType
+//        .getLogicalType
+//        .asInstanceOf[RowType]

Review comment:
       remove the unused commented-out code

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
##########
@@ -19,27 +19,27 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.table.api.{DataTypes, TableException, TableSchema}
+import org.apache.flink.table.api.{DataTypes, TableConfig, TableException, TableSchema}
 import org.apache.flink.table.catalog.{CatalogTable, Column, ObjectIdentifier, ResolvedCatalogTable, ResolvedSchema, UniqueConstraint}
 import org.apache.flink.table.connector.ChangelogMode
 import org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSource}
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LocalZonedTimestampType, LogicalType, TimestampKind, TimestampType, VarCharType}
-
 import org.apache.calcite.config.CalciteConnectionConfig
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.Schema.TableType
 import org.apache.calcite.schema.{Schema, SchemaPlus, Table}
 import org.apache.calcite.sql.{SqlCall, SqlNode}
-
 import java.util
 import java.util.Collections
 
+import org.apache.flink.table.utils.CatalogManagerMocks

Review comment:
       nit: move this import close to the other `org.apache.flink.table.xx` imports

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
##########
@@ -284,7 +284,12 @@ object MetadataTestUtil {
       new TestTableSource(),
       true,
       new ResolvedCatalogTable(catalogTable, resolvedSchema),
-      Array("project=[a, c, d]"))
+      new FlinkContextImpl(

Review comment:
       create a shared field/variable to replace all the `new FlinkContextImpl(...)` instantiations

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -65,8 +68,28 @@ class TableSourceTable(
 
   override def getQualifiedName: util.List[String] = {
     val builder = ImmutableList.builder[String]()
-        .addAll(super.getQualifiedName)
-    extraDigests.foreach(builder.add)
+      .addAll(super.getQualifiedName)
+
+    if(abilitySpecs != null && abilitySpecs.length != 0){
+//      var newProducedType = catalogTable
+//        .getResolvedSchema
+//        .toSourceRowDataType
+//        .getLogicalType
+//        .asInstanceOf[RowType]
+
+      var newProducedType = DynamicSourceUtils.createProducedType(
+        catalogTable.getResolvedSchema,
+        tableSource)
+
+      for (spec <- abilitySpecs) {
+        val sourceAbilityContext = new SourceAbilityContext(flinkContext, newProducedType)
+
+        builder.add(spec.getDigests(sourceAbilityContext))
+        if (spec.getProducedType.isPresent) {
+          newProducedType = spec.getProducedType.get
+        }

Review comment:
       nit: these lines can be simplified to `newProducedType = spec.getProducedType.orElse(newProducedType)`
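
       (`getProducedType` appears to return a Java `Optional`: `orElse` yields the contained value when present and the argument otherwise, which is exactly what the `isPresent`/`get` branch above does. A minimal Java illustration with hypothetical names:)

```java
import java.util.Optional;

class OrElseSketch {
    // orElse returns the wrapped value when present, otherwise the fallback,
    // collapsing the explicit isPresent/get branch into one expression.
    static String pick(Optional<String> produced, String current) {
        return produced.orElse(current);
    }
}
```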

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -41,8 +44,8 @@ import java.util
 * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table
 * @param isStreamingMode A flag that tells if the current table is in stream mode
 * @param catalogTable Resolved catalog table where this table source table comes from
- * @param extraDigests The extra digests which will be added into `getQualifiedName`
- *                     as a part of table digest
+ * @param flinkContext The flink context
+ * @param abilitySpecs The abilitySpec applied to the source
  */

Review comment:
       abilitySpec -> abilitySpecs

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -41,8 +44,8 @@ import java.util
 * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table
 * @param isStreamingMode A flag that tells if the current table is in stream mode
 * @param catalogTable Resolved catalog table where this table source table comes from
- * @param extraDigests The extra digests which will be added into `getQualifiedName`
- *                     as a part of table digest
+ * @param flinkContext The flink context

Review comment:
       it would be better to explain why we need `flinkContext` here

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -20,15 +20,18 @@ package org.apache.flink.table.planner.plan.schema
 
 import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
 import org.apache.flink.table.connector.source.DynamicTableSource
-import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec
+import org.apache.flink.table.planner.plan.abilities.source.{SourceAbilityContext, SourceAbilitySpec}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptSchema
 import org.apache.calcite.rel.`type`.RelDataType
-
 import java.util
 
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}

Review comment:
       remove the unused imports

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
             }
         }
     }
+
+    @Override
+    public String getDigests(SourceAbilityContext context) {
+        final String expressionStr =
+                FlinkRexUtil.getExpressionString(
+                        watermarkExpr,
+                        JavaScalaConversionUtil.toScala(
+                                context.getSourceRowType().getFieldNames()));
+
+        //        final String expressionStr =
+        //                FlinkRexUtil.getExpressionString(
+        //                        watermarkExpr,
+        //
+        // JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames()));

Review comment:
       remove this commented-out code



