godfreyhe commented on a change in pull request #17118: URL: https://github.com/apache/flink/pull/17118#discussion_r704247305
########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala ########## @@ -21,16 +21,14 @@ package org.apache.flink.table.planner.plan.nodes.physical.common import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel -import org.apache.flink.table.planner.plan.utils.JoinUtil +import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinUtil} import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat - Review comment: please revert these changes and check your scala checkstyle setting in your idea ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java ########## @@ -119,4 +122,22 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) tableSource.getClass().getName())); } } + + @Override + public String getDigests(SourceAbilityContext context) { + final List<String> expressionStrs = new ArrayList<>(); + final RowType souorceRowType = context.getSourceRowType(); Review comment: typo: souorce ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java ########## @@ -60,4 +60,12 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context) tableSource.getClass().getName())); } } + + @Override + public String getDigests(SourceAbilityContext context) { + return "partitions=[" + + String.join( + ", ", this.partitions.stream().map(Object::toString).toArray(String[]::new)) Review comment: nit: we can use Collectors.joining() to simply this line ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java ########## @@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) { } } } + + @Override + public String getDigests(SourceAbilityContext context) { Review comment: nit: it's better to move this method close the `apply` method ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java ########## @@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) { } } } + + @Override + public String getDigests(SourceAbilityContext context) { + final String expressionStr = + FlinkRexUtil.getExpressionString( + watermarkExpr, + JavaScalaConversionUtil.toScala( + context.getSourceRowType().getFieldNames())); + + // final String expressionStr = + // FlinkRexUtil.getExpressionString( + // watermarkExpr, + // + // JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames())); + + if (idleTimeoutMillis == -1L) { Review comment: idleTimeoutMillis > 0 ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala ########## @@ -65,8 +68,28 @@ class TableSourceTable( override def getQualifiedName: util.List[String] = { val builder = ImmutableList.builder[String]() - .addAll(super.getQualifiedName) - extraDigests.foreach(builder.add) + .addAll(super.getQualifiedName) + + if(abilitySpecs != null && abilitySpecs.length != 0){ +// var newProducedType = catalogTable +// .getResolvedSchema +// .toSourceRowDataType +// .getLogicalType +// .asInstanceOf[RowType] Review comment: remove the useless code ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala ########## @@ -19,27 +19,27 @@ package org.apache.flink.table.planner.plan.metadata import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} -import org.apache.flink.table.api.{DataTypes, TableException, TableSchema} +import org.apache.flink.table.api.{DataTypes, TableConfig, TableException, TableSchema} import org.apache.flink.table.catalog.{CatalogTable, Column, ObjectIdentifier, ResolvedCatalogTable, ResolvedSchema, UniqueConstraint} import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.connector.source.{DynamicTableSource, ScanTableSource} import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} -import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, FlinkTypeSystem} +import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable} import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LocalZonedTimestampType, LogicalType, TimestampKind, TimestampType, VarCharType} - import org.apache.calcite.config.CalciteConnectionConfig import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.Schema.TableType import org.apache.calcite.schema.{Schema, SchemaPlus, Table} import org.apache.calcite.sql.{SqlCall, SqlNode} - import java.util import java.util.Collections +import org.apache.flink.table.utils.CatalogManagerMocks Review comment: nit: move this import close to other `org.apache.flink.table.xx` ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala ########## @@ -284,7 +284,12 @@ object MetadataTestUtil { new TestTableSource(), true, new ResolvedCatalogTable(catalogTable, resolvedSchema), - Array("project=[a, c, d]")) + new FlinkContextImpl( Review comment: create variable field to replace all `new FlinkContextImpl(...)` ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala ########## @@ -65,8 +68,28 @@ class TableSourceTable( override def getQualifiedName: util.List[String] = { val builder = ImmutableList.builder[String]() - .addAll(super.getQualifiedName) - extraDigests.foreach(builder.add) + .addAll(super.getQualifiedName) + + if(abilitySpecs != null && abilitySpecs.length != 0){ +// var newProducedType = catalogTable +// .getResolvedSchema +// .toSourceRowDataType +// .getLogicalType +// .asInstanceOf[RowType] + + var newProducedType = DynamicSourceUtils.createProducedType( + catalogTable.getResolvedSchema, + tableSource) + + for (spec <- abilitySpecs) { + val sourceAbilityContext = new SourceAbilityContext(flinkContext, newProducedType) + + builder.add(spec.getDigests(sourceAbilityContext)) + if (spec.getProducedType.isPresent) { + newProducedType = spec.getProducedType.get + } Review comment: nit: these lines can be simplified as ` newProducedType = spec.getProducedType.orElse(newProducedType)` ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala ########## @@ -41,8 +44,8 @@ import java.util * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table * @param isStreamingMode A flag that tells if the current table is in stream mode * @param catalogTable Resolved catalog table where this table source table comes from - * @param extraDigests The extra digests which will be added into `getQualifiedName` - * as a part of table digest + * @param flinkContext The flink context + * @param abilitySpecs The abilitySpec applied to the source */ Review comment: abilitySpec -> abilitySpecs ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala ########## @@ -41,8 +44,8 @@ import java.util * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table * @param isStreamingMode A flag that tells if the current table is in stream mode * @param catalogTable Resolved catalog table where this table source table comes from - * @param extraDigests The extra digests which will be added into `getQualifiedName` - * as a part of table digest + * @param flinkContext The flink context Review comment: it's better we can explain why we need flinkContext here ########## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala ########## @@ -20,15 +20,18 @@ package org.apache.flink.table.planner.plan.schema import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable} import org.apache.flink.table.connector.source.DynamicTableSource -import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec +import org.apache.flink.table.planner.plan.abilities.source.{SourceAbilityContext, SourceAbilitySpec} import org.apache.flink.table.planner.plan.stats.FlinkStatistic - import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptSchema import org.apache.calcite.rel.`type`.RelDataType - import java.util +import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory} Review comment: remove the useless imports ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java ########## @@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) { } } } + + @Override + public String getDigests(SourceAbilityContext context) { + final String expressionStr = + FlinkRexUtil.getExpressionString( + watermarkExpr, + JavaScalaConversionUtil.toScala( + context.getSourceRowType().getFieldNames())); + + // final String expressionStr = + // FlinkRexUtil.getExpressionString( + // watermarkExpr, + // + // JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames())); Review comment: remove these code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org