spena commented on code in PR #25054:
URL: https://github.com/apache/flink/pull/25054#discussion_r1669360054


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java:
##########
@@ -312,113 +302,36 @@ private void appendDerivedPrimaryKey(@Nullable 
SqlTableConstraint derivedPrimary
                         "The base table already has a primary key. You might "
                                 + "want to specify EXCLUDING CONSTRAINTS.");
             } else if (derivedPrimaryKey != null) {
-                List<String> primaryKeyColumns = new ArrayList<>();
-                for (SqlNode primaryKeyNode : derivedPrimaryKey.getColumns()) {
-                    String primaryKey = ((SqlIdentifier) 
primaryKeyNode).getSimple();
-                    if (!columns.containsKey(primaryKey)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Primary key column '%s' is not 
defined in the schema at %s",
-                                        primaryKey, 
primaryKeyNode.getParserPosition()));
-                    }
-                    if (!(columns.get(primaryKey) instanceof 
UnresolvedPhysicalColumn)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Could not create a PRIMARY KEY with 
column '%s' at %s.\n"
-                                                + "A PRIMARY KEY constraint 
must be declared on physical columns.",
-                                        primaryKey, 
primaryKeyNode.getParserPosition()));
-                    }
-                    primaryKeyColumns.add(primaryKey);
-                }
-                primaryKey =
-                        new UnresolvedPrimaryKey(
-                                derivedPrimaryKey
-                                        .getConstraintName()
-                                        .orElseGet(
-                                                () ->
-                                                        
primaryKeyColumns.stream()
-                                                                .collect(
-                                                                        
Collectors.joining(
-                                                                               
 "_", "PK_", ""))),
-                                primaryKeyColumns);
+                setPrimaryKey(derivedPrimaryKey);
             }
         }
 
         private void appendDerivedWatermarks(
                 Map<FeatureOption, MergingStrategy> mergingStrategies,
                 List<SqlWatermark> derivedWatermarkSpecs) {
-            for (SqlWatermark derivedWatermarkSpec : derivedWatermarkSpecs) {
-                SqlIdentifier eventTimeColumnName = 
derivedWatermarkSpec.getEventTimeColumnName();
-
-                HashMap<String, RelDataType> nameToTypeMap = new 
LinkedHashMap<>();
-                nameToTypeMap.putAll(physicalFieldNamesToTypes);
-                nameToTypeMap.putAll(metadataFieldNamesToTypes);
-                nameToTypeMap.putAll(computedFieldNamesToTypes);
-
-                verifyRowtimeAttribute(mergingStrategies, eventTimeColumnName, 
nameToTypeMap);
-                String rowtimeAttribute = eventTimeColumnName.toString();
-
-                SqlNode expression = 
derivedWatermarkSpec.getWatermarkStrategy();
-
-                // this will validate and expand function identifiers.
-                SqlNode validated =
-                        
sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
-
-                watermarkSpecs.put(
-                        rowtimeAttribute,
-                        new UnresolvedWatermarkSpec(
-                                rowtimeAttribute,
-                                new 
SqlCallExpression(escapeExpressions.apply(validated))));
-            }
-        }
-
-        private void verifyRowtimeAttribute(
-                Map<FeatureOption, MergingStrategy> mergingStrategies,
-                SqlIdentifier eventTimeColumnName,
-                Map<String, RelDataType> allFieldsTypes) {
-            String fullRowtimeExpression = eventTimeColumnName.toString();
-            boolean specAlreadyExists = 
watermarkSpecs.containsKey(fullRowtimeExpression);
-
-            if (specAlreadyExists
-                    && mergingStrategies.get(FeatureOption.WATERMARKS)
-                            != MergingStrategy.OVERWRITING) {
-                throw new ValidationException(
-                        String.format(
-                                "There already exists a watermark spec for 
column '%s' in the base table. You "
-                                        + "might want to specify EXCLUDING 
WATERMARKS or OVERWRITING WATERMARKS.",
-                                fullRowtimeExpression));
-            }
-
-            List<String> components = eventTimeColumnName.names;
-            if (!allFieldsTypes.containsKey(components.get(0))) {
-                throw new ValidationException(
-                        String.format(
-                                "The rowtime attribute field '%s' is not 
defined in the table schema, at %s\n"
-                                        + "Available fields: [%s]",
-                                fullRowtimeExpression,
-                                eventTimeColumnName.getParserPosition(),
-                                allFieldsTypes.keySet().stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
-            }
-
-            if (components.size() > 1) {
-                RelDataType componentType = 
allFieldsTypes.get(components.get(0));
-                for (int i = 1; i < components.size(); i++) {
-                    RelDataTypeField field = 
componentType.getField(components.get(i), true, false);
-                    if (field == null) {
+            if (mergingStrategies.get(SqlTableLike.FeatureOption.WATERMARKS)
+                    != SqlTableLike.MergingStrategy.OVERWRITING) {
+                for (SqlWatermark derivedWatermarkSpec : 
derivedWatermarkSpecs) {
+                    SqlIdentifier eventTimeColumnName =
+                            derivedWatermarkSpec.getEventTimeColumnName();
+                    String rowtimeAttribute = eventTimeColumnName.toString();
+
+                    if (watermarkSpecs.containsKey(rowtimeAttribute)) {
                         throw new ValidationException(
                                 String.format(
-                                        "The rowtime attribute field '%s' is 
not defined in the table schema, at %s\n"
-                                                + "Nested field '%s' was not 
found in a composite type: %s.",
-                                        fullRowtimeExpression,
-                                        
eventTimeColumnName.getComponent(i).getParserPosition(),
-                                        components.get(i),
-                                        FlinkTypeFactory.toLogicalType(
-                                                
allFieldsTypes.get(components.get(0)))));
+                                        "There already exists a watermark spec 
for column '%s' in the base table. You "
+                                                + "might want to specify 
EXCLUDING WATERMARKS or OVERWRITING WATERMARKS.",
+                                        rowtimeAttribute));
                     }
-                    componentType = field.getType();
                 }
             }
+
+            HashMap<String, RelDataType> nameToTypeMap = new LinkedHashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);

Review Comment:
   I left this like before without adding them to a global Map. I noticed the 
LIKE table option does not use the compute fields when evaluating other compute 
columns. But metadata do evaluate the expressions from compute pools. I don't 
know if that's allowed, or if we can use other compute columns to evaluate 
compute expressions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to