This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 453902dd3 [INLONG-6300][Sort] Schema update policy unifie behavior for all column change type (#6306) 453902dd3 is described below commit 453902dd331ab742de2b0297a61b52d17ca1e64e Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Thu Oct 27 20:04:11 2022 +0800 [INLONG-6300][Sort] Schema update policy unifie behavior for all column change type (#6306) --- .../org/apache/inlong/sort/base/Constants.java | 12 ++------ .../inlong/sort/base/sink/MultipleSinkOption.java | 32 ++++++---------------- .../base/sink/SchemaUpdateExceptionPolicy.java | 4 +++ .../sort/iceberg/FlinkDynamicTableFactory.java | 6 ++-- .../inlong/sort/iceberg/IcebergTableSink.java | 6 ++-- .../sink/multiple/DynamicSchemaHandleOperator.java | 22 +++++++-------- 6 files changed, 30 insertions(+), 52 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 59c192bdd..fb542ef6d 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -152,15 +152,9 @@ public final class Constants { .withDescription("The option 'sink.multiple.enable' " + "is used to determine whether to support multiple sink writing, default is 'false'."); - public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_ADD_COLUMN_POLICY = - ConfigOptions.key("sink.multiple.add-column.policy") + public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_SCHEMA_UPDATE_POLICY = + ConfigOptions.key("sink.multiple.schema-update.policy") .enumType(SchemaUpdateExceptionPolicy.class) .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST) - .withDescription("The action to deal with column add."); - - public static final ConfigOption<SchemaUpdateExceptionPolicy> SINK_MULTIPLE_DEL_COLUMN_POLICY = - ConfigOptions.key("sink.multiple.del-column.policy") - .enumType(SchemaUpdateExceptionPolicy.class) - .defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST) - .withDescription("The action to deal with column delete."); + .withDescription("The action to deal with schema update in multiple sink."); } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java index bf966e1e1..77c924b95 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java @@ -37,22 +37,18 @@ public class MultipleSinkOption implements Serializable { private String format; - private SchemaUpdateExceptionPolicy addColumnPolicy; - - private SchemaUpdateExceptionPolicy delColumnPolicy; + private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; private String tablePattern; public MultipleSinkOption(String format, - SchemaUpdateExceptionPolicy addColumnPolicy, - SchemaUpdateExceptionPolicy delColumnPolicy, + SchemaUpdateExceptionPolicy schemaUpdatePolicy, String databasePattern, String tablePattern) { this.format = format; - this.addColumnPolicy = addColumnPolicy; - this.delColumnPolicy = delColumnPolicy; + this.schemaUpdatePolicy = schemaUpdatePolicy; this.databasePattern = databasePattern; this.tablePattern = tablePattern; } @@ -61,12 +57,8 @@ public class MultipleSinkOption implements Serializable { return format; } - public SchemaUpdateExceptionPolicy getAddColumnPolicy() { - return addColumnPolicy; - } - - public SchemaUpdateExceptionPolicy getDelColumnPolicy() { - return delColumnPolicy; + public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() { + return schemaUpdatePolicy; } public String getDatabasePattern() { @@ -83,8 +75,7 @@ public class MultipleSinkOption implements Serializable { public static class Builder { private String format; - private SchemaUpdateExceptionPolicy addColumnPolicy; - private SchemaUpdateExceptionPolicy delColumnPolicy; + private SchemaUpdateExceptionPolicy schemaUpdatePolicy; private String databasePattern; private String tablePattern; @@ -93,13 +84,8 @@ public class MultipleSinkOption implements Serializable { return this; } - public MultipleSinkOption.Builder withAddColumnPolicy(SchemaUpdateExceptionPolicy addColumnPolicy) { - this.addColumnPolicy = addColumnPolicy; - return this; - } - - public MultipleSinkOption.Builder withDelColumnPolicy(SchemaUpdateExceptionPolicy delColumnPolicy) { - this.delColumnPolicy = delColumnPolicy; + public MultipleSinkOption.Builder withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) { + this.schemaUpdatePolicy = schemaUpdatePolicy; return this; } @@ -114,7 +100,7 @@ public class MultipleSinkOption implements Serializable { } public MultipleSinkOption build() { - return new MultipleSinkOption(format, addColumnPolicy, delColumnPolicy, databasePattern, tablePattern); + return new MultipleSinkOption(format, schemaUpdatePolicy, databasePattern, tablePattern); } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java index 74cd271a6..7da5ac772 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/SchemaUpdateExceptionPolicy.java @@ -39,4 +39,8 @@ public enum SchemaUpdateExceptionPolicy { SchemaUpdateExceptionPolicy(String description) { this.description = description; } + + public String getDescription() { + return description; + } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index 5d63d8dc9..4852bca33 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -50,11 +50,10 @@ import java.util.Set; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; -import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; -import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG; @@ -241,8 +240,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami options.add(SINK_MULTIPLE_FORMAT); options.add(SINK_MULTIPLE_DATABASE_PATTERN); options.add(SINK_MULTIPLE_TABLE_PATTERN); - options.add(SINK_MULTIPLE_ADD_COLUMN_POLICY); - options.add(SINK_MULTIPLE_DEL_COLUMN_POLICY); + options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); return options; } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index 8296d02ff..ee7cf2c89 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -44,11 +44,10 @@ import java.util.Map; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; -import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ADD_COLUMN_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; -import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DEL_COLUMN_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG; @@ -107,8 +106,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT)) .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN)) .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN)) - .withAddColumnPolicy(tableOptions.get(SINK_MULTIPLE_ADD_COLUMN_POLICY)) - .withDelColumnPolicy(tableOptions.get(SINK_MULTIPLE_DEL_COLUMN_POLICY)) + .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY)) .build()) .append(); } else { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index ea69c12f9..e7fe68127 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -44,7 +44,6 @@ import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.base.sink.TableChange; import org.apache.inlong.sort.base.sink.TableChange.AddColumn; -import org.apache.inlong.sort.base.sink.TableChange.DeleteColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +58,6 @@ import java.util.Map; import java.util.Queue; import java.util.Set; -import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE; - public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWithSchema> implements OneInputStreamOperator<RowData, RecordWithSchema>, ProcessingTimeCallback { @@ -232,7 +229,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi Transaction transaction = table.newTransaction(); if (table.schema().sameSchema(oldSchema)) { List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema); - if (canHandleWithSchemaUpdate(tableId, tableChanges)) { + if (canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) { SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges); LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges); } @@ -270,21 +267,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi return record; } - private boolean canHandleWithSchemaUpdate(TableIdentifier tableId, List<TableChange> tableChanges) { + private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) { boolean canHandle = true; for (TableChange tableChange : tableChanges) { if (tableChange instanceof AddColumn) { canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange, - multipleSinkOption.getAddColumnPolicy()); - } else if (tableChange instanceof DeleteColumn) { - canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange, - multipleSinkOption.getDelColumnPolicy()); + multipleSinkOption.getSchemaUpdatePolicy()); } else { - canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange, - LOG_WITH_IGNORE); + if (MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange, + multipleSinkOption.getSchemaUpdatePolicy())) { + LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.", + tableId, tableChange); + } + // todo:currently iceberg can only handle addColumn, so always return false + canHandle = false; } } - if (!canHandle) { blacklist.add(tableId); }