leonardBang commented on code in PR #4305:
URL: https://github.com/apache/flink-cdc/pull/4305#discussion_r2894419732


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java:
##########
@@ -191,7 +212,7 @@ private void sanityCheck(TableDescriptor 
inferredFlussTable, TableInfo currentTa
         List<String> currentPartitionKeys = 
currentTableInfo.getPartitionKeys();
         if (!inferredPartitionKeys.equals(currentPartitionKeys)) {
             throw new ValidationException(
-                    "The table schema inferred by Flink CDC is not matched 
with current Fluss table schema. "
+                    "The table schema inffered by Flink CDC is not matched 
with current Fluss table schema. "

Review Comment:
   ? existing word is correct.[](url)



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java:
##########
@@ -82,29 +85,45 @@ private void applySchemaChangeEvent(SchemaChangeEvent 
event) {
             org.apache.flink.cdc.common.schema.Schema newSchema =
                     ((CreateTableEvent) event).getSchema();
             // if the table is not exist or the schema is changed, update the 
table info.
-            if (!tableInfoMap.containsKey(tableId)
-                    || !sameCdcColumnsIgnoreCommentAndDefaultValue(
-                            tableInfoMap.get(tableId).upstreamCdcSchema, 
newSchema)) {
+            if (!schemaMaps.containsKey(tableId)
+                    || !sameSchemaIgnoreCommentAndDefaultValue(
+                            schemaMaps.get(tableId).upstreamCdcSchema, 
newSchema)) {
                 Table table = connection.getTable(getTablePath(tableId));
                 TableSchemaInfo newSchemaInfo =
                         new TableSchemaInfo(newSchema, 
table.getTableInfo().getSchema());
-                tableInfoMap.put(tableId, newSchemaInfo);
+                schemaMaps.put(tableId, newSchemaInfo);
+            }
+        } else if (event instanceof AddColumnEvent) {
+            TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
+            if (schemaInfo == null) {
+                throw new IllegalStateException(
+                        "Cannot apply AddColumnEvent for table "
+                                + event.tableId()
+                                + ": table schema not found. Ensure 
CreateTableEvent is processed before AddColumnEvent.");
+            }
+            Schema schema = schemaInfo.upstreamCdcSchema;
+            if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
+                Table table = connection.getTable(getTablePath(tableId));
+                TableSchemaInfo newSchemaInfo =
+                        new TableSchemaInfo(
+                                SchemaUtils.applySchemaChangeEvent(schema, 
event),
+                                table.getTableInfo().getSchema());
+                schemaMaps.put(tableId, newSchemaInfo);
             }
         } else {
-            // TODO: Logics for altering tables are not supported yet.
-            // This is anticipated to be supported in Fluss version 0.8.0.
-            throw new RuntimeException(
-                    "Schema change type not supported. Only CreateTableEvent 
is allowed at the moment.");
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Schema change type %s not supported. Only 
CreateTableEvent and AddColumnEvent is allowed at the moment.",

Review Comment:
   ```suggestion
                               "Schema change type %s not supported. Only 
CreateTableEvent and AddColumnEvent are allowed at the moment.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to