leonardBang commented on code in PR #4305:
URL: https://github.com/apache/flink-cdc/pull/4305#discussion_r2894419732
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java:
##########
@@ -191,7 +212,7 @@ private void sanityCheck(TableDescriptor
inferredFlussTable, TableInfo currentTa
List<String> currentPartitionKeys =
currentTableInfo.getPartitionKeys();
if (!inferredPartitionKeys.equals(currentPartitionKeys)) {
throw new ValidationException(
- "The table schema inferred by Flink CDC is not matched
with current Fluss table schema. "
+ "The table schema inffered by Flink CDC is not matched
with current Fluss table schema. "
Review Comment:
Why change this? The existing word ("inferred") is correct; "inffered" is a misspelling.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java:
##########
@@ -82,29 +85,45 @@ private void applySchemaChangeEvent(SchemaChangeEvent
event) {
org.apache.flink.cdc.common.schema.Schema newSchema =
((CreateTableEvent) event).getSchema();
// if the table is not exist or the schema is changed, update the
table info.
- if (!tableInfoMap.containsKey(tableId)
- || !sameCdcColumnsIgnoreCommentAndDefaultValue(
- tableInfoMap.get(tableId).upstreamCdcSchema,
newSchema)) {
+ if (!schemaMaps.containsKey(tableId)
+ || !sameSchemaIgnoreCommentAndDefaultValue(
+ schemaMaps.get(tableId).upstreamCdcSchema,
newSchema)) {
Table table = connection.getTable(getTablePath(tableId));
TableSchemaInfo newSchemaInfo =
new TableSchemaInfo(newSchema,
table.getTableInfo().getSchema());
- tableInfoMap.put(tableId, newSchemaInfo);
+ schemaMaps.put(tableId, newSchemaInfo);
+ }
+ } else if (event instanceof AddColumnEvent) {
+ TableSchemaInfo schemaInfo = schemaMaps.get(event.tableId());
+ if (schemaInfo == null) {
+ throw new IllegalStateException(
+ "Cannot apply AddColumnEvent for table "
+ + event.tableId()
+ + ": table schema not found. Ensure
CreateTableEvent is processed before AddColumnEvent.");
+ }
+ Schema schema = schemaInfo.upstreamCdcSchema;
+ if (!SchemaUtils.isSchemaChangeEventRedundant(schema, event)) {
+ Table table = connection.getTable(getTablePath(tableId));
+ TableSchemaInfo newSchemaInfo =
+ new TableSchemaInfo(
+ SchemaUtils.applySchemaChangeEvent(schema,
event),
+ table.getTableInfo().getSchema());
+ schemaMaps.put(tableId, newSchemaInfo);
}
} else {
- // TODO: Logics for altering tables are not supported yet.
- // This is anticipated to be supported in Fluss version 0.8.0.
- throw new RuntimeException(
- "Schema change type not supported. Only CreateTableEvent
is allowed at the moment.");
+ throw new UnsupportedOperationException(
+ String.format(
+ "Schema change type %s not supported. Only
CreateTableEvent and AddColumnEvent is allowed at the moment.",
Review Comment:
```suggestion
"Schema change type %s not supported. Only
CreateTableEvent and AddColumnEvent are allowed at the moment.",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]