This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 0e1b97ac83a8359393299d11139374d16db8ed8c
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Mon Jul 17 10:53:45 2023 +0800

    [INLONG-7763][Sort] Support ddl change for doris (#7764)
    
    (cherry picked from commit 186e8a1325a28c744b535230b7404c6eea979100)
---
 .../sort/protocol/node/load/DorisLoadNode.java     |  42 +-
 .../inlong/sort/base/dirty/DirtyOptions.java       |   6 +
 .../sort/base/format/JsonDynamicSchemaFormat.java  |   2 +
 .../sort-flink-v1.13/sort-connectors/doris/pom.xml |  18 +
 .../inlong/sort/doris/http/HttpGetEntity.java      |  40 ++
 .../inlong/sort/doris/schema/OperationHelper.java  | 308 +++++++++++++
 .../sort/doris/schema/SchemaChangeHelper.java      | 476 +++++++++++++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      |  42 +-
 .../sort/doris/table/DorisDynamicTableFactory.java |  67 ++-
 .../sort/doris/table/DorisDynamicTableSink.java    |  17 +-
 .../sort/doris/schema/OperationHelperTest.java     | 256 +++++++++++
 11 files changed, 1258 insertions(+), 16 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
index a3ca5dd338..3e3ac93553 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
@@ -21,10 +21,13 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.constant.DorisConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -95,6 +98,11 @@ public class DorisLoadNode extends LoadNode implements 
InlongMetric, Serializabl
     @Nullable
     @JsonProperty("tablePattern")
     private String tablePattern;
+    @JsonProperty("enableSchemaChange")
+    private boolean enableSchemaChange;
+    @Nullable
+    @JsonProperty("policyMap")
+    private Map<SchemaChangeType, SchemaChangePolicy> policyMap;
 
     public DorisLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -114,7 +122,6 @@ public class DorisLoadNode extends LoadNode implements 
InlongMetric, Serializabl
                 null, null);
     }
 
-    @JsonCreator
     public DorisLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
@@ -132,6 +139,31 @@ public class DorisLoadNode extends LoadNode implements 
InlongMetric, Serializabl
             @Nullable @JsonProperty("sinkMultipleFormat") Format 
sinkMultipleFormat,
             @Nullable @JsonProperty("databasePattern") String databasePattern,
             @Nullable @JsonProperty("tablePattern") String tablePattern) {
+        this(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties, feNodes, userName,
+                password, tableIdentifier, primaryKey, sinkMultipleEnable, 
sinkMultipleFormat, databasePattern,
+                tablePattern, false, null);
+    }
+
+    @JsonCreator
+    public DorisLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("feNodes") String feNodes,
+            @Nonnull @JsonProperty("username") String userName,
+            @Nonnull @JsonProperty("password") String password,
+            @Nullable @JsonProperty("tableIdentifier") String tableIdentifier,
+            @JsonProperty("primaryKey") String primaryKey,
+            @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue 
= "false") Boolean sinkMultipleEnable,
+            @Nullable @JsonProperty("sinkMultipleFormat") Format 
sinkMultipleFormat,
+            @Nullable @JsonProperty("databasePattern") String databasePattern,
+            @Nullable @JsonProperty("tablePattern") String tablePattern,
+            @JsonProperty("enableSchemaChange") boolean enableSchemaChange,
+            @Nullable @JsonProperty("policyMap") Map<SchemaChangeType, 
SchemaChangePolicy> policyMap) {
         super(id, name, fields, fieldRelations, filters, filterStrategy, 
sinkParallelism, properties);
         this.feNodes = Preconditions.checkNotNull(feNodes, "feNodes is null");
         this.userName = Preconditions.checkNotNull(userName, "username is 
null");
@@ -146,6 +178,10 @@ public class DorisLoadNode extends LoadNode implements 
InlongMetric, Serializabl
             this.sinkMultipleFormat = 
Preconditions.checkNotNull(sinkMultipleFormat,
                     "sinkMultipleFormat is null");
         }
+        this.enableSchemaChange = enableSchemaChange;
+        this.policyMap = policyMap;
+        Preconditions.checkState(!enableSchemaChange || policyMap != null && 
!policyMap.isEmpty(),
+                "policyMap is empty when enableSchemaChange is 'true'");
     }
 
     @Override
@@ -160,6 +196,10 @@ public class DorisLoadNode extends LoadNode implements 
InlongMetric, Serializabl
             options.put(SINK_MULTIPLE_FORMAT, 
Objects.requireNonNull(sinkMultipleFormat).identifier());
             options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern);
             options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern);
+            if (enableSchemaChange) {
+                options.put("sink.schema-change.enable", "true");
+                options.put("sink.schema-change.policies", 
SchemaChangeUtils.serialize(policyMap));
+            }
         } else {
             options.put(SINK_MULTIPLE_ENABLE, "false");
             options.put(DorisConstant.TABLE_IDENTIFIER, tableIdentifier);
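
For context on the change above: when enableSchemaChange is set, the load node serializes the per-type policies into two connector options. Below is a minimal sketch (not part of this patch) of how such a policy map could be built and serialized; the policy choices are hypothetical examples, while the option keys, the enums and the SchemaChangeUtils.serialize(...) call are taken from this diff.

import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
import org.apache.inlong.sort.util.SchemaChangeUtils;

import java.util.HashMap;
import java.util.Map;

public class DorisSchemaChangeOptionsSketch {

    public static void main(String[] args) {
        // Hypothetical policy choices: execute ADD COLUMN, only log DROP COLUMN, fail on DROP TABLE.
        Map<SchemaChangeType, SchemaChangePolicy> policyMap = new HashMap<>();
        policyMap.put(SchemaChangeType.ADD_COLUMN, SchemaChangePolicy.ENABLE);
        policyMap.put(SchemaChangeType.DROP_COLUMN, SchemaChangePolicy.LOG);
        policyMap.put(SchemaChangeType.DROP_TABLE, SchemaChangePolicy.ERROR);

        // These are the two connector options DorisLoadNode emits when enableSchemaChange is true.
        Map<String, String> options = new HashMap<>();
        options.put("sink.schema-change.enable", "true");
        options.put("sink.schema-change.policies", SchemaChangeUtils.serialize(policyMap));
        System.out.println(options);
    }
}
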
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
index 58cfe1f98a..791cba0d71 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.base.dirty;
 
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 
@@ -29,6 +31,7 @@ import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_ENABLE;
 import static 
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 
 /**
  * Dirty common options
@@ -64,6 +67,9 @@ public class DirtyOptions implements Serializable {
      */
     public static DirtyOptions fromConfig(ReadableConfig config) {
         boolean ignoreDirty = config.get(DIRTY_IGNORE);
+        if (config.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY) == 
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+            ignoreDirty = true;
+        }
         boolean enableDirtySink = config.get(DIRTY_SIDE_OUTPUT_ENABLE);
         boolean ignoreSinkError = config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
         String dirtyConnector = 
config.getOptional(DIRTY_SIDE_OUTPUT_CONNECTOR).orElse(null);
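
The DirtyOptions hunk above means that choosing the LOG_WITH_IGNORE schema-update policy implicitly turns on dirty-data ignoring. A rough sketch of that behaviour follows, assuming Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY is a ConfigOption<SchemaUpdateExceptionPolicy> and that the remaining dirty options keep their defaults; it is an illustration, not part of the commit.

import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;

import org.apache.flink.configuration.Configuration;

public class DirtyOptionsPolicySketch {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // With LOG_WITH_IGNORE, the patched fromConfig() forces dirty-data ignoring
        // even though DIRTY_IGNORE is left at its default.
        config.set(Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY, SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE);
        DirtyOptions dirtyOptions = DirtyOptions.fromConfig(config);
        System.out.println(dirtyOptions);
    }
}
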
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 51e08f8852..e8bef34d90 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -61,6 +61,8 @@ import static 
org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_
 @SuppressWarnings("LanguageDetectionInspection")
 public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat<JsonNode> {
 
+    public static final int DEFAULT_DECIMAL_PRECISION = 15;
+    public static final int DEFAULT_DECIMAL_SCALE = 5;
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonDynamicSchemaFormat.class);
     /**
      * The first item of array
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
index 151efe8059..24dc89e825 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml
@@ -42,6 +42,24 @@
             
<artifactId>flink-doris-connector-${flink.minor.version}_${flink.scala.binary.version}</artifactId>
             <version>${flink.connector.doris.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-json-v1.13</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
new file mode 100644
index 0000000000..217d37d3ab
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.http;
+
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+import java.net.URI;
+
+/**
+ * An HTTP GET request that is allowed to carry a request entity (body)
+ */
+public class HttpGetEntity extends HttpEntityEnclosingRequestBase {
+
+    private final static String METHOD = "GET";
+
+    public HttpGetEntity(String uri) {
+        super();
+        setURI(URI.create(uri));
+    }
+
+    @Override
+    public String getMethod() {
+        return METHOD;
+    }
+}
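
HttpGetEntity exists because some Doris FE endpoints expect a GET request that carries a JSON body, which the stock HttpGet cannot do. A minimal usage sketch (not part of the patch); the URL and JSON payload below are placeholders.

import org.apache.inlong.sort.doris.http.HttpGetEntity;

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class HttpGetEntitySketch {

    public static void main(String[] args) throws Exception {
        // Placeholder FE address and endpoint.
        HttpGetEntity request = new HttpGetEntity("http://127.0.0.1:8030/api/example");
        request.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
        // Unlike HttpGet, this request type accepts a body, which some Doris FE APIs expect.
        request.setEntity(new StringEntity("{\"columnName\":\"c1\",\"isDropColumn\":false}"));
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(request)) {
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
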
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
new file mode 100644
index 0000000000..c00cf81d2d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Types;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringJoiner;
+
+public class OperationHelper {
+
+    private static final String APOSTROPHE = "'";
+    private static final String DOUBLE_QUOTES = "\"";
+    private final JsonDynamicSchemaFormat dynamicSchemaFormat;
+    private final int VARCHAR_MAX_LENGTH = 65533;
+
+    private OperationHelper(JsonDynamicSchemaFormat dynamicSchemaFormat) {
+        this.dynamicSchemaFormat = dynamicSchemaFormat;
+    }
+
+    public static OperationHelper of(JsonDynamicSchemaFormat 
dynamicSchemaFormat) {
+        return new OperationHelper(dynamicSchemaFormat);
+    }
+
+    private String convert2DorisType(int jdbcType, boolean isNullable, 
List<String> precisions) {
+        String type = null;
+        switch (jdbcType) {
+            case Types.BOOLEAN:
+            case Types.DATE:
+            case Types.FLOAT:
+            case Types.DOUBLE:
+                type = 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString();
+                break;
+            case Types.TINYINT:
+            case Types.SMALLINT:
+            case Types.INTEGER:
+            case Types.BIGINT:
+                if (precisions != null && !precisions.isEmpty()) {
+                    type = String.format("%s(%s)%s", 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType).asSummaryString(),
+                            StringUtils.join(precisions, ","), isNullable ? "" 
: " NOT NULL");
+                } else {
+                    type = 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString();
+                }
+                break;
+            case Types.DECIMAL:
+                DecimalType decimalType = (DecimalType) 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (precisions != null && !precisions.isEmpty()) {
+                    Preconditions.checkState(precisions.size() < 3,
+                            "The length of precisions with DECIMAL must be less than 3");
+                    int precision = Integer.parseInt(precisions.get(0));
+                    int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                    if (precisions.size() == 2) {
+                        scale = Integer.parseInt(precisions.get(1));
+                    }
+                    decimalType = new DecimalType(isNullable, precision, 
scale);
+                } else {
+                    decimalType = new DecimalType(isNullable, 
decimalType.getPrecision(), decimalType.getScale());
+                }
+                type = decimalType.asSummaryString();
+                break;
+            case Types.CHAR:
+                LogicalType charType = 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (precisions != null && !precisions.isEmpty()) {
+                    Preconditions.checkState(precisions.size() == 1,
+                            "The length of precisions with CHAR must be 1");
+                    charType = new CharType(isNullable, 
Integer.parseInt(precisions.get(0)));
+                } else {
+                    charType = charType.copy(isNullable);
+                }
+                type = charType.asSerializableString();
+                break;
+            case Types.VARCHAR:
+                LogicalType varcharType = 
dynamicSchemaFormat.sqlType2FlinkType(jdbcType);
+                if (precisions != null && !precisions.isEmpty()) {
+                    Preconditions.checkState(precisions.size() == 1,
+                            "The length of precisions with VARCHAR must be 1");
+                    // Doris defines the VARCHAR precision differently from MySQL:
+                    // MySQL counts characters while Doris counts bytes, and a Chinese
+                    // character occupies 3 bytes, so the precision is multiplied by 3 here.
+                    int precision = 
Math.min(Integer.parseInt(precisions.get(0)) * 3, VARCHAR_MAX_LENGTH);
+                    varcharType = new VarCharType(isNullable, precision);
+                } else {
+                    varcharType = varcharType.copy(isNullable);
+                }
+                type = varcharType.asSerializableString();
+                break;
+            // The following types are not directly supported in Doris,
+            // so they can only be mapped to the closest compatible types.
+            case Types.TIME:
+            case Types.TIME_WITH_TIMEZONE:
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.BLOB:
+            case Types.CLOB:
+            case Types.LONGNVARCHAR:
+            case Types.LONGVARBINARY:
+            case Types.LONGVARCHAR:
+            case Types.ARRAY:
+            case Types.NCHAR:
+            case Types.NCLOB:
+            case Types.OTHER:
+                type = String.format("STRING%s", isNullable ? "" : " NOT 
NULL");
+                break;
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+            case Types.TIMESTAMP:
+                type = "DATETIME";
+                break;
+            case Types.REAL:
+            case Types.NUMERIC:
+                int precision = 
JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION;
+                int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE;
+                if (precisions != null && !precisions.isEmpty()) {
+                    Preconditions.checkState(precisions.size() < 3,
+                            "The length of precisions with NUMERIC must be less than 3");
+                    precision = Integer.parseInt(precisions.get(0));
+                    if (precisions.size() == 2) {
+                        scale = Integer.parseInt(precisions.get(1));
+                    }
+                }
+                decimalType = new DecimalType(isNullable, precision, scale);
+                type = decimalType.asSerializableString();
+                break;
+            case Types.BIT:
+                type = String.format("BOOLEAN%s", isNullable ? "" : " NOT NULL");
+                break;
+            default:
+                type = String.format("STRING%s", isNullable ? "" : " NOT 
NULL");
+        }
+        return type;
+    }
+
+    /**
+     * Build the statement of AddColumn
+     *
+     * @param alterColumns The list of AlterColumn
+     * @return A statement of AddColumn
+     */
+    public String buildAddColumnStatement(List<AlterColumn> alterColumns) {
+        Preconditions.checkState(alterColumns != null
+                && !alterColumns.isEmpty(), "Alter columns is empty");
+        Iterator<AlterColumn> iterator = alterColumns.iterator();
+        StringBuilder sb = new StringBuilder();
+        while (iterator.hasNext()) {
+            AlterColumn expression = iterator.next();
+            Preconditions.checkNotNull(expression.getNewColumn(), "New column 
is null");
+            Column column = expression.getNewColumn();
+            Preconditions.checkState(column.getName() != null && 
!column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            sb.append("ADD COLUMN `").append(column.getName()).append("` ")
+                    
.append(convert2DorisType(expression.getNewColumn().getJdbcType(),
+                            column.isNullable(), column.getDefinition()));
+            if (validDefaultValue(column.getDefaultValue())) {
+                sb.append(" DEFAULT ").append(quote(column.getDefaultValue()));
+            }
+            if (column.getComment() != null) {
+                sb.append(" COMMENT ").append(quote(column.getComment()));
+            }
+            if (column.getPosition() != null && 
column.getPosition().getPositionType() != null) {
+                if (column.getPosition().getPositionType() == 
PositionType.FIRST) {
+                    sb.append(" FIRST");
+                } else if (column.getPosition().getPositionType() == 
PositionType.AFTER) {
+                    
Preconditions.checkState(column.getPosition().getColumnName() != null
+                            && 
!column.getPosition().getColumnName().trim().isEmpty(),
+                            "The column name of Position is empty");
+                    sb.append(" AFTER 
`").append(column.getPosition().getColumnName()).append("`");
+                }
+            }
+            if (iterator.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        return sb.toString();
+    }
+
+    private String quote(String value) {
+        if (value == null) {
+            return "'null'";
+        }
+        if (!value.startsWith(APOSTROPHE) && !value.startsWith(DOUBLE_QUOTES)) 
{
+            return String.format("'%s'", value);
+        }
+        return value;
+    }
+
+    /**
+     * Build the statement of DropColumn
+     *
+     * @param alterColumns The list of AlterColumn
+     * @return A statement of DropColumn
+     */
+    public String buildDropColumnStatement(List<AlterColumn> alterColumns) {
+        Preconditions.checkState(alterColumns != null
+                && !alterColumns.isEmpty(), "Alter columns is empty");
+        Iterator<AlterColumn> iterator = alterColumns.iterator();
+        StringBuilder sb = new StringBuilder();
+        while (iterator.hasNext()) {
+            AlterColumn expression = iterator.next();
+            Preconditions.checkNotNull(expression.getOldColumn(), "Old column 
is null");
+            Column column = expression.getOldColumn();
+            Preconditions.checkState(column.getName() != null && 
!column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            sb.append("DROP COLUMN `").append(column.getName()).append("`");
+            if (iterator.hasNext()) {
+                sb.append(",");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Build common statement of alter
+     *
+     * @param database The database of Doris
+     * @param table The table of Doris
+     * @return A statement of Alter table
+     */
+    public String buildAlterStatementCommon(String database, String table) {
+        return "ALTER TABLE `" + database + "`.`" + table + "` ";
+    }
+
+    private boolean validDefaultValue(String defaultValue) {
+        return defaultValue != null && !defaultValue.trim().isEmpty() && 
!"NULL"
+                .equalsIgnoreCase(defaultValue);
+    }
+
+    /**
+     * Build the statement of CreateTable
+     *
+     * @param database The database of Doris
+     * @param table The table of Doris
+     * @param primaryKeys The primary key of Doris
+     * @param operation The Operation
+     * @return A statement of CreateTable
+     */
+    public String buildCreateTableStatement(String database, String table, 
List<String> primaryKeys,
+            CreateTableOperation operation) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("CREATE TABLE IF NOT EXISTS 
`").append(database).append("`.`").append(table).append("`(\n");
+        Preconditions.checkState(operation.getColumns() != null && 
!operation.getColumns().isEmpty(),
+                String.format("The columns of table: %s.%s is empty", 
database, table));
+        Iterator<Column> iterator = operation.getColumns().iterator();
+        StringJoiner joiner = new StringJoiner(",");
+        while (iterator.hasNext()) {
+            Column column = iterator.next();
+            Preconditions.checkNotNull(column, "The column is null");
+            Preconditions.checkState(column.getName() != null && 
!column.getName().trim().isEmpty(),
+                    "The column name is blank");
+            sb.append("\t`").append(column.getName()).append("` 
").append(convert2DorisType(column.getJdbcType(),
+                    column.isNullable(), column.getDefinition()));
+            if (validDefaultValue(column.getDefaultValue())) {
+                sb.append(" DEFAULT ").append(quote(column.getDefaultValue()));
+            }
+            if (column.getComment() != null) {
+                sb.append(" COMMENT ").append(quote(column.getComment()));
+            }
+            joiner.add(String.format("`%s`", column.getName()));
+            if (iterator.hasNext()) {
+                sb.append(",\n");
+            }
+        }
+        sb.append("\n)\n");
+        String model = "DUPLICATE";
+        if (primaryKeys != null && !primaryKeys.isEmpty()) {
+            model = "UNIQUE";
+            joiner = new StringJoiner(",");
+            for (String primaryKey : primaryKeys) {
+                joiner.add(String.format("`%s`", primaryKey));
+            }
+        }
+        String keys = joiner.toString();
+        sb.append(model).append(" KEY(").append(keys).append(")");
+        if (StringUtils.isNotBlank(operation.getComment())) {
+            sb.append("\nCOMMENT ").append(quote(operation.getComment()));
+        }
+        sb.append("\nDISTRIBUTED BY HASH(").append(keys).append(")");
+        // Enable light schema change when the Doris version is 1.2.0 or later
+        sb.append("\nPROPERTIES (\n\t\"light_schema_change\" = \"true\"\n)");
+        return sb.toString();
+    }
+}
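
A small sketch of how OperationHelper is used and what it produces. The AlterColumn/Column protocol objects are normally built from parsed DDL, so the ADD COLUMN clause is written out literally here to show the expected shape; "canal-json" is assumed to be a registered dynamic schema format identifier, and the database, table and column names are placeholders.

import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.doris.schema.OperationHelper;

public class OperationHelperSketch {

    public static void main(String[] args) {
        // Assumption: "canal-json" resolves to a JsonDynamicSchemaFormat implementation.
        JsonDynamicSchemaFormat format =
                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat("canal-json");
        OperationHelper helper = OperationHelper.of(format);

        String prefix = helper.buildAlterStatementCommon("test_db", "test_table");
        // buildAddColumnStatement(...) would return a clause like the one below for a nullable
        // MySQL VARCHAR(32) column (32 * 3 = 96 bytes after the MySQL-to-Doris length mapping).
        String addColumn = "ADD COLUMN `c1` VARCHAR(96) DEFAULT 'x' COMMENT 'demo'";
        System.out.println(prefix + addColumn);
        // => ALTER TABLE `test_db`.`test_table` ADD COLUMN `c1` VARCHAR(96) DEFAULT 'x' COMMENT 'demo'
    }
}
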
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
new file mode 100644
index 0000000000..effae22e1d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.schema.SchemaChangeHandleException;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.doris.http.HttpGetEntity;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Schema change helper
+ */
+public class SchemaChangeHelper {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaChangeHelper.class);
+
+    private static final String CHECK_LIGHT_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private static final String DORIS_HTTP_CALL_SUCCESS = "0";
+    private static final String CONTENT_TYPE_JSON = "application/json";
+    private final boolean schemaChange;
+    private final Map<SchemaChangeType, SchemaChangePolicy> policyMap;
+    private final DorisOptions options;
+    private final JsonDynamicSchemaFormat dynamicSchemaFormat;
+    private final String databasePattern;
+    private final String tablePattern;
+    private final int maxRetries;
+    private final OperationHelper operationHelper;
+    private final SchemaUpdateExceptionPolicy exceptionPolicy;
+    private final SinkTableMetricData metricData;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
+
+    private SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat, 
DorisOptions options, boolean schemaChange,
+            Map<SchemaChangeType, SchemaChangePolicy> policyMap, String 
databasePattern, String tablePattern,
+            int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy,
+            SinkTableMetricData metricData, DirtySinkHelper<Object> 
dirtySinkHelper) {
+        this.dynamicSchemaFormat = 
Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null");
+        this.options = Preconditions.checkNotNull(options, "doris options is 
null");
+        this.schemaChange = schemaChange;
+        this.policyMap = policyMap;
+        this.databasePattern = databasePattern;
+        this.tablePattern = tablePattern;
+        this.maxRetries = maxRetries;
+        this.exceptionPolicy = exceptionPolicy;
+        this.metricData = metricData;
+        this.dirtySinkHelper = dirtySinkHelper;
+        operationHelper = OperationHelper.of(dynamicSchemaFormat);
+    }
+
+    public static SchemaChangeHelper of(JsonDynamicSchemaFormat 
dynamicSchemaFormat, DorisOptions options,
+            boolean schemaChange, Map<SchemaChangeType, SchemaChangePolicy> 
policyMap, String databasePattern,
+            String tablePattern, int maxRetries, SchemaUpdateExceptionPolicy 
exceptionPolicy,
+            SinkTableMetricData metricData, DirtySinkHelper<Object> 
dirtySinkHelper) {
+        return new SchemaChangeHelper(dynamicSchemaFormat, options, 
schemaChange, policyMap, databasePattern,
+                tablePattern, maxRetries, exceptionPolicy, metricData, 
dirtySinkHelper);
+    }
+
+    /**
+     * Process schema change for Doris
+     *
+     * @param originData The origin data bytes
+     * @param data The parsed JSON of the origin data
+     */
+    public void process(byte[] originData, JsonNode data) {
+        if (!schemaChange) {
+            return;
+        }
+        String database;
+        String table;
+        try {
+            database = dynamicSchemaFormat.parse(data, databasePattern);
+            table = dynamicSchemaFormat.parse(data, tablePattern);
+        } catch (Exception e) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Parse database, table from origin data 
failed, origin data: %s",
+                                new String(originData)),
+                        e);
+            }
+            LOGGER.warn("Parse database, table from origin data failed, origin 
data: {}", new String(originData), e);
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+                dirtySinkHelper.invoke(new String(originData), 
DirtyType.JSON_PROCESS_ERROR, e);
+            }
+            if (metricData != null) {
+                metricData.invokeDirty(1, originData.length);
+            }
+            return;
+        }
+        Operation operation;
+        try {
+            JsonNode operationNode = 
Preconditions.checkNotNull(data.get("operation"),
+                    "Operation node is null");
+            operation = Preconditions.checkNotNull(
+                    
dynamicSchemaFormat.objectMapper.convertValue(operationNode, new 
TypeReference<Operation>() {
+                    }), "Operation is null");
+        } catch (Exception e) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Extract Operation from origin data failed, origin data: %s", data), e);
+            }
+            LOGGER.warn("Extract Operation from origin data failed, origin data: {}", data, e);
+            handleDirtyData(data, originData, database, table, 
DirtyType.JSON_PROCESS_ERROR, e);
+            return;
+        }
+        String originSchema = dynamicSchemaFormat.extractDDL(data);
+        SchemaChangeType type = 
SchemaChangeUtils.extractSchemaChangeType(operation);
+        if (type == null) {
+            LOGGER.warn("Unsupported for schema-change: {}", originSchema);
+            return;
+        }
+        switch (type) {
+            case ALTER:
+                handleAlterOperation(database, table, originData, 
originSchema, data, (AlterOperation) operation);
+                break;
+            case CREATE_TABLE:
+                doCreateTable(originData, database, table, type, originSchema, 
data, (CreateTableOperation) operation);
+                break;
+            case DROP_TABLE:
+                doDropTable(type, originSchema);
+                break;
+            case RENAME_TABLE:
+                doRenameTable(type, originSchema);
+                break;
+            case TRUNCATE_TABLE:
+                doTruncateTable(type, originSchema);
+                break;
+            default:
+                LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        }
+    }
+
+    private void handleDirtyData(JsonNode data, byte[] originData, String 
database,
+            String table, DirtyType dirtyType, Throwable e) {
+        if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) {
+            String label = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getLabels());
+            String logTag = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getLogTag());
+            String identifier = parseValue(data, 
dirtySinkHelper.getDirtyOptions().getIdentifier());
+            dirtySinkHelper.invoke(new String(originData), dirtyType, label, 
logTag, identifier, e);
+        }
+        if (metricData != null) {
+            metricData.outputDirtyMetricsWithEstimate(database, table, 1, 
originData.length);
+        }
+    }
+
+    private void reportMetric(String database, String table, int len) {
+        if (metricData != null) {
+            metricData.outputMetrics(database, table, 1, len);
+        }
+    }
+
+    private String parseValue(JsonNode data, String pattern) {
+        try {
+            return dynamicSchemaFormat.parse(data, pattern);
+        } catch (Exception e) {
+            LOGGER.warn("Parse value from pattern failed, pattern: {}, data: {}", pattern, data);
+        }
+        return pattern;
+    }
+
+    private void handleAlterOperation(String database, String table, byte[] 
originData,
+            String originSchema, JsonNode data, AlterOperation operation) {
+        if (operation.getAlterColumns() == null || 
operation.getAlterColumns().isEmpty()) {
+            if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Alter columns is empty, origin schema: 
%s", originSchema));
+            }
+            LOGGER.warn("Alter columns is empty, origin schema: {}", 
originSchema);
+            return;
+        }
+        Map<SchemaChangeType, List<AlterColumn>> typeMap = new 
LinkedHashMap<>();
+        for (AlterColumn alterColumn : operation.getAlterColumns()) {
+            Set<SchemaChangeType> types = null;
+            try {
+                types = SchemaChangeUtils.extractSchemaChangeType(alterColumn);
+                Preconditions.checkState(!types.isEmpty(), "Schema change types are empty");
+            } catch (Exception e) {
+                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Extract schema change type failed, 
origin schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Extract schema change type failed, origin schema: 
{}", originSchema, e);
+            }
+            if (types == null) {
+                continue;
+            }
+            if (types.size() == 1) {
+                SchemaChangeType type = types.stream().findFirst().get();
+                typeMap.computeIfAbsent(type, k -> new 
ArrayList<>()).add(alterColumn);
+            } else {
+                // Handle column changes; for now this scenario only covers change-column-type and rename-column.
+                for (SchemaChangeType type : types) {
+                    SchemaChangePolicy policy = policyMap.get(type);
+                    if (policy == SchemaChangePolicy.ENABLE) {
+                        LOGGER.warn("Unsupported for {}: {}", type, 
originSchema);
+                    } else {
+                        doSchemaChangeBase(type, policy, originSchema);
+                    }
+                }
+            }
+        }
+        if (!typeMap.isEmpty()) {
+            doAlterOperation(database, table, originData, originSchema, data, 
typeMap);
+        }
+    }
+
+    private void doAlterOperation(String database, String table, byte[] 
originData, String originSchema, JsonNode data,
+            Map<SchemaChangeType, List<AlterColumn>> typeMap) {
+        StringJoiner joiner = new StringJoiner(",");
+        for (Entry<SchemaChangeType, List<AlterColumn>> kv : 
typeMap.entrySet()) {
+            SchemaChangePolicy policy = policyMap.get(kv.getKey());
+            doSchemaChangeBase(kv.getKey(), policy, originSchema);
+            if (policy == SchemaChangePolicy.ENABLE) {
+                String alterStatement = null;
+                try {
+                    switch (kv.getKey()) {
+                        case ADD_COLUMN:
+                            alterStatement = doAddColumn(kv.getValue());
+                            break;
+                        case DROP_COLUMN:
+                            alterStatement = doDropColumn(kv.getValue());
+                            break;
+                        case RENAME_COLUMN:
+                            alterStatement = doRenameColumn(kv.getKey(), 
originSchema);
+                            break;
+                        case CHANGE_COLUMN_TYPE:
+                            alterStatement = doChangeColumnType(kv.getKey(), 
originSchema);
+                            break;
+                        default:
+                    }
+                } catch (Exception e) {
+                    if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                        throw new SchemaChangeHandleException(
+                                String.format("Build alter statement failed, 
origin schema: %s", originSchema), e);
+                    }
+                    LOGGER.warn("Build alter statement failed, origin schema: 
{}", originSchema, e);
+                }
+                if (alterStatement != null) {
+                    joiner.add(alterStatement);
+                }
+            }
+        }
+        String statement = joiner.toString();
+        if (statement.length() != 0) {
+            try {
+                String alterStatementCommon = 
operationHelper.buildAlterStatementCommon(database, table);
+                statement = alterStatementCommon + statement;
+                // The checkLightSchemaChange is removed because most 
scenarios support it
+                boolean result = executeStatement(database, statement);
+                if (!result) {
+                    LOGGER.error("Alter table failed, statement: {}", statement);
+                    throw new SchemaChangeHandleException(String.format("Alter table failed, statement: %s", statement));
+                }
+                LOGGER.info("Alter table succeeded, statement: {}", statement);
+                reportMetric(database, table, originData.length);
+            } catch (Exception e) {
+                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Alter table failed, origin schema: 
%s", originSchema), e);
+                }
+                handleDirtyData(data, originData, database, table, 
DirtyType.HANDLE_ALTER_TABLE_ERROR, e);
+            }
+        }
+    }
+
+    private String doChangeColumnType(SchemaChangeType type, String 
originSchema) {
+        LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        return null;
+    }
+
+    private String doRenameColumn(SchemaChangeType type, String originSchema) {
+        LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        return null;
+    }
+
+    private String doDropColumn(List<AlterColumn> alterColumns) {
+        return operationHelper.buildDropColumnStatement(alterColumns);
+    }
+
+    private String doAddColumn(List<AlterColumn> alterColumns) {
+        return operationHelper.buildAddColumnStatement(alterColumns);
+    }
+
+    private void doTruncateTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy policy = 
policyMap.get(SchemaChangeType.TRUNCATE_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    private void doRenameTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy policy = 
policyMap.get(SchemaChangeType.RENAME_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    private void doDropTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy policy = policyMap.get(SchemaChangeType.DROP_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    private void doCreateTable(byte[] originData, String database, String 
table, SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation 
operation) {
+        SchemaChangePolicy policy = policyMap.get(type);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            try {
+                List<String> primaryKeys = 
dynamicSchemaFormat.extractPrimaryKeyNames(data);
+                String stmt = 
operationHelper.buildCreateTableStatement(database, table, primaryKeys, 
operation);
+                boolean result = executeStatement(database, stmt);
+                if (!result) {
+                    LOGGER.error("Create table failed, statement: {}", stmt);
+                    throw new IOException(String.format("Create table failed, statement: %s", stmt));
+                }
+                reportMetric(database, table, originData.length);
+                return;
+            } catch (Exception e) {
+                if (exceptionPolicy == 
SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Create table failed, origin schema: %s", originSchema), e);
+                }
+                handleDirtyData(data, originData, database, table, 
DirtyType.CREATE_TABLE_ERROR, e);
+                return;
+            }
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    private void doSchemaChangeBase(SchemaChangeType type, SchemaChangePolicy 
policy, String schema) {
+        if (policy == null) {
+            LOGGER.warn("Unsupported for {}: {}", type, schema);
+            return;
+        }
+        switch (policy) {
+            case LOG:
+                LOGGER.warn("Unsupported for {}: {}", type, schema);
+                break;
+            case ERROR:
+                throw new 
SchemaChangeHandleException(String.format("Unsupported for %s: %s", type, 
schema));
+            default:
+        }
+    }
+
+    private Map<String, Object> buildRequestParam(String column, boolean 
dropColumn) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("isDropColumn", dropColumn);
+        params.put("columnName", column);
+        return params;
+    }
+
+    private String authHeader() {
+        return "Basic " + new 
String(Base64.encodeBase64((options.getUsername() + ":"
+                + options.getPassword()).getBytes(StandardCharsets.UTF_8)));
+    }
+
+    private boolean executeStatement(String database, String stmt) throws 
IOException {
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", stmt);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, 
options.getFenodes(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
+        httpPost.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+        return sendRequest(httpPost);
+    }
+
+    private boolean checkLightSchemaChange(String database, String table, 
String column, boolean dropColumn)
+            throws IOException {
+        String url = String.format(CHECK_LIGHT_SCHEMA_CHANGE_API, 
options.getFenodes(), database, table);
+        Map<String, Object> param = buildRequestParam(column, dropColumn);
+        HttpGetEntity httpGet = new HttpGetEntity(url);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+        boolean success = sendRequest(httpGet);
+        if (!success) {
+            LOGGER.warn("Light schema change is not supported for table {}.{}", database, table);
+        }
+        return success;
+    }
+
+    @SuppressWarnings("unchecked")
+    private boolean sendRequest(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            for (int i = 0; i < maxRetries; i++) {
+                try {
+                    CloseableHttpResponse response = 
httpclient.execute(request);
+                    final int statusCode = 
response.getStatusLine().getStatusCode();
+                    if (statusCode == HttpStatus.SC_OK && response.getEntity() 
!= null) {
+                        String loadResult = 
EntityUtils.toString(response.getEntity());
+                        Map<String, Object> responseMap = 
dynamicSchemaFormat.objectMapper
+                                .readValue(loadResult, Map.class);
+                        String code = responseMap.getOrDefault("code", 
"-1").toString();
+                        if (DORIS_HTTP_CALL_SUCCESS.equals(code)) {
+                            return true;
+                        }
+                        LOGGER.error("send request error: {}", loadResult);
+                    }
+                } catch (Exception e) {
+                    if (i >= maxRetries) {
+                        LOGGER.error("send http requests error", e);
+                        throw new IOException(e);
+                    }
+                    try {
+                        Thread.sleep(1000L * i);
+                    } catch (InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException("unable to send http request, interrupted before the next attempt", e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("send request error", e);
+            throw new SchemaChangeHandleException("send request error", e);
+        }
+        return false;
+    }
+}
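
For orientation, a wiring sketch that mirrors how the output format below creates and drives the helper. The database/table patterns, retry count and policies are placeholder values, and the collaborators are passed in rather than constructed here; only the SchemaChangeHelper.of(...) and process(...) signatures come from this diff.

import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.doris.schema.SchemaChangeHelper;
import org.apache.inlong.sort.util.SchemaChangeUtils;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

public class SchemaChangeHelperWiringSketch {

    public static SchemaChangeHelper create(JsonDynamicSchemaFormat format, DorisOptions options,
            String schemaChangePolicies, SinkTableMetricData metricData,
            DirtySinkHelper<Object> dirtySinkHelper) {
        // Placeholder patterns and retry count; the policy string is the serialized map
        // produced on the load-node side.
        return SchemaChangeHelper.of(format, options, true,
                SchemaChangeUtils.deserialize(schemaChangePolicies),
                "${database}", "${table}", 3,
                SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE, metricData, dirtySinkHelper);
    }

    public static void onDdlRecord(SchemaChangeHelper helper, byte[] originData, JsonNode rootNode) {
        // For a DDL record the sink hands the raw bytes and the parsed JSON to the helper,
        // which applies the per-type SchemaChangePolicy (ENABLE executes it against the Doris FE).
        helper.process(originData, rootNode);
    }
}
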
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 7017f18749..1ca00e92c8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -30,7 +30,9 @@ import 
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.doris.model.RespContent;
+import org.apache.inlong.sort.doris.schema.SchemaChangeHelper;
 import org.apache.inlong.sort.doris.util.DorisParseUtils;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -149,6 +151,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private final String dynamicSchemaFormat;
     private final boolean ignoreSingleTableErrors;
     private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final String[] fieldNames;
+    private final LogicalType[] logicalTypes;
+    private final boolean enableSchemaChange;
+    @Nullable
+    private final String schemaChangePolicies;
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
@@ -160,15 +167,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
-    private final String[] fieldNames;
     private volatile boolean jsonFormat;
     private volatile RowData.FieldGetter[] fieldGetters;
     private String fieldDelimiter;
     private String lineDelimiter;
     private String columns;
-    private final LogicalType[] logicalTypes;
     private DirtySinkHelper<Object> dirtySinkHelper;
     private transient Schema schema;
+    private SchemaChangeHelper helper;
 
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
             DorisReadOptions readOptions,
@@ -185,7 +191,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             String auditHostAndPorts,
             boolean multipleSink,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean enableSchemaChange,
+            @Nullable String schemaChangePolicies) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -201,7 +209,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
-
+        this.enableSchemaChange = enableSchemaChange;
+        this.schemaChangePolicies = schemaChangePolicies;
         handleStreamLoadProp();
     }
 
@@ -211,7 +220,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
      * @return builder
      */
     public static DorisDynamicSchemaOutputFormat.Builder builder() {
-        return new DorisDynamicSchemaOutputFormat.Builder();
+        return new Builder();
     }
 
     private void handleStreamLoadProp() {
@@ -274,9 +283,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             }
         }
 
-        if (multipleSink && StringUtils.isNotBlank(dynamicSchemaFormat)) {
+        if (multipleSink) {
             jsonDynamicSchemaFormat =
                     (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+            helper = SchemaChangeHelper.of(jsonDynamicSchemaFormat, options, 
enableSchemaChange,
+                    enableSchemaChange ? 
SchemaChangeUtils.deserialize(schemaChangePolicies) : null, databasePattern,
+                    tablePattern, executionOptions.getMaxRetries(), 
schemaUpdatePolicy, metricData, dirtySinkHelper);
         }
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
@@ -401,7 +413,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
             if (isDDL) {
                 ddlNum.incrementAndGet();
-                // Ignore ddl change for now
+                helper.process(rowData.getBinary(0), rootNode);
                 return;
             }
             String tableIdentifier;
@@ -925,6 +937,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         private String[] fieldNames;
         private DirtyOptions dirtyOptions;
         private DirtySink<Object> dirtySink;
+        private boolean enableSchemaChange;
+        private String schemaChangePolicies;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -1021,6 +1035,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             return this;
         }
 
+        public Builder setEnableSchemaChange(boolean enableSchemaChange) {
+            this.enableSchemaChange = enableSchemaChange;
+            return this;
+        }
+
+        public Builder setSchemaChangePolicies(String schemaChangePolicies) {
+            this.schemaChangePolicies = schemaChangePolicies;
+            return this;
+        }
+
         @SuppressWarnings({"rawtypes"})
         public DorisDynamicSchemaOutputFormat build() {
             LogicalType[] logicalTypes = null;
@@ -1044,7 +1068,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                     auditHostAndPorts,
                     multipleSink,
                     dirtyOptions,
-                    dirtySink);
+                    dirtySink,
+                    enableSchemaChange,
+                    schemaChangePolicies);
         }
     }
 }
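
Before this patch the writer only counted DDL records and dropped them; with the helper.process(...) call above they are now routed through SchemaChangeHelper. The code below is only a minimal sketch of that policy-dispatch idea, under the assumption of a hypothetical PolicyDispatchSketch class and applyToDoris method (the real logic lives in SchemaChangeHelper and is not reproduced here): look up the configured SchemaChangePolicy for the detected SchemaChangeType, then apply, log, ignore, or raise an error.

import java.util.Map;

import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
import org.apache.inlong.sort.protocol.enums.SchemaChangeType;

// Hypothetical illustration only; not part of this patch.
public class PolicyDispatchSketch {

    private final Map<SchemaChangeType, SchemaChangePolicy> policyMap;

    public PolicyDispatchSketch(Map<SchemaChangeType, SchemaChangePolicy> policyMap) {
        this.policyMap = policyMap;
    }

    // Decide what to do with a DDL event of the given type.
    public void dispatch(SchemaChangeType type, String ddl) {
        SchemaChangePolicy policy = policyMap == null ? null : policyMap.get(type);
        if (policy == null || policy == SchemaChangePolicy.IGNORE) {
            return; // no policy configured, or explicitly ignored: skip the change
        }
        switch (policy) {
            case ENABLE:
                applyToDoris(ddl); // apply the change to the Doris table
                break;
            case LOG:
                System.out.println("Schema change detected but not applied: " + ddl);
                break;
            case ERROR:
                throw new IllegalStateException("Schema change is not allowed: " + ddl);
            default:
                break;
        }
    }

    private void applyToDoris(String ddl) {
        // Placeholder: a real implementation would submit the DDL to Doris,
        // e.g. through the FE HTTP API or a JDBC connection.
    }
}
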
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 802979ffc5..5f6e789bb6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -22,6 +22,9 @@ import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+import org.apache.inlong.sort.util.SchemaChangeUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -43,8 +46,12 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
@@ -67,6 +74,8 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_ENABLE;
+import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_POLICIES;
 
 /**
  * This class copy from {@link org.apache.doris.flink.table.DorisDynamicTableFactory}
@@ -176,6 +185,35 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
             .withDescription("the flush max bytes (includes all append, upsert and delete records), over this number"
                     + " in batch, will flush data. The default value is 10MB.");
 
+    private static final Map<SchemaChangeType, List<SchemaChangePolicy>> SUPPORTS_POLICY_MAP = new HashMap<>();
+
+    static {
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.CREATE_TABLE,
+                Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_TABLE,
+                Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_TABLE,
+                Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.TRUNCATE_TABLE,
+                Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.ADD_COLUMN,
+                Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_COLUMN,
+                Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_COLUMN,
+                Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+        SUPPORTS_POLICY_MAP.put(SchemaChangeType.CHANGE_COLUMN_TYPE,
+                Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG,
+                        SchemaChangePolicy.ERROR));
+    }
+
     @Override
     public String factoryIdentifier() {
         return "doris-inlong";
@@ -223,6 +261,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(INLONG_AUDIT);
         options.add(FactoryUtil.SINK_PARALLELISM);
         options.add(AUDIT_KEYS);
+        options.add(SINK_SCHEMA_CHANGE_ENABLE);
+        options.add(SINK_SCHEMA_CHANGE_POLICIES);
         return options;
     }
 
@@ -309,8 +349,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
                 .getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(SchemaUpdateExceptionPolicy.THROW_WITH_STOP);
         String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
-        validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
-                multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
+        boolean enableSchemaChange = helper.getOptions().get(SINK_SCHEMA_CHANGE_ENABLE);
+        String schemaChangePolicies = helper.getOptions().getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null);
+        validateSinkMultiple(physicalSchema.toPhysicalRowDataType(), multipleSink, sinkMultipleFormat,
+                databasePattern, tablePattern, enableSchemaChange, schemaChangePolicies);
         String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
         String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
         Integer parallelism = helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null);
@@ -333,11 +375,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                 auditHostAndPorts,
                 parallelism,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                enableSchemaChange,
+                schemaChangePolicies);
     }
 
     private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat,
-            String databasePattern, String tablePattern) {
+            String databasePattern, String tablePattern, boolean enableSchemaChange, String schemaChangePolicies) {
         if (multipleSink) {
             if (StringUtils.isBlank(databasePattern)) {
                 throw new ValidationException(
@@ -367,6 +411,21 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                         "Only supports 'BYTES' or 'VARBINARY(n)' of PhysicalDataType "
                                 + "when the option 'sink.multiple.enable' is 'true'");
             }
+            if (enableSchemaChange) {
+                Map<SchemaChangeType, SchemaChangePolicy> policyMap = SchemaChangeUtils
+                        .deserialize(schemaChangePolicies);
+                for (Entry<SchemaChangeType, SchemaChangePolicy> kv : policyMap.entrySet()) {
+                    List<SchemaChangePolicy> policies = SUPPORTS_POLICY_MAP.get(kv.getKey());
+                    if (policies == null) {
+                        throw new ValidationException(
+                                String.format("Unsupported type of schema-change: %s", kv.getKey()));
+                    }
+                    if (!policies.contains(kv.getValue())) {
+                        throw new ValidationException(
+                                String.format("Unsupported policy of schema-change: %s", kv.getValue()));
+                    }
+                    }
+                }
+            }
         }
     }
 }
\ No newline at end of file
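
For reference, a hedged usage sketch of the new options in Flink SQL, wrapped in a Java main. The 'doris-inlong' connector identifier, the 'sink.multiple.enable' option and the 'canal-json' format appear in this patch; the schema-change option keys ('sink.schema-change.enable', 'sink.schema-change.policies'), the Doris connection options, the pattern placeholders and the serialized policy-map value are assumptions that should be checked against Constants and SchemaChangeUtils.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DorisSchemaChangeSqlSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Multi-table Doris sink whose single BYTES column carries canal-json records.
        // The schema-change option keys below are assumptions; verify them against Constants.
        tableEnv.executeSql(
                "CREATE TABLE doris_multi_sink (\n"
                        + "  data BYTES\n"
                        + ") WITH (\n"
                        + "  'connector' = 'doris-inlong',\n"
                        + "  'fenodes' = 'doris-fe:8030',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = '',\n"
                        + "  'sink.multiple.enable' = 'true',\n"
                        + "  'sink.multiple.format' = 'canal-json',\n"
                        + "  'sink.multiple.database-pattern' = '${database}',\n"
                        + "  'sink.multiple.table-pattern' = '${table}',\n"
                        + "  'sink.schema-change.enable' = 'true',\n"
                        + "  'sink.schema-change.policies' = '<serialized policy map>'\n"
                        + ")");
    }
}
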
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index cbec6fd682..be7c8b86b3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -53,6 +53,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final Integer parallelism;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
+    private final boolean enableSchemaChange;
+    @Nullable
+    private final String schemaChangePolicies;
 
     public DorisDynamicTableSink(DorisOptions options,
             DorisReadOptions readOptions,
@@ -68,7 +71,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
             String auditHostAndPorts,
             Integer parallelism,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean enableSchemaChange,
+            @Nullable String schemaChangePolicies) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -84,6 +89,8 @@ public class DorisDynamicTableSink implements DynamicTableSink {
         this.parallelism = parallelism;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.enableSchemaChange = enableSchemaChange;
+        this.schemaChangePolicies = schemaChangePolicies;
     }
 
     @Override
@@ -114,7 +121,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
                 .setSchemaUpdatePolicy(schemaUpdatePolicy)
                 .setDirtyOptions(dirtyOptions)
-                .setDirtySink(dirtySink);
+                .setDirtySink(dirtySink)
+                .setEnableSchemaChange(enableSchemaChange)
+                .setSchemaChangePolicies(schemaChangePolicies);
         return SinkFunctionProvider.of(
                 new GenericDorisSinkFunction<>(builder.build()), parallelism);
     }
@@ -135,7 +144,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 auditHostAndPorts,
                 parallelism,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                enableSchemaChange,
+                schemaChangePolicies);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java
new file mode 100644
index 0000000000..2c3cc85d35
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.schema;
+
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.protocol.ddl.enums.AlterType;
+import org.apache.inlong.sort.protocol.ddl.enums.PositionType;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.expressions.Position;
+import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Test for {@link OperationHelper}
+ */
+public class OperationHelperTest {
+
+    private final Map<Integer, Column> allTypes2Columns =
+            ImmutableMap.<Integer, Column>builder()
+                    .put(Types.CHAR, new Column("c", Collections.singletonList("32"), Types.CHAR,
+                            new Position(PositionType.FIRST, null), true, "InLong", "a column"))
+                    .put(Types.VARCHAR, new Column("c", Collections.singletonList("32"), Types.VARCHAR,
+                            new Position(PositionType.FIRST, null), false, "InLong", "a column"))
+                    .put(Types.SMALLINT, new Column("c", Collections.singletonList("8"), Types.SMALLINT,
+                            new Position(PositionType.AFTER, "b"), true, "2023", "a column"))
+                    .put(Types.INTEGER, new Column("c", Collections.singletonList("11"), Types.INTEGER,
+                            new Position(PositionType.AFTER, "b"), true, "2023", "a column"))
+                    .put(Types.BIGINT, new Column("c", Collections.singletonList("16"), Types.BIGINT,
+                            new Position(PositionType.AFTER, "b"), true, "2023", "a column"))
+                    .put(Types.REAL,
+                            new Column("c", Arrays.asList("11", "2"), Types.REAL, new Position(PositionType.AFTER, "b"),
+                                    true, "99.99", "a column"))
+                    .put(Types.DOUBLE, new Column("c", Arrays.asList("11", "2"), Types.DOUBLE,
+                            new Position(PositionType.AFTER, "b"), true, "99.99", "a column"))
+                    .put(Types.FLOAT, new Column("c", Arrays.asList("11", "2"), Types.FLOAT,
+                            new Position(PositionType.AFTER, "b"), true, "99.99", "a column"))
+                    .put(Types.DECIMAL, new Column("c", Arrays.asList("11", "2"), Types.DECIMAL,
+                            new Position(PositionType.AFTER, "b"), true, "99.99", "a column"))
+                    .put(Types.NUMERIC, new Column("c", Arrays.asList("11", "2"), Types.NUMERIC,
+                            new Position(PositionType.AFTER, "b"), true, "99.99", "a column"))
+                    .put(Types.BIT,
+                            new Column("c", null, Types.BIT, new Position(PositionType.AFTER, "b"), true, "false",
+                                    "a column"))
+                    .put(Types.TIME,
+                            new Column("c", null, Types.TIME, new Position(PositionType.AFTER, "b"), true, "10:30",
+                                    "a column"))
+                    .put(Types.TIME_WITH_TIMEZONE,
+                            new Column("c", null, Types.TIME_WITH_TIMEZONE, new Position(PositionType.AFTER, "b"),
+                                    true, "10:30", "a column"))
+                    .put(Types.TIMESTAMP_WITH_TIMEZONE,
+                            new Column("c", null, Types.TIMESTAMP_WITH_TIMEZONE, new Position(PositionType.AFTER, "b"),
+                                    true, "2023-01-01 10:30", "a column"))
+                    .put(Types.TIMESTAMP, new Column("c", null, Types.TIMESTAMP, new Position(PositionType.AFTER, "b"),
+                            true, "2023-01-01 10:30", "a column"))
+                    .put(Types.BINARY, new Column("c", null, Types.BINARY, new Position(PositionType.AFTER, "b"),
+                            true, "this is a BINARY", "a column"))
+                    .put(Types.VARBINARY, new Column("c", null, Types.BINARY, new Position(PositionType.AFTER, "b"),
+                            true, "this is a VARBINARY", "a column"))
+                    .put(Types.BLOB,
+                            new Column("c", null, Types.BLOB, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a BLOB",
+                                    "a column"))
+                    .put(Types.CLOB,
+                            new Column("c", null, Types.CLOB, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a CLOB",
+                                    "a column"))
+                    .put(Types.DATE,
+                            new Column("c", null, Types.DATE, new Position(PositionType.AFTER, "b"), true, "2023-01-01",
+                                    "a column"))
+                    .put(Types.BOOLEAN,
+                            new Column("c", null, Types.BOOLEAN, new Position(PositionType.AFTER, "b"), true, "true",
+                                    "a column"))
+                    .put(Types.LONGNVARCHAR,
+                            new Column("c", null, Types.LONGNVARCHAR, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGNVARCHAR", "a column"))
+                    .put(Types.LONGVARBINARY,
+                            new Column("c", null, Types.LONGVARBINARY, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGVARBINARY", "a column"))
+                    .put(Types.LONGVARCHAR,
+                            new Column("c", null, Types.LONGVARCHAR, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGVARCHAR", "a column"))
+                    .put(Types.ARRAY,
+                            new Column("c", null, Types.ARRAY, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a ARRAY",
+                                    "a column"))
+                    .put(Types.NCHAR,
+                            new Column("c", null, Types.NCHAR, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a NCHAR",
+                                    "a column"))
+                    .put(Types.NCLOB,
+                            new Column("c", null, Types.NCLOB, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a NCLOB",
+                                    "a column"))
+                    .put(Types.TINYINT, new Column("c", Collections.singletonList("1"), Types.TINYINT,
+                            new Position(PositionType.FIRST, null), true, "1", "a column"))
+                    .put(Types.OTHER,
+                            new Column("c", null, Types.OTHER, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a OTHER",
+                                    "a column"))
+                    .build();
+    private final Map<Integer, String> addColumnStatements =
+            ImmutableMap.<Integer, String>builder()
+                    .put(Types.CHAR,
+                            "ADD COLUMN `c` CHAR(32) DEFAULT 'InLong' COMMENT 'a column' FIRST")
+                    .put(Types.VARCHAR,
+                            "ADD COLUMN `c` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a column' FIRST")
+                    .put(Types.SMALLINT,
+                            "ADD COLUMN `c` SMALLINT(8) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.INTEGER,
+                            "ADD COLUMN `c` INT(11) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BIGINT,
+                            "ADD COLUMN `c` BIGINT(16) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.REAL,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DOUBLE,
+                            "ADD COLUMN `c` DOUBLE DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.FLOAT,
+                            "ADD COLUMN `c` FLOAT DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DECIMAL,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NUMERIC,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BIT,
+                            "ADD COLUMN `c` BOOLEAN  DEFAULT 'false' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIME,
+                            "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIME_WITH_TIMEZONE,
+                            "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIMESTAMP_WITH_TIMEZONE,
+                            "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01 10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIMESTAMP,
+                            "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01 10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a BINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.VARBINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a VARBINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a BLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.CLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a CLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DATE,
+                            "ADD COLUMN `c` DATE DEFAULT '2023-01-01' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BOOLEAN,
+                            "ADD COLUMN `c` BOOLEAN DEFAULT 'true' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGNVARCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGNVARCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGVARBINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGVARBINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGVARCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGVARCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.ARRAY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a ARRAY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a NCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NCLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a NCLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TINYINT,
+                            "ADD COLUMN `c` TINYINT(1) DEFAULT '1' COMMENT 'a column' FIRST")
+                    .put(Types.OTHER,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a OTHER' COMMENT 'a column' AFTER `b`")
+                    .build();
+    private OperationHelper helper;
+
+    @Before
+    public void init() {
+        helper = OperationHelper.of(
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat("canal-json"));
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildAddColumnStatement(List)}
+     */
+    @Test
+    public void testBuildAddColumnStatement() {
+        for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+            Assert.assertEquals(addColumnStatements.get(kv.getKey()),
+                    helper.buildAddColumnStatement(Collections.singletonList(new AlterColumn(
+                            AlterType.ADD_COLUMN, kv.getValue(), null))));
+        }
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildDropColumnStatement(List)}
+     */
+    @Test
+    public void testBuildDropColumnStatement() {
+        for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+            Assert.assertEquals("DROP COLUMN `c`",
+                    helper.buildDropColumnStatement(Collections.singletonList(
+                            new AlterColumn(AlterType.DROP_COLUMN, null, kv.getValue()))));
+        }
+
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildCreateTableStatement(String, String, List, CreateTableOperation)}
+     */
+    @Test
+    public void testBuildCreateTableStatement() {
+        List<String> primaryKeys = Arrays.asList("a", "b");
+        List<Column> columns = Arrays.asList(new Column("a", Collections.singletonList("32"), Types.VARCHAR,
+                new Position(PositionType.FIRST, null), false, "InLong", "a column"),
+                new Column("b", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), false, "InLong", "a column"),
+                new Column("c", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), true, "InLong", "a column"),
+                new Column("d", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), true, "InLong", "a column"));
+        CreateTableOperation operation = new CreateTableOperation();
+        operation.setComment("create table auto");
+        operation.setColumns(columns);
+        String database = "inlong_database";
+        String table = "inlong_table";
+        Assert.assertEquals("CREATE TABLE IF NOT EXISTS 
`inlong_database`.`inlong_table`(\n"
+                + "\t`a` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a 
column',\n"
+                + "\t`b` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a 
column',\n"
+                + "\t`c` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column',\n"
+                + "\t`d` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column'\n"
+                + ")\n"
+                + "UNIQUE KEY(`a`,`b`)\n"
+                + "COMMENT 'create table auto'\n"
+                + "DISTRIBUTED BY HASH(`a`,`b`)\n"
+                + "PROPERTIES (\n"
+                + "\t\"light_schema_change\" = \"true\"\n"
+                + ")",
+                helper.buildCreateTableStatement(database, table, primaryKeys, operation));
+    }
+}
