This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 453902dd3 [INLONG-6300][Sort] Schema update policy unifies behavior for 
all column change types (#6306)
453902dd3 is described below

commit 453902dd331ab742de2b0297a61b52d17ca1e64e
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Thu Oct 27 20:04:11 2022 +0800

    [INLONG-6300][Sort] Schema update policy unifies behavior for all column 
change types (#6306)
---
 .../org/apache/inlong/sort/base/Constants.java     | 12 ++------
 .../inlong/sort/base/sink/MultipleSinkOption.java  | 32 ++++++----------------
 .../base/sink/SchemaUpdateExceptionPolicy.java     |  4 +++
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  6 ++--
 .../inlong/sort/iceberg/IcebergTableSink.java      |  6 ++--
 .../sink/multiple/DynamicSchemaHandleOperator.java | 22 +++++++--------
 6 files changed, 30 insertions(+), 52 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 59c192bdd..fb542ef6d 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -152,15 +152,9 @@ public final class Constants {
                     .withDescription("The option 'sink.multiple.enable' "
                             + "is used to determine whether to support 
multiple sink writing, default is 'false'.");
 
-    public static final ConfigOption<SchemaUpdateExceptionPolicy> 
SINK_MULTIPLE_ADD_COLUMN_POLICY =
-            ConfigOptions.key("sink.multiple.add-column.policy")
+    public static final ConfigOption<SchemaUpdateExceptionPolicy> 
SINK_MULTIPLE_SCHEMA_UPDATE_POLICY =
+            ConfigOptions.key("sink.multiple.schema-update.policy")
                     .enumType(SchemaUpdateExceptionPolicy.class)
                     .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
-                    .withDescription("The action to deal with column add.");
-
-    public static final ConfigOption<SchemaUpdateExceptionPolicy> 
SINK_MULTIPLE_DEL_COLUMN_POLICY =
-            ConfigOptions.key("sink.multiple.del-column.policy")
-                    .enumType(SchemaUpdateExceptionPolicy.class)
-                    .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
-                    .withDescription("The action to deal with column delete.");
+                    .withDescription("The action to deal with schema update in 
multiple sink.");
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index bf966e1e1..77c924b95 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -37,22 +37,18 @@ public class MultipleSinkOption implements Serializable {
 
     private String format;
 
-    private SchemaUpdateExceptionPolicy addColumnPolicy;
-
-    private SchemaUpdateExceptionPolicy delColumnPolicy;
+    private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 
     private String databasePattern;
 
     private String tablePattern;
 
     public MultipleSinkOption(String format,
-            SchemaUpdateExceptionPolicy addColumnPolicy,
-            SchemaUpdateExceptionPolicy delColumnPolicy,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
             String tablePattern) {
         this.format = format;
-        this.addColumnPolicy = addColumnPolicy;
-        this.delColumnPolicy = delColumnPolicy;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
     }
@@ -61,12 +57,8 @@ public class MultipleSinkOption implements Serializable {
         return format;
     }
 
-    public SchemaUpdateExceptionPolicy getAddColumnPolicy() {
-        return addColumnPolicy;
-    }
-
-    public SchemaUpdateExceptionPolicy getDelColumnPolicy() {
-        return delColumnPolicy;
+    public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() {
+        return schemaUpdatePolicy;
     }
 
     public String getDatabasePattern() {
@@ -83,8 +75,7 @@ public class MultipleSinkOption implements Serializable {
 
     public static class Builder {
         private String format;
-        private SchemaUpdateExceptionPolicy addColumnPolicy;
-        private SchemaUpdateExceptionPolicy delColumnPolicy;
+        private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
 
@@ -93,13 +84,8 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
-        public MultipleSinkOption.Builder 
withAddColumnPolicy(SchemaUpdateExceptionPolicy addColumnPolicy) {
-            this.addColumnPolicy = addColumnPolicy;
-            return this;
-        }
-
-        public MultipleSinkOption.Builder 
withDelColumnPolicy(SchemaUpdateExceptionPolicy delColumnPolicy) {
-            this.delColumnPolicy = delColumnPolicy;
+        public MultipleSinkOption.Builder 
withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+            this.schemaUpdatePolicy = schemaUpdatePolicy;
             return this;
         }
 
@@ -114,7 +100,7 @@ public class MultipleSinkOption implements Serializable {
         }
 
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, addColumnPolicy, 
delColumnPolicy, databasePattern, tablePattern);
+            return new MultipleSinkOption(format, schemaUpdatePolicy, 
databasePattern, tablePattern);
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
index 74cd271a6..7da5ac772 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java
@@ -39,4 +39,8 @@ public enum SchemaUpdateExceptionPolicy {
     SchemaUpdateExceptionPolicy(String description) {
         this.description = description;
     }
+
+    public String getDescription() {
+        return description;
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 5d63d8dc9..4852bca33 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -50,11 +50,10 @@ import java.util.Set;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
-import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
@@ -241,8 +240,7 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_FORMAT);
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
-        options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY);
-        options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY);
+        options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 8296d02ff..ee7cf2c89 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -44,11 +44,10 @@ import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
-import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static 
org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
@@ -107,8 +106,7 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                             .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
                             
.withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             
.withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
-                            
.withAddColumnPolicy(tableOptions.get(SINK_MULTIPLE_ADD_COLUMN_POLICY))
-                            
.withDelColumnPolicy(tableOptions.get(SINK_MULTIPLE_DEL_COLUMN_POLICY))
+                            
.withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
                             .build())
                     .append();
         } else {
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index ea69c12f9..e7fe68127 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -44,7 +44,6 @@ import 
org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.base.sink.TableChange;
 import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
-import org.apache.inlong.sort.base.sink.TableChange.DeleteColumn;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,8 +58,6 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
-import static 
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
-
 public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWithSchema>
         implements OneInputStreamOperator<RowData, RecordWithSchema>, 
ProcessingTimeCallback {
 
@@ -232,7 +229,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = 
SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (canHandleWithSchemaUpdate(tableId, tableChanges)) {
+            if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
                 
SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
                 LOG.info("Schema evolution in table({}) for table change: {}", 
tableId, tableChanges);
             }
@@ -270,21 +267,22 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         return record;
     }
 
-    private boolean canHandleWithSchemaUpdate(TableIdentifier tableId, 
List<TableChange> tableChanges) {
+    private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, 
List<TableChange> tableChanges) {
         boolean canHandle = true;
         for (TableChange tableChange : tableChanges) {
             if (tableChange instanceof AddColumn) {
                 canHandle &= 
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getAddColumnPolicy());
-            } else if (tableChange instanceof DeleteColumn) {
-                canHandle &= 
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        multipleSinkOption.getDelColumnPolicy());
+                        multipleSinkOption.getSchemaUpdatePolicy());
             } else {
-                canHandle &= 
MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                        LOG_WITH_IGNORE);
+                if 
(MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
+                        multipleSinkOption.getSchemaUpdatePolicy())) {
+                    LOG.info("Ignore table {} schema change: {} because 
iceberg can't handle it.",
+                            tableId, tableChange);
+                }
+                // todo:currently iceberg can only handle addColumn, so always 
return false
+                canHandle = false;
             }
         }
-
         if (!canHandle) {
             blacklist.add(tableId);
         }

Reply via email to