This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d7826760b [INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
0d7826760b is described below

commit 0d7826760b23d4c58647e184c2d2dc46b2e41cf1
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Mon Nov 27 16:13:45 2023 +0800

    [INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
---
 .../sort/protocol/constant/DorisConstant.java      |  4 +
 .../sort/protocol/node/load/IcebergLoadNode.java   | 88 +++++++++++++++++++++-
 .../apache/inlong/sort/util/SchemaChangeUtils.java |  4 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java |  6 +-
 .../mysql/source/config/MySqlSourceOptions.java    |  2 +-
 5 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
index e9bea2ff68..2e186de803 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
@@ -65,4 +65,8 @@ public class DorisConstant {
      * The multiple table-pattern of sink
      */
     public static final String SINK_MULTIPLE_TABLE_PATTERN = "sink.multiple.table-pattern";
+    /**
+     * The schema change policies of sink
+     */
+    public static final String SINK_SCHEMA_CHANGE_POLICIES = "sink.schema-change.policies";
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 7f76891dfd..a418fd930b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@
-24,9 +24,13 @@ import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.constant.IcebergConstant; import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType; import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; import org.apache.inlong.sort.protocol.node.LoadNode; +import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.util.SchemaChangeUtils; import com.google.common.base.Preconditions; import lombok.Data; @@ -43,8 +47,15 @@ import java.io.Serializable; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_DATABASE_PATTERN; +import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE; +import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN; +import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES; + @JsonTypeName("icebergLoad") @Data @NoArgsConstructor @@ -76,7 +87,30 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, @JsonProperty("appendMode") private String appendMode; - @JsonCreator + @Nullable + @JsonProperty("sinkMultipleEnable") + private Boolean sinkMultipleEnable = false; + + @Nullable + @JsonProperty("sinkMultipleFormat") + private Format sinkMultipleFormat; + + @Nullable + @JsonProperty("databasePattern") + private String databasePattern; + + @Nullable + @JsonProperty("tablePattern") + private String tablePattern; + + 
@Nullable + @JsonProperty("enableSchemaChange") + private boolean enableSchemaChange; + + @Nullable + @JsonProperty("policyMap") + private Map<SchemaChangeType, SchemaChangePolicy> policyMap; + public IcebergLoadNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @JsonProperty("fields") List<FieldInfo> fields, @@ -92,14 +126,51 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, @JsonProperty("uri") String uri, @JsonProperty("warehouse") String warehouse, @JsonProperty("appendMode") String appendMode) { + this(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties, dbName, tableName, + primaryKey, catalogType, uri, warehouse, appendMode, false, null, + null, null, false, null); + } + + @JsonCreator + public IcebergLoadNode(@JsonProperty("id") String id, + @JsonProperty("name") String name, + @JsonProperty("fields") List<FieldInfo> fields, + @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations, + @JsonProperty("filters") List<FilterFunction> filters, + @JsonProperty("filterStrategy") FilterStrategy filterStrategy, + @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism, + @JsonProperty("properties") Map<String, String> properties, + @Nonnull @JsonProperty("dbName") String dbName, + @Nonnull @JsonProperty("tableName") String tableName, + @JsonProperty("primaryKey") String primaryKey, + @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType, + @JsonProperty("uri") String uri, + @JsonProperty("warehouse") String warehouse, + @JsonProperty("appendMode") String appendMode, + @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue = "false") Boolean sinkMultipleEnable, + @Nullable @JsonProperty("sinkMultipleFormat") Format sinkMultipleFormat, + @Nullable @JsonProperty("databasePattern") String databasePattern, + @Nullable @JsonProperty("tablePattern") String tablePattern, + @JsonProperty("enableSchemaChange") boolean enableSchemaChange, + 
@Nullable @JsonProperty("policyMap") Map<SchemaChangeType, SchemaChangePolicy> policyMap) { super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties); - this.tableName = Preconditions.checkNotNull(tableName, "table name is null"); - this.dbName = Preconditions.checkNotNull(dbName, "db name is null"); this.primaryKey = primaryKey; this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType; this.uri = uri; this.warehouse = warehouse; this.appendMode = appendMode; + this.sinkMultipleEnable = sinkMultipleEnable; + if (sinkMultipleEnable == null || !sinkMultipleEnable) { + this.tableName = Preconditions.checkNotNull(tableName, "table name is null"); + this.dbName = Preconditions.checkNotNull(dbName, "db name is null"); + } else { + this.databasePattern = Preconditions.checkNotNull(databasePattern, "databasePattern is null"); + this.tablePattern = Preconditions.checkNotNull(tablePattern, "tablePattern is null"); + this.sinkMultipleFormat = Preconditions.checkNotNull(sinkMultipleFormat, + "sinkMultipleFormat is null"); + } + this.enableSchemaChange = enableSchemaChange; + this.policyMap = policyMap; } @Override @@ -120,6 +191,17 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, if (null != warehouse) { options.put(IcebergConstant.WAREHOUSE_KEY, warehouse); } + + if (sinkMultipleEnable != null && sinkMultipleEnable) { + options.put(SINK_MULTIPLE_ENABLE, sinkMultipleEnable.toString()); + options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier()); + options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern); + options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern); + options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap)); + } else { + options.put(SINK_MULTIPLE_ENABLE, "false"); + } + return options; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java index c6ffac7da0..2ef8187022 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java @@ -28,6 +28,7 @@ import org.apache.inlong.sort.schema.TableChange; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.HashMap; @@ -286,8 +287,9 @@ public class SchemaChangeUtils { for (String colName : intersectColSet) { ColumnSchema oldCol = oldColumnSchemas.get(colName); ColumnSchema newCol = newColumnSchemas.get(colName); + if (!oldCol.getType().equals(newCol.getType()) - || !oldCol.getComment().equals(newCol.getComment())) { + || !StringUtils.equals(oldCol.getComment(), newCol.getComment())) { tableChanges.add( new TableChange.UpdateColumn( new String[]{newCol.getName()}, diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index 6ae3a3a28b..e416cfd24f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -512,8 +512,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi String.format("Unsupported table %s schema change: %s.", tableId.toString(), tableChange)); } } - IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges); 
- LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges); + if (!tableChanges.isEmpty()) { + IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges); + LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges); + } } transaction.commitTransaction(); handleSchemaInfoEvent(tableId, table.schema()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java index 8faabdca41..b806028cbd 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java @@ -210,7 +210,7 @@ public class MySqlSourceOptions { public static final ConfigOption<Boolean> INCLUDE_INCREMENTAL = ConfigOptions.key("include-incremental") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription("Whether include a incremental flag in data " + "when migrating all databases");