twalthr commented on code in PR #25054: URL: https://github.com/apache/flink/pull/25054#discussion_r1669852050
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java: ########## @@ -83,6 +96,19 @@ public Schema mergeSchemas(SqlCreateTableAs sqlCreateTableAs, ResolvedSchema sou sqlCreateTableAs.getColumnList(), Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns()); + if (sqlCreateTableAs.getWatermark().isPresent()) { + schemaBuilder.setWatermark(sqlCreateTableAs.getWatermark().get()); + } + + Optional<SqlTableConstraint> primaryKey = + sqlCreateTableAs.getFullConstraints().stream() Review Comment: Throw a helpful exception if something unexpected is contained in this list. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java: ########## @@ -83,6 +96,19 @@ public Schema mergeSchemas(SqlCreateTableAs sqlCreateTableAs, ResolvedSchema sou sqlCreateTableAs.getColumnList(), Schema.newBuilder().fromResolvedSchema(sourceSchema).build().getColumns()); + if (sqlCreateTableAs.getWatermark().isPresent()) { + schemaBuilder.setWatermark(sqlCreateTableAs.getWatermark().get()); + } + + Optional<SqlTableConstraint> primaryKey = + sqlCreateTableAs.getFullConstraints().stream() Review Comment: Throw a helpful exception if something unexpected is contained in this list. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala: ########## @@ -399,6 +399,47 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted) } + @TestTemplate + def testCreateTableAsSelectWithPrimaryKey(): Unit = { Review Comment: not sure if this test is really useful. the primary key is not used in this test. even if it were lost in translation, the test would not even notice it. a non-IT case should be enough that accesses the catalog table and this is done by `SqlDdlToOperationConverterTest` already. 
so let's drop this one. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java: ########## @@ -312,113 +302,36 @@ private void appendDerivedPrimaryKey(@Nullable SqlTableConstraint derivedPrimary "The base table already has a primary key. You might " + "want to specify EXCLUDING CONSTRAINTS."); } else if (derivedPrimaryKey != null) { - List<String> primaryKeyColumns = new ArrayList<>(); - for (SqlNode primaryKeyNode : derivedPrimaryKey.getColumns()) { - String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple(); - if (!columns.containsKey(primaryKey)) { - throw new ValidationException( - String.format( - "Primary key column '%s' is not defined in the schema at %s", - primaryKey, primaryKeyNode.getParserPosition())); - } - if (!(columns.get(primaryKey) instanceof UnresolvedPhysicalColumn)) { - throw new ValidationException( - String.format( - "Could not create a PRIMARY KEY with column '%s' at %s.\n" - + "A PRIMARY KEY constraint must be declared on physical columns.", - primaryKey, primaryKeyNode.getParserPosition())); - } - primaryKeyColumns.add(primaryKey); - } - primaryKey = - new UnresolvedPrimaryKey( - derivedPrimaryKey - .getConstraintName() - .orElseGet( - () -> - primaryKeyColumns.stream() - .collect( - Collectors.joining( - "_", "PK_", ""))), - primaryKeyColumns); + setPrimaryKey(derivedPrimaryKey); } } private void appendDerivedWatermarks( Map<FeatureOption, MergingStrategy> mergingStrategies, List<SqlWatermark> derivedWatermarkSpecs) { - for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) { - SqlIdentifier eventTimeColumnName = derivedWatermarkSpec.getEventTimeColumnName(); - - HashMap<String, RelDataType> nameToTypeMap = new LinkedHashMap<>(); - nameToTypeMap.putAll(physicalFieldNamesToTypes); - nameToTypeMap.putAll(metadataFieldNamesToTypes); - nameToTypeMap.putAll(computedFieldNamesToTypes); - - verifyRowtimeAttribute(mergingStrategies, eventTimeColumnName, 
nameToTypeMap); - String rowtimeAttribute = eventTimeColumnName.toString(); - - SqlNode expression = derivedWatermarkSpec.getWatermarkStrategy(); - - // this will validate and expand function identifiers. - SqlNode validated = - sqlValidator.validateParameterizedExpression(expression, nameToTypeMap); - - watermarkSpecs.put( - rowtimeAttribute, - new UnresolvedWatermarkSpec( - rowtimeAttribute, - new SqlCallExpression(escapeExpressions.apply(validated)))); - } - } - - private void verifyRowtimeAttribute( - Map<FeatureOption, MergingStrategy> mergingStrategies, - SqlIdentifier eventTimeColumnName, - Map<String, RelDataType> allFieldsTypes) { - String fullRowtimeExpression = eventTimeColumnName.toString(); - boolean specAlreadyExists = watermarkSpecs.containsKey(fullRowtimeExpression); - - if (specAlreadyExists - && mergingStrategies.get(FeatureOption.WATERMARKS) - != MergingStrategy.OVERWRITING) { - throw new ValidationException( - String.format( - "There already exists a watermark spec for column '%s' in the base table. 
You " - + "might want to specify EXCLUDING WATERMARKS or OVERWRITING WATERMARKS.", - fullRowtimeExpression)); - } - - List<String> components = eventTimeColumnName.names; - if (!allFieldsTypes.containsKey(components.get(0))) { - throw new ValidationException( - String.format( - "The rowtime attribute field '%s' is not defined in the table schema, at %s\n" - + "Available fields: [%s]", - fullRowtimeExpression, - eventTimeColumnName.getParserPosition(), - allFieldsTypes.keySet().stream() - .collect(Collectors.joining("', '", "'", "'")))); - } - - if (components.size() > 1) { - RelDataType componentType = allFieldsTypes.get(components.get(0)); - for (int i = 1; i < components.size(); i++) { - RelDataTypeField field = componentType.getField(components.get(i), true, false); - if (field == null) { + if (mergingStrategies.get(SqlTableLike.FeatureOption.WATERMARKS) + != SqlTableLike.MergingStrategy.OVERWRITING) { + for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) { + SqlIdentifier eventTimeColumnName = + derivedWatermarkSpec.getEventTimeColumnName(); + String rowtimeAttribute = eventTimeColumnName.toString(); + + if (watermarkSpecs.containsKey(rowtimeAttribute)) { throw new ValidationException( String.format( - "The rowtime attribute field '%s' is not defined in the table schema, at %s\n" - + "Nested field '%s' was not found in a composite type: %s.", - fullRowtimeExpression, - eventTimeColumnName.getComponent(i).getParserPosition(), - components.get(i), - FlinkTypeFactory.toLogicalType( - allFieldsTypes.get(components.get(0))))); + "There already exists a watermark spec for column '%s' in the base table. 
You " + "might want to specify EXCLUDING WATERMARKS or OVERWRITING WATERMARKS.", rowtimeAttribute)); } - componentType = field.getType(); } } + + HashMap<String, RelDataType> nameToTypeMap = new LinkedHashMap<>(); + nameToTypeMap.putAll(physicalFieldNamesToTypes); + nameToTypeMap.putAll(metadataFieldNamesToTypes); + nameToTypeMap.putAll(computedFieldNamesToTypes); Review Comment: No, we cannot use computed columns in other computed columns. But metadata columns can be used. The evaluation order is physical -> metadata -> computed -> watermarks. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java: ########## @@ -312,113 +302,36 @@ private void appendDerivedPrimaryKey(@Nullable SqlTableConstraint derivedPrimary "The base table already has a primary key. You might " + "want to specify EXCLUDING CONSTRAINTS."); } else if (derivedPrimaryKey != null) { - List<String> primaryKeyColumns = new ArrayList<>(); - for (SqlNode primaryKeyNode : derivedPrimaryKey.getColumns()) { - String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple(); - if (!columns.containsKey(primaryKey)) { - throw new ValidationException( - String.format( - "Primary key column '%s' is not defined in the schema at %s", - primaryKey, primaryKeyNode.getParserPosition())); - } - if (!(columns.get(primaryKey) instanceof UnresolvedPhysicalColumn)) { - throw new ValidationException( - String.format( - "Could not create a PRIMARY KEY with column '%s' at %s.\n" - + "A PRIMARY KEY constraint must be declared on physical columns.", - primaryKey, primaryKeyNode.getParserPosition())); - } - primaryKeyColumns.add(primaryKey); - } - primaryKey = - new UnresolvedPrimaryKey( - derivedPrimaryKey - .getConstraintName() - .orElseGet( - () -> - primaryKeyColumns.stream() - .collect( - Collectors.joining( - "_", "PK_", ""))), - primaryKeyColumns); + setPrimaryKey(derivedPrimaryKey); } } private void appendDerivedWatermarks( Map<FeatureOption, 
MergingStrategy> mergingStrategies, List<SqlWatermark> derivedWatermarkSpecs) { - for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) { - SqlIdentifier eventTimeColumnName = derivedWatermarkSpec.getEventTimeColumnName(); - - HashMap<String, RelDataType> nameToTypeMap = new LinkedHashMap<>(); - nameToTypeMap.putAll(physicalFieldNamesToTypes); - nameToTypeMap.putAll(metadataFieldNamesToTypes); - nameToTypeMap.putAll(computedFieldNamesToTypes); - - verifyRowtimeAttribute(mergingStrategies, eventTimeColumnName, nameToTypeMap); - String rowtimeAttribute = eventTimeColumnName.toString(); - - SqlNode expression = derivedWatermarkSpec.getWatermarkStrategy(); - - // this will validate and expand function identifiers. - SqlNode validated = - sqlValidator.validateParameterizedExpression(expression, nameToTypeMap); - - watermarkSpecs.put( - rowtimeAttribute, - new UnresolvedWatermarkSpec( - rowtimeAttribute, - new SqlCallExpression(escapeExpressions.apply(validated)))); - } - } - - private void verifyRowtimeAttribute( - Map<FeatureOption, MergingStrategy> mergingStrategies, - SqlIdentifier eventTimeColumnName, - Map<String, RelDataType> allFieldsTypes) { - String fullRowtimeExpression = eventTimeColumnName.toString(); - boolean specAlreadyExists = watermarkSpecs.containsKey(fullRowtimeExpression); - - if (specAlreadyExists - && mergingStrategies.get(FeatureOption.WATERMARKS) - != MergingStrategy.OVERWRITING) { - throw new ValidationException( - String.format( - "There already exists a watermark spec for column '%s' in the base table. 
You " - + "might want to specify EXCLUDING WATERMARKS or OVERWRITING WATERMARKS.", - fullRowtimeExpression)); - } - - List<String> components = eventTimeColumnName.names; - if (!allFieldsTypes.containsKey(components.get(0))) { - throw new ValidationException( - String.format( - "The rowtime attribute field '%s' is not defined in the table schema, at %s\n" - + "Available fields: [%s]", - fullRowtimeExpression, - eventTimeColumnName.getParserPosition(), - allFieldsTypes.keySet().stream() - .collect(Collectors.joining("', '", "'", "'")))); - } - - if (components.size() > 1) { - RelDataType componentType = allFieldsTypes.get(components.get(0)); - for (int i = 1; i < components.size(); i++) { - RelDataTypeField field = componentType.getField(components.get(i), true, false); - if (field == null) { + if (mergingStrategies.get(SqlTableLike.FeatureOption.WATERMARKS) Review Comment: nit: static import on FeatureOption and MergingStrategy to keep the code more readable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org