This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7826760b [INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
0d7826760b is described below

commit 0d7826760b23d4c58647e184c2d2dc46b2e41cf1
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Mon Nov 27 16:13:45 2023 +0800

    [INLONG-9299][Sort] Iceberg support all migrate and auto create table (#9306)
---
 .../sort/protocol/constant/DorisConstant.java      |  4 +
 .../sort/protocol/node/load/IcebergLoadNode.java   | 88 +++++++++++++++++++++-
 .../apache/inlong/sort/util/SchemaChangeUtils.java |  4 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java |  6 +-
 .../mysql/source/config/MySqlSourceOptions.java    |  2 +-
 5 files changed, 97 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
index e9bea2ff68..2e186de803 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/DorisConstant.java
@@ -65,4 +65,8 @@ public class DorisConstant {
      * The multiple table-pattern of sink
      */
     public static final String SINK_MULTIPLE_TABLE_PATTERN = 
"sink.multiple.table-pattern";
+    /**
+     * The schema change policies of sink
+     */
+    public static final String SINK_SCHEMA_CHANGE_POLICIES = 
"sink.schema-change.policies";
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 7f76891dfd..a418fd930b 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -24,9 +24,13 @@ import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
 import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -43,8 +47,15 @@ import java.io.Serializable;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
+import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_DATABASE_PATTERN;
+import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE;
+import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN;
+import static 
org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES;
+
 @JsonTypeName("icebergLoad")
 @Data
 @NoArgsConstructor
@@ -76,7 +87,30 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
     @JsonProperty("appendMode")
     private String appendMode;
 
-    @JsonCreator
+    @Nullable
+    @JsonProperty("sinkMultipleEnable")
+    private Boolean sinkMultipleEnable = false;
+
+    @Nullable
+    @JsonProperty("sinkMultipleFormat")
+    private Format sinkMultipleFormat;
+
+    @Nullable
+    @JsonProperty("databasePattern")
+    private String databasePattern;
+
+    @Nullable
+    @JsonProperty("tablePattern")
+    private String tablePattern;
+
+    @Nullable
+    @JsonProperty("enableSchemaChange")
+    private boolean enableSchemaChange;
+
+    @Nullable
+    @JsonProperty("policyMap")
+    private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+
     public IcebergLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
@@ -92,14 +126,51 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
             @JsonProperty("uri") String uri,
             @JsonProperty("warehouse") String warehouse,
             @JsonProperty("appendMode") String appendMode) {
+        this(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties, dbName, tableName,
+                primaryKey, catalogType, uri, warehouse, appendMode, false, 
null,
+                null, null, false, null);
+    }
+
+    @JsonCreator
+    public IcebergLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("dbName") String dbName,
+            @Nonnull @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("catalogType") IcebergConstant.CatalogType 
catalogType,
+            @JsonProperty("uri") String uri,
+            @JsonProperty("warehouse") String warehouse,
+            @JsonProperty("appendMode") String appendMode,
+            @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue 
= "false") Boolean sinkMultipleEnable,
+            @Nullable @JsonProperty("sinkMultipleFormat") Format 
sinkMultipleFormat,
+            @Nullable @JsonProperty("databasePattern") String databasePattern,
+            @Nullable @JsonProperty("tablePattern") String tablePattern,
+            @JsonProperty("enableSchemaChange") boolean enableSchemaChange,
+            @Nullable @JsonProperty("policyMap") Map<SchemaChangeType, 
SchemaChangePolicy> policyMap) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties);
-        this.tableName = Preconditions.checkNotNull(tableName, "table name is 
null");
-        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
         this.primaryKey = primaryKey;
         this.catalogType = catalogType == null ? CatalogType.HIVE : 
catalogType;
         this.uri = uri;
         this.warehouse = warehouse;
         this.appendMode = appendMode;
+        this.sinkMultipleEnable = sinkMultipleEnable;
+        if (sinkMultipleEnable == null || !sinkMultipleEnable) {
+            this.tableName = Preconditions.checkNotNull(tableName, "table name 
is null");
+            this.dbName = Preconditions.checkNotNull(dbName, "db name is 
null");
+        } else {
+            this.databasePattern = Preconditions.checkNotNull(databasePattern, 
"databasePattern is null");
+            this.tablePattern = Preconditions.checkNotNull(tablePattern, 
"tablePattern is null");
+            this.sinkMultipleFormat = 
Preconditions.checkNotNull(sinkMultipleFormat,
+                    "sinkMultipleFormat is null");
+        }
+        this.enableSchemaChange = enableSchemaChange;
+        this.policyMap = policyMap;
     }
 
     @Override
@@ -120,6 +191,17 @@ public class IcebergLoadNode extends LoadNode implements 
InlongMetric, Metadata,
         if (null != warehouse) {
             options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
         }
+
+        if (sinkMultipleEnable != null && sinkMultipleEnable) {
+            options.put(SINK_MULTIPLE_ENABLE, sinkMultipleEnable.toString());
+            options.put(SINK_MULTIPLE_FORMAT, 
Objects.requireNonNull(sinkMultipleFormat).identifier());
+            options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
+            options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
+            options.put(SINK_SCHEMA_CHANGE_POLICIES, 
SchemaChangeUtils.serialize(policyMap));
+        } else {
+            options.put(SINK_MULTIPLE_ENABLE, "false");
+        }
+
         return options;
     }
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
index c6ffac7da0..2ef8187022 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java
@@ -28,6 +28,7 @@ import org.apache.inlong.sort.schema.TableChange;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -286,8 +287,9 @@ public class SchemaChangeUtils {
         for (String colName : intersectColSet) {
             ColumnSchema oldCol = oldColumnSchemas.get(colName);
             ColumnSchema newCol = newColumnSchemas.get(colName);
+
             if (!oldCol.getType().equals(newCol.getType())
-                    || !oldCol.getComment().equals(newCol.getComment())) {
+                    || !StringUtils.equals(oldCol.getComment(), 
newCol.getComment())) {
                 tableChanges.add(
                         new TableChange.UpdateColumn(
                                 new String[]{newCol.getName()},
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 6ae3a3a28b..e416cfd24f 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -512,8 +512,10 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                             String.format("Unsupported table %s schema change: 
%s.", tableId.toString(), tableChange));
                 }
             }
-            
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
-            LOG.info("Schema evolution in table({}) for table change: {}", 
tableId, tableChanges);
+            if (!tableChanges.isEmpty()) {
+                
IcebergSchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), 
tableChanges);
+                LOG.info("Schema evolution in table({}) for table change: {}", 
tableId, tableChanges);
+            }
         }
         transaction.commitTransaction();
         handleSchemaInfoEvent(tableId, table.schema());
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8faabdca41..b806028cbd 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -210,7 +210,7 @@ public class MySqlSourceOptions {
     public static final ConfigOption<Boolean> INCLUDE_INCREMENTAL =
             ConfigOptions.key("include-incremental")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription("Whether include a incremental flag in 
data "
                             + "when migrating all databases");
 

Reply via email to