yuxiqian commented on code in PR #4081:
URL: https://github.com/apache/flink-cdc/pull/4081#discussion_r2332207294


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##########
@@ -413,6 +467,220 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
                 .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'");
     }
 
+    enum RandomSchemaCase {
+        ADD_COLUMN,
+        REMOVE_COLUMN,
+        REORDER_COLUMN,
+        MODIFY_COLUMN;
+    }
+
+    @ParameterizedTest
+    @EnumSource(RandomSchemaCase.class)
+    public void testSinkWithSchemaChangeForExistedTable(RandomSchemaCase randomSchemaCase)

Review Comment:
```suggestion
    void testSinkWithSchemaChangeForExistedTable(RandomSchemaCase randomSchemaCase)
```
Make it package-private to suppress warnings (JUnit 5 test methods don't need to be `public`).

##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
             throw new IllegalArgumentException("Illegal type: " + type);
         }
     }
+
+    /**
+     * Convert Flink's internal {@link org.apache.flink.table.types.DataType} to CDC's {@link
+     * DataType}.
+     */
+    public static DataType fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) {
+        LogicalType logicalType = flinkType.getLogicalType();
+        List<org.apache.flink.table.types.DataType> children = flinkType.getChildren();
+        DataType dataType;
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+                dataType = DataTypes.CHAR(getLength(logicalType));
+                break;
+            case VARCHAR:
+                dataType = DataTypes.VARCHAR(getLength(logicalType));
+                break;
+            case BOOLEAN:
+                dataType = DataTypes.BOOLEAN();
+                break;
+            case BINARY:
+                dataType = DataTypes.BINARY(getLength(logicalType));
+                break;
+            case VARBINARY:
+                dataType = DataTypes.VARBINARY(getLength(logicalType));
+                break;
+            case DECIMAL:
+                dataType = DataTypes.DECIMAL(getPrecision(logicalType), getScale(logicalType));
+                break;
+            case TINYINT:
+                dataType = DataTypes.TINYINT();
+                break;
+            case SMALLINT:
+                dataType = DataTypes.SMALLINT();
+                break;
+            case INTEGER:
+                dataType = DataTypes.INT();
+                break;
+            case BIGINT:
+                dataType = DataTypes.BIGINT();
+                break;
+            case FLOAT:
+                dataType = DataTypes.FLOAT();
+                break;
+            case DOUBLE:
+                dataType = DataTypes.DOUBLE();
+                break;
+            case DATE:
+                dataType = DataTypes.DATE();
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIME(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType));
+                break;
+            case ARRAY:
+                Preconditions.checkState(children != null && !children.isEmpty());

Review Comment:
Using `org.apache.flink.table.types.logical.LogicalTypeVisitor` might be better, but the current implementation is OK with me, too. (A rough sketch of the visitor approach follows below.)

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##########
@@ -413,6 +467,220 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
                 .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'");
     }
 
+    enum RandomSchemaCase {

Review Comment:
What does "random" mean here? The enum constants are specific, deterministic schema changes.
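For reference, here is a minimal sketch of the `LogicalTypeVisitor`-based alternative mentioned above. It extends Flink's `LogicalTypeDefaultVisitor` so only a few `visit` overloads need to be spelled out here; the remaining type roots would follow the same pattern. The class name `CdcDataTypeVisitor` is made up for the illustration, and this is a sketch rather than a drop-in replacement:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

/** Sketch: one visit() overload per type root replaces the switch. */
class CdcDataTypeVisitor extends LogicalTypeDefaultVisitor<DataType> {

    @Override
    public DataType visit(CharType charType) {
        return DataTypes.CHAR(charType.getLength());
    }

    @Override
    public DataType visit(VarCharType varCharType) {
        return DataTypes.VARCHAR(varCharType.getLength());
    }

    @Override
    public DataType visit(DecimalType decimalType) {
        return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
    }

    @Override
    public DataType visit(IntType intType) {
        return DataTypes.INT();
    }

    // ... remaining visit() overloads elided ...

    @Override
    protected DataType defaultMethod(LogicalType logicalType) {
        // Any type root without an explicit overload fails fast here.
        throw new IllegalArgumentException("Illegal type: " + logicalType);
    }
}
```

`fromFlinkDataType` would then reduce to `flinkType.getLogicalType().accept(new CdcDataTypeVisitor())`, and each overload gets the typed `CharType`/`DecimalType`/etc. directly instead of going through `getLength`/`getPrecision` helpers.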
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -232,4 +344,34 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
                         maxBucketsNum),
                 new RowPartitionKeyExtractor(table.schema()));
     }
+
+    /** MixedSchemaInfo is used to store the mixed schema info of upstream and paimon table. */
+    private static class MixedSchemaInfo {
+        private final TableSchemaInfo upstreamSchemaInfo;
+
+        private final TableSchemaInfo paimonSchemaInfo;
+
+        private final boolean sameColumnsIgnoreCommentAndDefaultValue;
+
+        public MixedSchemaInfo(
+                TableSchemaInfo upstreamSchemaInfo, TableSchemaInfo paimonSchemaInfo) {
+            this.upstreamSchemaInfo = upstreamSchemaInfo;
+            this.paimonSchemaInfo = paimonSchemaInfo;
+            this.sameColumnsIgnoreCommentAndDefaultValue =
+                    PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue(
+                            upstreamSchemaInfo.getSchema(), paimonSchemaInfo.getSchema());
+        }
+
+        public TableSchemaInfo getUpstreamSchemaInfo() {
+            return upstreamSchemaInfo;
+        }
+
+        public TableSchemaInfo getPaimonSchemaInfo() {
+            return paimonSchemaInfo;
+        }
+
+        public boolean isSameColumnsIgnoreCommentAndDefaultValue() {

Review Comment:
```suggestion
        public boolean isSameColumnsIgnoringCommentAndDefaultValue() {
```
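The actual `PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue` implementation isn't shown in this diff, so purely to illustrate the semantics the name implies: columns match when name, type, and order agree, while comment and default value are ignored. A minimal sketch, assuming CDC's `Schema#getColumns()` and `Column#getName()`/`getType()` accessors:

```java
import java.util.List;

import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;

/** Sketch of the comparison semantics implied by the method name. */
final class SchemaCompare {

    static boolean sameColumnsIgnoringCommentAndDefaultValue(Schema upstream, Schema paimon) {
        List<Column> lhs = upstream.getColumns();
        List<Column> rhs = paimon.getColumns();
        if (lhs.size() != rhs.size()) {
            return false;
        }
        for (int i = 0; i < lhs.size(); i++) {
            // Positional comparison: name and type must match; comment and
            // default value deliberately do not participate.
            if (!lhs.get(i).getName().equals(rhs.get(i).getName())
                    || !lhs.get(i).getType().equals(rhs.get(i).getType())) {
                return false;
            }
        }
        return true;
    }
}
```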
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -217,7 +329,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
         Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
-
+        LOGGER.debug("Succeed to get table info " + table);

Review Comment:
```suggestion
        LOGGER.debug("Succeed to get table info {}", table);
```
to avoid the unnecessary cost of converting `table` to a `String` when DEBUG logging is disabled (see the short demo at the end of this review)

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -186,25 +191,132 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
             }
         }
         output.collect(
-                new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
-        } else if (event instanceof SchemaChangeEvent) {
-            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
-            Schema schema =
-                    SchemaUtils.applySchemaChangeEvent(
-                            Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
-                                    .map(TableSchemaInfo::getSchema)
-                                    .orElse(null),
-                            schemaChangeEvent);
-            schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
+                new StreamRecord<>(new BucketWrapperChangeEvent(bucket, dataChangeEvent)));
+        } else { // Broadcast SchemaChangeEvent.
             for (int index = 0; index < totalTasksNumber; index++) {
                 output.collect(
                         new StreamRecord<>(
-                                new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
+                                new BucketWrapperChangeEvent(
+                                        index,
+                                        convertSchemaChangeEvent((SchemaChangeEvent) event))));
             }
         }
     }
 
+    @VisibleForTesting
+    public SchemaChangeEvent convertSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent)
+            throws Exception {
+        if (schemaChangeEvent instanceof DropTableEvent
+                || schemaChangeEvent instanceof TruncateTableEvent) {
+            return schemaChangeEvent;
+        }
+        TableId tableId = schemaChangeEvent.tableId();
+        Schema upstreamSchema;
+        try {
+            upstreamSchema =
+                    schemaMaps.containsKey(tableId)
+                            ? schemaMaps.get(tableId).getUpstreamSchemaInfo().getSchema()
+                            : schemaEvolutionClient.getLatestEvolvedSchema(tableId).orElse(null);
+        } catch (Exception e) {
+            // In batch mode, we can't get schema from registry.
+            upstreamSchema = null;
+        }
+        if (!SchemaUtils.isSchemaChangeEventRedundant(upstreamSchema, schemaChangeEvent)) {
+            upstreamSchema = SchemaUtils.applySchemaChangeEvent(upstreamSchema, schemaChangeEvent);
+        }
+        Schema physicalSchema =
+                PaimonWriterHelper.deduceSchemaForPaimonTable(
+                        catalog.getTable(PaimonWriterHelper.identifierFromTableId(tableId)));
+        MixedSchemaInfo mixedSchemaInfo =
+                new MixedSchemaInfo(
+                        new TableSchemaInfo(upstreamSchema, zoneId),
+                        new TableSchemaInfo(physicalSchema, zoneId));
+        if (!mixedSchemaInfo.isSameColumnsIgnoreCommentAndDefaultValue()) {
+            LOGGER.warn(
+                    "Upstream schema of {} is {}, which is different with paimon physical table schema {}.",

Review Comment:
```suggestion
                    "Upstream schema of {} is {}, which is different from existing table schema {}. Data precision loss and truncation may occur.",
```
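To make the risk named in that warning concrete, here is a small, self-contained illustration; the DECIMAL precisions and values are invented for the example and are not taken from the PR:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Demo: upstream evolved to DECIMAL(12, 4), table still declares DECIMAL(10, 2). */
public class PrecisionLossDemo {
    public static void main(String[] args) {
        BigDecimal upstream = new BigDecimal("12345678.1234"); // fits DECIMAL(12, 4)

        // Coercing to scale 2 silently drops the trailing fraction digits.
        BigDecimal coerced = upstream.setScale(2, RoundingMode.HALF_UP);
        System.out.println(coerced); // 12345678.12 -- precision lost

        // A value with too many integer digits cannot fit DECIMAL(10, 2) at all.
        BigDecimal tooWide = new BigDecimal("123456789012.00"); // needs precision 14
        System.out.println(tooWide.precision() > 10); // true -> overflow or truncation
    }
}
```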
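And, as promised above, a quick demonstration of why the parameterized `LOGGER.debug` form is cheaper, assuming the SLF4J `Logger` Flink uses:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LazyLoggingDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(LazyLoggingDemo.class);

    static void log(Object table) {
        // Eager: the message string (including table.toString()) is built
        // even when the DEBUG level is disabled.
        LOGGER.debug("Succeed to get table info " + table);

        // Lazy: SLF4J substitutes the placeholder only after checking that
        // DEBUG is enabled, so table.toString() is skipped otherwise.
        LOGGER.debug("Succeed to get table info {}", table);
    }
}
```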