yuxiqian commented on code in PR #4081:
URL: https://github.com/apache/flink-cdc/pull/4081#discussion_r2332207294


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##########
@@ -413,6 +467,220 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
                 .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'");
     }
 
+    enum RandomSchemaCase {
+        ADD_COLUMN,
+        REMOVE_COLUMN,
+        REORDER_COLUMN,
+        MODIFY_COLUMN;
+    }
+
+    @ParameterizedTest
+    @EnumSource(RandomSchemaCase.class)
+    public void testSinkWithSchemaChangeForExistedTable(RandomSchemaCase randomSchemaCase)

Review Comment:
   ```suggestion
       void testSinkWithSchemaChangeForExistedTable(RandomSchemaCase randomSchemaCase)
   ```
   
   Make it package-private to suppress warnings (JUnit 5 test classes and methods don't need to be `public`).
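   
   A minimal sketch of the package-private form under JUnit 5 (class and member names below are illustrative, not from the PR):
   
   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.EnumSource;
   
   // JUnit 5 discovers test classes and methods reflectively, so neither needs the
   // `public` modifier; package-private avoids "can be package-private" warnings.
   class VisibilitySketchTest {
   
       enum Mode { A, B }
   
       @ParameterizedTest
       @EnumSource(Mode.class)
       void runsWithoutPublicModifier(Mode mode) {
           // assertions elided
       }
   }
   ```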



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                 throw new IllegalArgumentException("Illegal type: " + type);
         }
     }
+
+    /**
+     * Convert Flink's internal {@link org.apache.flink.table.types.DataType} to CDC's {@link
+     * DataType}.
+     */
+    public static DataType fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) {
+        LogicalType logicalType = flinkType.getLogicalType();
+        List<org.apache.flink.table.types.DataType> children = flinkType.getChildren();
+        DataType dataType;
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+                dataType = DataTypes.CHAR(getLength(logicalType));
+                break;
+            case VARCHAR:
+                dataType = DataTypes.VARCHAR(getLength(logicalType));
+                break;
+            case BOOLEAN:
+                dataType = DataTypes.BOOLEAN();
+                break;
+            case BINARY:
+                dataType = DataTypes.BINARY(getLength(logicalType));
+                break;
+            case VARBINARY:
+                dataType = DataTypes.VARBINARY(getLength(logicalType));
+                break;
+            case DECIMAL:
+                dataType = DataTypes.DECIMAL(getPrecision(logicalType), getScale(logicalType));
+                break;
+            case TINYINT:
+                dataType = DataTypes.TINYINT();
+                break;
+            case SMALLINT:
+                dataType = DataTypes.SMALLINT();
+                break;
+            case INTEGER:
+                dataType = DataTypes.INT();
+                break;
+            case BIGINT:
+                dataType = DataTypes.BIGINT();
+                break;
+            case FLOAT:
+                dataType = DataTypes.FLOAT();
+                break;
+            case DOUBLE:
+                dataType = DataTypes.DOUBLE();
+                break;
+            case DATE:
+                dataType = DataTypes.DATE();
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIME(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType));
+                break;
+            case ARRAY:
+                Preconditions.checkState(children != null && !children.isEmpty());

Review Comment:
   Using `org.apache.flink.table.types.logical.LogicalTypeVisitor` might be cleaner than a large `switch`, but the current implementation is fine with me, too.
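   
   For reference, a minimal sketch of the visitor-based variant (the visitor class name is made up here; it assumes Flink's `LogicalTypeDefaultVisitor` base class and omits nullability handling):
   
   ```java
   import org.apache.flink.cdc.common.types.DataType;
   import org.apache.flink.cdc.common.types.DataTypes;
   import org.apache.flink.table.types.logical.CharType;
   import org.apache.flink.table.types.logical.DecimalType;
   import org.apache.flink.table.types.logical.IntType;
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
   
   /** Hypothetical visitor mapping Flink logical types to CDC data types. */
   class FlinkToCdcTypeVisitor extends LogicalTypeDefaultVisitor<DataType> {
   
       @Override
       public DataType visit(CharType charType) {
           return DataTypes.CHAR(charType.getLength());
       }
   
       @Override
       public DataType visit(IntType intType) {
           return DataTypes.INT();
       }
   
       @Override
       public DataType visit(DecimalType decimalType) {
           return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
       }
   
       // ... one override per supported type root ...
   
       @Override
       protected DataType defaultMethod(LogicalType logicalType) {
           throw new IllegalArgumentException("Illegal type: " + logicalType);
       }
   }
   ```
   
   The conversion then becomes `flinkType.getLogicalType().accept(new FlinkToCdcTypeVisitor())`, and any unhandled type falls through to `defaultMethod`.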



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java:
##########
@@ -413,6 +467,220 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto
                 .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'");
     }
 
+    enum RandomSchemaCase {

Review Comment:
   What does "random" mean here?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -232,4 +344,34 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
                         maxBucketsNum),
                 new RowPartitionKeyExtractor(table.schema()));
     }
+
+    /** MixedSchemaInfo is used to store the mixed schema info of upstream and paimon table. */
+    private static class MixedSchemaInfo {
+        private final TableSchemaInfo upstreamSchemaInfo;
+
+        private final TableSchemaInfo paimonSchemaInfo;
+
+        private final boolean sameColumnsIgnoreCommentAndDefaultValue;
+
+        public MixedSchemaInfo(
+                TableSchemaInfo upstreamSchemaInfo, TableSchemaInfo paimonSchemaInfo) {
+            this.upstreamSchemaInfo = upstreamSchemaInfo;
+            this.paimonSchemaInfo = paimonSchemaInfo;
+            this.sameColumnsIgnoreCommentAndDefaultValue =
+                    PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue(
+                            upstreamSchemaInfo.getSchema(), paimonSchemaInfo.getSchema());
+        }
+
+        public TableSchemaInfo getUpstreamSchemaInfo() {
+            return upstreamSchemaInfo;
+        }
+
+        public TableSchemaInfo getPaimonSchemaInfo() {
+            return paimonSchemaInfo;
+        }
+
+        public boolean isSameColumnsIgnoreCommentAndDefaultValue() {

Review Comment:
   ```suggestion
           public boolean isSameColumnsIgnoringCommentAndDefaultValue() {
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -217,7 +329,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
         Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
         Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
-
+        LOGGER.debug("Succeed to get table info " + table);

Review Comment:
   ```suggestion
           LOGGER.debug("Succeed to get table info {}", table);
   ```
   
   to avoid the unnecessary cost of converting `table` to a `String` when DEBUG logging is disabled
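   
   A quick illustration of the difference (assuming an SLF4J `Logger`):
   
   ```java
   // String concatenation builds the message eagerly, even when DEBUG is off:
   LOGGER.debug("Succeed to get table info " + table);
   
   // The placeholder form defers formatting; table.toString() is only invoked
   // when DEBUG is actually enabled for this logger:
   LOGGER.debug("Succeed to get table info {}", table);
   ```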



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -186,25 +191,132 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
                     }
             }
             output.collect(
-                    new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
-        } else if (event instanceof SchemaChangeEvent) {
-            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
-            Schema schema =
-                    SchemaUtils.applySchemaChangeEvent(
-                            Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
-                                    .map(TableSchemaInfo::getSchema)
-                                    .orElse(null),
-                            schemaChangeEvent);
-            schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
+                    new StreamRecord<>(new BucketWrapperChangeEvent(bucket, dataChangeEvent)));
+        } else {
             // Broadcast SchemachangeEvent.
             for (int index = 0; index < totalTasksNumber; index++) {
                 output.collect(
                         new StreamRecord<>(
-                                new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
+                                new BucketWrapperChangeEvent(
+                                        index,
+                                        convertSchemaChangeEvent((SchemaChangeEvent) event))));
             }
         }
     }
 
+    @VisibleForTesting
+    public SchemaChangeEvent convertSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent)
+            throws Exception {
+        if (schemaChangeEvent instanceof DropTableEvent
+                || schemaChangeEvent instanceof TruncateTableEvent) {
+            return schemaChangeEvent;
+        }
+        TableId tableId = schemaChangeEvent.tableId();
+        Schema upstreamSchema;
+        try {
+            upstreamSchema =
+                    schemaMaps.containsKey(tableId)
+                            ? schemaMaps.get(tableId).getUpstreamSchemaInfo().getSchema()
+                            : schemaEvolutionClient.getLatestEvolvedSchema(tableId).orElse(null);
+        } catch (Exception e) {
+            // In batch mode, we can't get schema from registry.
+            upstreamSchema = null;
+        }
+        if (!SchemaUtils.isSchemaChangeEventRedundant(upstreamSchema, schemaChangeEvent)) {
+            upstreamSchema = SchemaUtils.applySchemaChangeEvent(upstreamSchema, schemaChangeEvent);
+        }
+        Schema physicalSchema =
+                PaimonWriterHelper.deduceSchemaForPaimonTable(
+                        catalog.getTable(PaimonWriterHelper.identifierFromTableId(tableId)));
+        MixedSchemaInfo mixedSchemaInfo =
+                new MixedSchemaInfo(
+                        new TableSchemaInfo(upstreamSchema, zoneId),
+                        new TableSchemaInfo(physicalSchema, zoneId));
+        if (!mixedSchemaInfo.isSameColumnsIgnoreCommentAndDefaultValue()) {
+            LOGGER.warn(
+                    "Upstream schema of {} is {}, which is different with 
paimon physical table schema {}.",

Review Comment:
   ```suggestion
                       "Upstream schema of {} is {}, which is different from 
existing table schema {}. Data precision loss and truncation may occur.",
   ```


