This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 0e1b97ac83a8359393299d11139374d16db8ed8c Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Mon Jul 17 10:53:45 2023 +0800 [INLONG-7763][Sort] Support ddl change for doris (#7764) (cherry picked from commit 186e8a1325a28c744b535230b7404c6eea979100) --- .../sort/protocol/node/load/DorisLoadNode.java | 42 +- .../inlong/sort/base/dirty/DirtyOptions.java | 6 + .../sort/base/format/JsonDynamicSchemaFormat.java | 2 + .../sort-flink-v1.13/sort-connectors/doris/pom.xml | 18 + .../inlong/sort/doris/http/HttpGetEntity.java | 40 ++ .../inlong/sort/doris/schema/OperationHelper.java | 308 +++++++++++++ .../sort/doris/schema/SchemaChangeHelper.java | 476 +++++++++++++++++++++ .../table/DorisDynamicSchemaOutputFormat.java | 42 +- .../sort/doris/table/DorisDynamicTableFactory.java | 67 ++- .../sort/doris/table/DorisDynamicTableSink.java | 17 +- .../sort/doris/schema/OperationHelperTest.java | 256 +++++++++++ 11 files changed, 1258 insertions(+), 16 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java index a3ca5dd338..3e3ac93553 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java @@ -21,10 +21,13 @@ import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.InlongMetric; import org.apache.inlong.sort.protocol.constant.DorisConstant; import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; import org.apache.inlong.sort.protocol.node.LoadNode; import org.apache.inlong.sort.protocol.node.format.Format; import 
org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.util.SchemaChangeUtils; import com.google.common.base.Preconditions; import lombok.Data; @@ -95,6 +98,11 @@ public class DorisLoadNode extends LoadNode implements InlongMetric, Serializabl @Nullable @JsonProperty("tablePattern") private String tablePattern; + @JsonProperty("enableSchemaChange") + private boolean enableSchemaChange; + @Nullable + @JsonProperty("policyMap") + private Map<SchemaChangeType, SchemaChangePolicy> policyMap; public DorisLoadNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @@ -114,7 +122,6 @@ public class DorisLoadNode extends LoadNode implements InlongMetric, Serializabl null, null); } - @JsonCreator public DorisLoadNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @JsonProperty("fields") List<FieldInfo> fields, @@ -132,6 +139,31 @@ public class DorisLoadNode extends LoadNode implements InlongMetric, Serializabl @Nullable @JsonProperty("sinkMultipleFormat") Format sinkMultipleFormat, @Nullable @JsonProperty("databasePattern") String databasePattern, @Nullable @JsonProperty("tablePattern") String tablePattern) { + this(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties, feNodes, userName, + password, tableIdentifier, primaryKey, sinkMultipleEnable, sinkMultipleFormat, databasePattern, + tablePattern, false, null); + } + + @JsonCreator + public DorisLoadNode(@JsonProperty("id") String id, + @JsonProperty("name") String name, + @JsonProperty("fields") List<FieldInfo> fields, + @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations, + @JsonProperty("filters") List<FilterFunction> filters, + @JsonProperty("filterStrategy") FilterStrategy filterStrategy, + @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism, + @JsonProperty("properties") Map<String, String> properties, + @Nonnull 
@JsonProperty("feNodes") String feNodes, + @Nonnull @JsonProperty("username") String userName, + @Nonnull @JsonProperty("password") String password, + @Nullable @JsonProperty("tableIdentifier") String tableIdentifier, + @JsonProperty("primaryKey") String primaryKey, + @Nullable @JsonProperty(value = "sinkMultipleEnable", defaultValue = "false") Boolean sinkMultipleEnable, + @Nullable @JsonProperty("sinkMultipleFormat") Format sinkMultipleFormat, + @Nullable @JsonProperty("databasePattern") String databasePattern, + @Nullable @JsonProperty("tablePattern") String tablePattern, + @JsonProperty("enableSchemaChange") boolean enableSchemaChange, + @Nullable @JsonProperty("policyMap") Map<SchemaChangeType, SchemaChangePolicy> policyMap) { super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties); this.feNodes = Preconditions.checkNotNull(feNodes, "feNodes is null"); this.userName = Preconditions.checkNotNull(userName, "username is null"); @@ -146,6 +178,10 @@ public class DorisLoadNode extends LoadNode implements InlongMetric, Serializabl this.sinkMultipleFormat = Preconditions.checkNotNull(sinkMultipleFormat, "sinkMultipleFormat is null"); } + this.enableSchemaChange = enableSchemaChange; + this.policyMap = policyMap; + Preconditions.checkState(!enableSchemaChange || policyMap != null && !policyMap.isEmpty(), + "policyMap is empty when enableSchemaChange is 'true'"); } @Override @@ -160,6 +196,10 @@ public class DorisLoadNode extends LoadNode implements InlongMetric, Serializabl options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier()); options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern); options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern); + if (enableSchemaChange) { + options.put("sink.schema-change.enable", "true"); + options.put("sink.schema-change.policies", SchemaChangeUtils.serialize(policyMap)); + } } else { options.put(SINK_MULTIPLE_ENABLE, "false"); 
options.put(DorisConstant.TABLE_IDENTIFIER, tableIdentifier); diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java index 58cfe1f98a..791cba0d71 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.base.dirty; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; + import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; @@ -29,6 +31,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_ENABLE; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG; +import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; /** * Dirty common options @@ -64,6 +67,9 @@ public class DirtyOptions implements Serializable { */ public static DirtyOptions fromConfig(ReadableConfig config) { boolean ignoreDirty = config.get(DIRTY_IGNORE); + if (config.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY) == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) { + ignoreDirty = true; + } boolean enableDirtySink = config.get(DIRTY_SIDE_OUTPUT_ENABLE); boolean ignoreSinkError = config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS); String dirtyConnector = config.getOptional(DIRTY_SIDE_OUTPUT_CONNECTOR).orElse(null); diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java index 51e08f8852..e8bef34d90 100644 --- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java @@ -61,6 +61,8 @@ import static org.apache.inlong.sort.formats.json.utils.FormatJsonUtil.SQL_TYPE_ @SuppressWarnings("LanguageDetectionInspection") public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> { + public static final int DEFAULT_DECIMAL_PRECISION = 15; + public static final int DEFAULT_DECIMAL_SCALE = 5; private static final Logger LOG = LoggerFactory.getLogger(JsonDynamicSchemaFormat.class); /** * The first item of array diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml index 151efe8059..24dc89e825 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/pom.xml @@ -42,6 +42,24 @@ <artifactId>flink-doris-connector-${flink.minor.version}_${flink.scala.binary.version}</artifactId> <version>${flink.connector.doris.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-json-v1.13</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> </dependencies> <build> diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java new file mode 100644 index 0000000000..217d37d3ab --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/http/HttpGetEntity.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.doris.http; + +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; + +import java.net.URI; + +/** + * Http entity with get + */ +public class HttpGetEntity extends HttpEntityEnclosingRequestBase { + + private final static String METHOD = "GET"; + + public HttpGetEntity(String uri) { + super(); + setURI(URI.create(uri)); + } + + @Override + public String getMethod() { + return METHOD; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java new file mode 100644 index 0000000000..c00cf81d2d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/OperationHelper.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.doris.schema; + +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.protocol.ddl.enums.PositionType; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.util.Preconditions; + +import java.sql.Types; +import java.util.Iterator; +import java.util.List; +import java.util.StringJoiner; + +public class OperationHelper { + + private static final String APOSTROPHE = "'"; + private static final String DOUBLE_QUOTES = "\""; + private final JsonDynamicSchemaFormat dynamicSchemaFormat; + private final int VARCHAR_MAX_LENGTH = 65533; + + private OperationHelper(JsonDynamicSchemaFormat dynamicSchemaFormat) { + this.dynamicSchemaFormat = dynamicSchemaFormat; + } + + public static OperationHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat) { + return new OperationHelper(dynamicSchemaFormat); + } + + private String convert2DorisType(int jdbcType, boolean isNullable, List<String> precisions) { + String type = null; + switch (jdbcType) { + case Types.BOOLEAN: + case Types.DATE: + case Types.FLOAT: + case Types.DOUBLE: + type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString(); + break; + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + if (precisions != null && !precisions.isEmpty()) { + type = String.format("%s(%s)%s", dynamicSchemaFormat.sqlType2FlinkType(jdbcType).asSummaryString(), + StringUtils.join(precisions, ","), isNullable ? 
"" : " NOT NULL"); + } else { + type = dynamicSchemaFormat.sqlType2FlinkType(jdbcType).copy(isNullable).asSummaryString(); + } + break; + case Types.DECIMAL: + DecimalType decimalType = (DecimalType) dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() < 3, + "The length of precisions with DECIMAL must small than 3"); + int precision = Integer.parseInt(precisions.get(0)); + int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE; + if (precisions.size() == 2) { + scale = Integer.parseInt(precisions.get(1)); + } + decimalType = new DecimalType(isNullable, precision, scale); + } else { + decimalType = new DecimalType(isNullable, decimalType.getPrecision(), decimalType.getScale()); + } + type = decimalType.asSummaryString(); + break; + case Types.CHAR: + LogicalType charType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() == 1, + "The length of precisions with CHAR must be 1"); + charType = new CharType(isNullable, Integer.parseInt(precisions.get(0))); + } else { + charType = charType.copy(isNullable); + } + type = charType.asSerializableString(); + break; + case Types.VARCHAR: + LogicalType varcharType = dynamicSchemaFormat.sqlType2FlinkType(jdbcType); + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() == 1, + "The length of precisions with VARCHAR must be 1"); + // Because the precision definition of varchar by Doris is different from that of MySQL. + // The precision in MySQL is the number of characters, while Doris is the number of bytes, + // and Chinese characters occupy 3 bytes, so the precision multiplys by 3 here. 
+ int precision = Math.min(Integer.parseInt(precisions.get(0)) * 3, VARCHAR_MAX_LENGTH); + varcharType = new VarCharType(isNullable, precision); + } else { + varcharType = varcharType.copy(isNullable); + } + type = varcharType.asSerializableString(); + break; + // The following types are not directly supported in doris, + // and can only be converted to compatible types as much as possible + case Types.TIME: + case Types.TIME_WITH_TIMEZONE: + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.CLOB: + case Types.LONGNVARCHAR: + case Types.LONGVARBINARY: + case Types.LONGVARCHAR: + case Types.ARRAY: + case Types.NCHAR: + case Types.NCLOB: + case Types.OTHER: + type = String.format("STRING%s", isNullable ? "" : " NOT NULL"); + break; + case Types.TIMESTAMP_WITH_TIMEZONE: + case Types.TIMESTAMP: + type = "DATETIME"; + break; + case Types.REAL: + case Types.NUMERIC: + int precision = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_PRECISION; + int scale = JsonDynamicSchemaFormat.DEFAULT_DECIMAL_SCALE; + if (precisions != null && !precisions.isEmpty()) { + Preconditions.checkState(precisions.size() < 3, + "The length of precisions with NUMERIC must small than 3"); + precision = Integer.parseInt(precisions.get(0)); + if (precisions.size() == 2) { + scale = Integer.parseInt(precisions.get(1)); + } + } + decimalType = new DecimalType(isNullable, precision, scale); + type = decimalType.asSerializableString(); + break; + case Types.BIT: + type = String.format("BOOLEAN %s", isNullable ? "" : " NOT NULL"); + break; + default: + type = String.format("STRING%s", isNullable ? 
"" : " NOT NULL"); + } + return type; + } + + /** + * Build the statement of AddColumn + * + * @param alterColumns The list of AlterColumn + * @return A statement of AddColumn + */ + public String buildAddColumnStatement(List<AlterColumn> alterColumns) { + Preconditions.checkState(alterColumns != null + && !alterColumns.isEmpty(), "Alter columns is empty"); + Iterator<AlterColumn> iterator = alterColumns.iterator(); + StringBuilder sb = new StringBuilder(); + while (iterator.hasNext()) { + AlterColumn expression = iterator.next(); + Preconditions.checkNotNull(expression.getNewColumn(), "New column is null"); + Column column = expression.getNewColumn(); + Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(), + "The column name is blank"); + sb.append("ADD COLUMN `").append(column.getName()).append("` ") + .append(convert2DorisType(expression.getNewColumn().getJdbcType(), + column.isNullable(), column.getDefinition())); + if (validDefaultValue(column.getDefaultValue())) { + sb.append(" DEFAULT ").append(quote(column.getDefaultValue())); + } + if (column.getComment() != null) { + sb.append(" COMMENT ").append(quote(column.getComment())); + } + if (column.getPosition() != null && column.getPosition().getPositionType() != null) { + if (column.getPosition().getPositionType() == PositionType.FIRST) { + sb.append(" FIRST"); + } else if (column.getPosition().getPositionType() == PositionType.AFTER) { + Preconditions.checkState(column.getPosition().getColumnName() != null + && !column.getPosition().getColumnName().trim().isEmpty(), + "The column name of Position is empty"); + sb.append(" AFTER `").append(column.getPosition().getColumnName()).append("`"); + } + } + if (iterator.hasNext()) { + sb.append(", "); + } + } + return sb.toString(); + } + + private String quote(String value) { + if (value == null) { + return "'null'"; + } + if (!value.startsWith(APOSTROPHE) && !value.startsWith(DOUBLE_QUOTES)) { + return String.format("'%s'", value); 
+ } + return value; + } + + /** + * Build the statement of DropColumn + * + * @param alterColumns The list of AlterColumn + * @return A statement of DropColumn + */ + public String buildDropColumnStatement(List<AlterColumn> alterColumns) { + Preconditions.checkState(alterColumns != null + && !alterColumns.isEmpty(), "Alter columns is empty"); + Iterator<AlterColumn> iterator = alterColumns.iterator(); + StringBuilder sb = new StringBuilder(); + while (iterator.hasNext()) { + AlterColumn expression = iterator.next(); + Preconditions.checkNotNull(expression.getOldColumn(), "Old column is null"); + Column column = expression.getOldColumn(); + Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(), + "The column name is blank"); + sb.append("DROP COLUMN `").append(column.getName()).append("`"); + if (iterator.hasNext()) { + sb.append(","); + } + } + return sb.toString(); + } + + /** + * Build common statement of alter + * + * @param database The database of Doris + * @param table The table of Doris + * @return A statement of Alter table + */ + public String buildAlterStatementCommon(String database, String table) { + return "ALTER TABLE `" + database + "`.`" + table + "` "; + } + + private boolean validDefaultValue(String defaultValue) { + return defaultValue != null && !defaultValue.trim().isEmpty() && !"NULL" + .equalsIgnoreCase(defaultValue); + } + + /** + * Build the statement of CreateTable + * + * @param database The database of Doris + * @param table The table of Doris + * @param primaryKeys The primary key of Doris + * @param operation The Operation + * @return A statement of CreateTable + */ + public String buildCreateTableStatement(String database, String table, List<String> primaryKeys, + CreateTableOperation operation) { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS `").append(database).append("`.`").append(table).append("`(\n"); + Preconditions.checkState(operation.getColumns() != null && 
!operation.getColumns().isEmpty(), + String.format("The columns of table: %s.%s is empty", database, table)); + Iterator<Column> iterator = operation.getColumns().iterator(); + StringJoiner joiner = new StringJoiner(","); + while (iterator.hasNext()) { + Column column = iterator.next(); + Preconditions.checkNotNull(column, "The column is null"); + Preconditions.checkState(column.getName() != null && !column.getName().trim().isEmpty(), + "The column name is blank"); + sb.append("\t`").append(column.getName()).append("` ").append(convert2DorisType(column.getJdbcType(), + column.isNullable(), column.getDefinition())); + if (validDefaultValue(column.getDefaultValue())) { + sb.append(" DEFAULT ").append(quote(column.getDefaultValue())); + } + if (column.getComment() != null) { + sb.append(" COMMENT ").append(quote(column.getComment())); + } + joiner.add(String.format("`%s`", column.getName())); + if (iterator.hasNext()) { + sb.append(",\n"); + } + } + sb.append("\n)\n"); + String model = "DUPLICATE"; + if (primaryKeys != null && !primaryKeys.isEmpty()) { + model = "UNIQUE"; + joiner = new StringJoiner(","); + for (String primaryKey : primaryKeys) { + joiner.add(String.format("`%s`", primaryKey)); + } + } + String keys = joiner.toString(); + sb.append(model).append(" KEY(").append(keys).append(")"); + if (StringUtils.isNotBlank(operation.getComment())) { + sb.append("\nCOMMENT ").append(quote(operation.getComment())); + } + sb.append("\nDISTRIBUTED BY HASH(").append(keys).append(")"); + // Add light schema change support for it if the version of doris is greater than 1.2.0 or equals 1.2.0 + sb.append("\nPROPERTIES (\n\t\"light_schema_change\" = \"true\"\n)"); + return sb.toString(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java 
new file mode 100644 index 0000000000..effae22e1d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.doris.schema; + +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; +import org.apache.inlong.sort.base.schema.SchemaChangeHandleException; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.doris.http.HttpGetEntity; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; +import org.apache.inlong.sort.protocol.ddl.operations.Operation; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; +import org.apache.inlong.sort.util.SchemaChangeUtils; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.util.Preconditions; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringJoiner; + +/** + * Schema change helper + */ +public class SchemaChangeHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeHelper.class); + + private static final String CHECK_LIGHT_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final String DORIS_HTTP_CALL_SUCCESS = "0"; + private static final String CONTENT_TYPE_JSON = "application/json"; + private final boolean schemaChange; + private final Map<SchemaChangeType, SchemaChangePolicy> policyMap; + private final DorisOptions options; + private final JsonDynamicSchemaFormat dynamicSchemaFormat; + private final String databasePattern; + private final String tablePattern; + private final int maxRetries; + private final OperationHelper operationHelper; + private final SchemaUpdateExceptionPolicy exceptionPolicy; + private final SinkTableMetricData metricData; + private final DirtySinkHelper<Object> dirtySinkHelper; + + private SchemaChangeHelper(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options, boolean schemaChange, + Map<SchemaChangeType, SchemaChangePolicy> policyMap, String databasePattern, String tablePattern, + int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy, + SinkTableMetricData metricData, DirtySinkHelper<Object> dirtySinkHelper) { + this.dynamicSchemaFormat = Preconditions.checkNotNull(dynamicSchemaFormat, "dynamicSchemaFormat is null"); + this.options = Preconditions.checkNotNull(options, "doris options is null"); + this.schemaChange = schemaChange; + this.policyMap = policyMap; + this.databasePattern = databasePattern; + this.tablePattern = tablePattern; + this.maxRetries = maxRetries; + this.exceptionPolicy = exceptionPolicy; + this.metricData = metricData; + this.dirtySinkHelper = dirtySinkHelper; + operationHelper = 
OperationHelper.of(dynamicSchemaFormat); + } + + public static SchemaChangeHelper of(JsonDynamicSchemaFormat dynamicSchemaFormat, DorisOptions options, + boolean schemaChange, Map<SchemaChangeType, SchemaChangePolicy> policyMap, String databasePattern, + String tablePattern, int maxRetries, SchemaUpdateExceptionPolicy exceptionPolicy, + SinkTableMetricData metricData, DirtySinkHelper<Object> dirtySinkHelper) { + return new SchemaChangeHelper(dynamicSchemaFormat, options, schemaChange, policyMap, databasePattern, + tablePattern, maxRetries, exceptionPolicy, metricData, dirtySinkHelper); + } + + /** + * Process schema change for Doris + * + * @param data The origin data + */ + public void process(byte[] originData, JsonNode data) { + if (!schemaChange) { + return; + } + String database; + String table; + try { + database = dynamicSchemaFormat.parse(data, databasePattern); + table = dynamicSchemaFormat.parse(data, tablePattern); + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new SchemaChangeHandleException( + String.format("Parse database, table from origin data failed, origin data: %s", + new String(originData)), + e); + } + LOGGER.warn("Parse database, table from origin data failed, origin data: {}", new String(originData), e); + if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) { + dirtySinkHelper.invoke(new String(originData), DirtyType.JSON_PROCESS_ERROR, e); + } + if (metricData != null) { + metricData.invokeDirty(1, originData.length); + } + return; + } + Operation operation; + try { + JsonNode operationNode = Preconditions.checkNotNull(data.get("operation"), + "Operation node is null"); + operation = Preconditions.checkNotNull( + dynamicSchemaFormat.objectMapper.convertValue(operationNode, new TypeReference<Operation>() { + }), "Operation is null"); + } catch (Exception e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) { + throw new 
SchemaChangeHandleException( + String.format("Extract Operation from origin data failed,origin data: %s", data), e); + } + LOGGER.warn("Extract Operation from origin data failed,origin data: {}", data, e); + handleDirtyData(data, originData, database, table, DirtyType.JSON_PROCESS_ERROR, e); + return; + } + String originSchema = dynamicSchemaFormat.extractDDL(data); + SchemaChangeType type = SchemaChangeUtils.extractSchemaChangeType(operation); + if (type == null) { + LOGGER.warn("Unsupported for schema-change: {}", originSchema); + return; + } + switch (type) { + case ALTER: + handleAlterOperation(database, table, originData, originSchema, data, (AlterOperation) operation); + break; + case CREATE_TABLE: + doCreateTable(originData, database, table, type, originSchema, data, (CreateTableOperation) operation); + break; + case DROP_TABLE: + doDropTable(type, originSchema); + break; + case RENAME_TABLE: + doRenameTable(type, originSchema); + break; + case TRUNCATE_TABLE: + doTruncateTable(type, originSchema); + break; + default: + LOGGER.warn("Unsupported for {}: {}", type, originSchema); + } + } + + private void handleDirtyData(JsonNode data, byte[] originData, String database, + String table, DirtyType dirtyType, Throwable e) { + if (exceptionPolicy == SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE) { + String label = parseValue(data, dirtySinkHelper.getDirtyOptions().getLabels()); + String logTag = parseValue(data, dirtySinkHelper.getDirtyOptions().getLogTag()); + String identifier = parseValue(data, dirtySinkHelper.getDirtyOptions().getIdentifier()); + dirtySinkHelper.invoke(new String(originData), dirtyType, label, logTag, identifier, e); + } + if (metricData != null) { + metricData.outputDirtyMetricsWithEstimate(database, table, 1, originData.length); + } + } + + private void reportMetric(String database, String table, int len) { + if (metricData != null) { + metricData.outputMetrics(database, table, 1, len); + } + } + + private String parseValue(JsonNode data, 
String pattern) {
+        try {
+            return dynamicSchemaFormat.parse(data, pattern);
+        } catch (Exception e) {
+            // Best effort: the raw pattern is returned below when it cannot be resolved against the data
+            LOGGER.warn("Parse value from pattern failed,the pattern: {}, data: {}", pattern, data);
+        }
+        return pattern;
+    }
+
+    /**
+     * Groups the alter columns of an ALTER operation by schema-change type and executes the supported ones.
+     *
+     * <p>Columns that map to exactly one type are collected and passed to {@code doAlterOperation}. Columns that
+     * map to several types are not executed: each type is only logged or rejected according to its policy.
+     */
+    private void handleAlterOperation(String database, String table, byte[] originData,
+            String originSchema, JsonNode data, AlterOperation operation) {
+        if (operation.getAlterColumns() == null || operation.getAlterColumns().isEmpty()) {
+            if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                throw new SchemaChangeHandleException(
+                        String.format("Alter columns is empty, origin schema: %s", originSchema));
+            }
+            LOGGER.warn("Alter columns is empty, origin schema: {}", originSchema);
+            return;
+        }
+        // LinkedHashMap keeps the order in which the change types first appear
+        Map<SchemaChangeType, List<AlterColumn>> typeMap = new LinkedHashMap<>();
+        for (AlterColumn alterColumn : operation.getAlterColumns()) {
+            Set<SchemaChangeType> types;
+            try {
+                types = SchemaChangeUtils.extractSchemaChangeType(alterColumn);
+                Preconditions.checkState(types != null && !types.isEmpty(), "Schema change types is empty");
+            } catch (Exception e) {
+                if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Extract schema change type failed, origin schema: %s", originSchema), e);
+                }
+                LOGGER.warn("Extract schema change type failed, origin schema: {}", originSchema, e);
+                // Nothing usable was extracted for this column; move on to the next one
+                continue;
+            }
+            if (types.size() == 1) {
+                SchemaChangeType type = types.iterator().next();
+                typeMap.computeIfAbsent(type, k -> new ArrayList<>()).add(alterColumn);
+            } else {
+                // Handle change column, it only exists change column type and rename column in this scenario for now.
+                for (SchemaChangeType type : types) {
+                    SchemaChangePolicy policy = policyMap.get(type);
+                    if (policy == SchemaChangePolicy.ENABLE) {
+                        LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+                    } else {
+                        doSchemaChangeBase(type, policy, originSchema);
+                    }
+                }
+            }
+        }
+        if (!typeMap.isEmpty()) {
+            doAlterOperation(database, table, originData, originSchema, data, typeMap);
+        }
+    }
+
+    /**
+     * Builds one ALTER TABLE statement from all enabled change types and executes it against Doris.
+     *
+     * <p>Every type is first passed through {@code doSchemaChangeBase} so LOG/ERROR policies take effect; only
+     * types with the ENABLE policy contribute a clause. When the statement is rejected or fails, the error is
+     * rethrown under {@link SchemaUpdateExceptionPolicy#THROW_WITH_STOP}, otherwise logged and reported as dirty
+     * data.
+     */
+    private void doAlterOperation(String database, String table, byte[] originData, String originSchema, JsonNode data,
+            Map<SchemaChangeType, List<AlterColumn>> typeMap) {
+        StringJoiner joiner = new StringJoiner(",");
+        for (Entry<SchemaChangeType, List<AlterColumn>> kv : typeMap.entrySet()) {
+            SchemaChangePolicy policy = policyMap.get(kv.getKey());
+            doSchemaChangeBase(kv.getKey(), policy, originSchema);
+            if (policy == SchemaChangePolicy.ENABLE) {
+                String alterStatement = null;
+                try {
+                    switch (kv.getKey()) {
+                        case ADD_COLUMN:
+                            alterStatement = doAddColumn(kv.getValue());
+                            break;
+                        case DROP_COLUMN:
+                            alterStatement = doDropColumn(kv.getValue());
+                            break;
+                        case RENAME_COLUMN:
+                            alterStatement = doRenameColumn(kv.getKey(), originSchema);
+                            break;
+                        case CHANGE_COLUMN_TYPE:
+                            alterStatement = doChangeColumnType(kv.getKey(), originSchema);
+                            break;
+                        default:
+                    }
+                } catch (Exception e) {
+                    if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                        throw new SchemaChangeHandleException(
+                                String.format("Build alter statement failed, origin schema: %s", originSchema), e);
+                    }
+                    LOGGER.warn("Build alter statement failed, origin schema: {}", originSchema, e);
+                }
+                if (alterStatement != null) {
+                    joiner.add(alterStatement);
+                }
+            }
+        }
+        String statement = joiner.toString();
+        if (!statement.isEmpty()) {
+            try {
+                String alterStatementCommon = operationHelper.buildAlterStatementCommon(database, table);
+                statement = alterStatementCommon + statement;
+                // The checkLightSchemaChange is removed because most scenarios support it
+                boolean result = executeStatement(database, statement);
+                if (!result) {
+                    LOGGER.error("Alter table failed,statement: {}", statement);
+                    // The statement may hold ADD and DROP clauses alike, so name the failure generically
+                    throw new SchemaChangeHandleException(
+                            String.format("Alter table failed,statement: %s", statement));
+                }
+                LOGGER.info("Alter table success,statement: {}", statement);
+                reportMetric(database, table, originData.length);
+            } catch (Exception e) {
+                if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    throw new SchemaChangeHandleException(
+                            String.format("Alter table failed, origin schema: %s", originSchema), e);
+                }
+                // Keep the cause visible even when the record is only routed to the dirty sink
+                LOGGER.warn("Alter table failed, origin schema: {}", originSchema, e);
+                handleDirtyData(data, originData, database, table, DirtyType.HANDLE_ALTER_TABLE_ERROR, e);
+            }
+        }
+    }
+
+    /** Changing a column type is not supported: logs a warning and contributes no clause. */
+    private String doChangeColumnType(SchemaChangeType type, String originSchema) {
+        LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        return null;
+    }
+
+    /** Renaming a column is not supported: logs a warning and contributes no clause. */
+    private String doRenameColumn(SchemaChangeType type, String originSchema) {
+        LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+        return null;
+    }
+
+    /** Delegates to the operation helper to build the DROP COLUMN clause. */
+    private String doDropColumn(List<AlterColumn> alterColumns) {
+        return operationHelper.buildDropColumnStatement(alterColumns);
+    }
+
+    /** Delegates to the operation helper to build the ADD COLUMN clause. */
+    private String doAddColumn(List<AlterColumn> alterColumns) {
+        return operationHelper.buildAddColumnStatement(alterColumns);
+    }
+
+    /** TRUNCATE TABLE is never executed: ENABLE only logs, other policies go through doSchemaChangeBase. */
+    private void doTruncateTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy policy = policyMap.get(SchemaChangeType.TRUNCATE_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    /** RENAME TABLE is never executed: ENABLE only logs, other policies go through doSchemaChangeBase. */
+    private void doRenameTable(SchemaChangeType type, String originSchema) {
+        SchemaChangePolicy policy = policyMap.get(SchemaChangeType.RENAME_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    private void doDropTable(SchemaChangeType type, String originSchema) {
+
SchemaChangePolicy policy = policyMap.get(SchemaChangeType.DROP_TABLE);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            LOGGER.warn("Unsupported for {}: {}", type, originSchema);
+            return;
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    /**
+     * Handles a CREATE TABLE operation.
+     *
+     * <p>With the ENABLE policy the create statement is built from the primary keys and the operation, then
+     * executed; success is reported as a metric. A failure is rethrown under
+     * {@link SchemaUpdateExceptionPolicy#THROW_WITH_STOP}, otherwise logged and reported as dirty data. Any other
+     * policy is delegated to {@code doSchemaChangeBase}.
+     */
+    private void doCreateTable(byte[] originData, String database, String table, SchemaChangeType type,
+            String originSchema, JsonNode data, CreateTableOperation operation) {
+        SchemaChangePolicy policy = policyMap.get(type);
+        if (policy == SchemaChangePolicy.ENABLE) {
+            try {
+                List<String> primaryKeys = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+                String stmt = operationHelper.buildCreateTableStatement(database, table, primaryKeys, operation);
+                boolean result = executeStatement(database, stmt);
+                if (!result) {
+                    LOGGER.error("Create table failed,statement: {}", stmt);
+                    throw new IOException(String.format("Create table failed,statement: %s", stmt));
+                }
+                reportMetric(database, table, originData.length);
+                return;
+            } catch (Exception e) {
+                if (exceptionPolicy == SchemaUpdateExceptionPolicy.THROW_WITH_STOP) {
+                    // Name the operation that actually failed (this path is CREATE TABLE, not DROP COLUMN)
+                    throw new SchemaChangeHandleException(
+                            String.format("Create table failed, origin schema: %s", originSchema), e);
+                }
+                // Keep the cause visible even when the record is only routed to the dirty sink
+                LOGGER.warn("Create table failed, origin schema: {}", originSchema, e);
+                handleDirtyData(data, originData, database, table, DirtyType.CREATE_TABLE_ERROR, e);
+                return;
+            }
+        }
+        doSchemaChangeBase(type, policy, originSchema);
+    }
+
+    /**
+     * Applies the non-executing policies: a missing policy or LOG writes a warning, ERROR throws
+     * {@link SchemaChangeHandleException}, and every other policy does nothing.
+     */
+    private void doSchemaChangeBase(SchemaChangeType type, SchemaChangePolicy policy, String schema) {
+        if (policy == null) {
+            LOGGER.warn("Unsupported for {}: {}", type, schema);
+            return;
+        }
+        switch (policy) {
+            case LOG:
+                LOGGER.warn("Unsupported for {}: {}", type, schema);
+                break;
+            case ERROR:
+                throw new SchemaChangeHandleException(String.format("Unsupported for %s: %s", type, schema));
+            default:
+        }
+    }
+
+    private Map<String, Object> buildRequestParam(String column, boolean dropColumn) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("isDropColumn", dropColumn);
+        params.put("columnName", column);
+
return params;
+    }
+
+    /**
+     * Builds the HTTP Basic authorization header value from the configured username and password.
+     */
+    private String authHeader() {
+        // Base64 output is ASCII; decode with an explicit charset instead of the JVM default
+        return "Basic " + new String(Base64.encodeBase64((options.getUsername() + ":"
+                + options.getPassword()).getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Posts {@code stmt} as a JSON body ({@code {"stmt": ...}}) to the schema-change API of {@code database}.
+     *
+     * @return the result of {@code sendRequest}
+     * @throws IOException if the request body cannot be serialized
+     */
+    private boolean executeStatement(String database, String stmt) throws IOException {
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", stmt);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, options.getFenodes(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
+        // StringEntity(String) encodes as ISO-8859-1, which corrupts non-Latin text (e.g. column comments)
+        httpPost.setEntity(new StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param),
+                StandardCharsets.UTF_8));
+        return sendRequest(httpPost);
+    }
+
+    /**
+     * Asks the light-schema-change check API whether {@code column} of the table can be added or dropped.
+     *
+     * @return the result of {@code sendRequest}; a warning is logged when it is {@code false}
+     * @throws IOException if the request body cannot be serialized
+     */
+    private boolean checkLightSchemaChange(String database, String table, String column, boolean dropColumn)
+            throws IOException {
+        String url = String.format(CHECK_LIGHT_SCHEMA_CHANGE_API, options.getFenodes(), database, table);
+        Map<String, Object> param = buildRequestParam(column, dropColumn);
+        HttpGetEntity httpGet = new HttpGetEntity(url);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        // Same charset concern as in executeStatement: encode the JSON body as UTF-8 explicitly
+        httpGet.setEntity(new StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param),
+                StandardCharsets.UTF_8));
+        boolean success = sendRequest(httpGet);
+        if (!success) {
+            LOGGER.warn("schema change can not do table {}.{}", database, table);
+        }
+        return success;
+    }
+
+    /**
+     * Executes {@code request} up to {@code maxRetries} times and reports whether Doris answered with the
+     * success code.
+     *
+     * @return {@code true} as soon as a response with HTTP 200 carries the success code; {@code false} when every
+     *         attempt got a response that was not successful (or no attempt was made)
+     * @throws SchemaChangeHandleException if the last attempt fails with an exception, or the thread is
+     *         interrupted while waiting before the next attempt
+     */
+    @SuppressWarnings("unchecked")
+    private boolean sendRequest(HttpUriRequest request) {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            for (int i = 0; i < maxRetries; i++) {
+                CloseableHttpResponse response = null;
+                try {
+                    response = httpclient.execute(request);
+                    final int statusCode = response.getStatusLine().getStatusCode();
+                    if (statusCode == HttpStatus.SC_OK && response.getEntity() != null) {
+                        String loadResult = EntityUtils.toString(response.getEntity());
+                        Map<String, Object> responseMap = dynamicSchemaFormat.objectMapper
+                                .readValue(loadResult, Map.class);
+                        String code = responseMap.getOrDefault("code", "-1").toString();
+                        if (DORIS_HTTP_CALL_SUCCESS.equals(code)) {
+                            return true;
+                        }
+                        LOGGER.error("send request error: {}", loadResult);
+                    }
+                } catch (Exception e) {
+                    // i ranges over [0, maxRetries), so the last attempt is i == maxRetries - 1; comparing
+                    // against maxRetries itself could never match and silently dropped the final failure
+                    if (i >= maxRetries - 1) {
+                        LOGGER.error("send http requests error", e);
+                        throw new IOException(e);
+                    }
+                    try {
+                        Thread.sleep(1000L * i);
+                    } catch (InterruptedException ex) {
+                        Thread.currentThread().interrupt();
+                        throw new IOException("unable to send http request,interrupted while doing another attempt", e);
+                    }
+                } finally {
+                    // Always release the response; a close failure must not trigger a re-send of the statement
+                    if (response != null) {
+                        try {
+                            response.close();
+                        } catch (IOException ignored) {
+                            // nothing useful can be done if releasing the response fails
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("send request error", e);
+            throw new SchemaChangeHandleException("send request error", e);
+        }
+        return false;
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index 7017f18749..1ca00e92c8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -30,7 +30,9 @@ import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.doris.model.RespContent; +import org.apache.inlong.sort.doris.schema.SchemaChangeHelper; import org.apache.inlong.sort.doris.util.DorisParseUtils; +import org.apache.inlong.sort.util.SchemaChangeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -149,6 +151,11 @@ public class
DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private final String dynamicSchemaFormat; private final boolean ignoreSingleTableErrors; private final SchemaUpdateExceptionPolicy schemaUpdatePolicy; + private final String[] fieldNames; + private final LogicalType[] logicalTypes; + private final boolean enableSchemaChange; + @Nullable + private final String schemaChangePolicies; private long batchBytes = 0L; private int size; private DorisStreamLoad dorisStreamLoad; @@ -160,15 +167,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private transient SinkTableMetricData metricData; private transient ListState<MetricState> metricStateListState; private transient MetricState metricState; - private final String[] fieldNames; private volatile boolean jsonFormat; private volatile RowData.FieldGetter[] fieldGetters; private String fieldDelimiter; private String lineDelimiter; private String columns; - private final LogicalType[] logicalTypes; private DirtySinkHelper<Object> dirtySinkHelper; private transient Schema schema; + private SchemaChangeHelper helper; public DorisDynamicSchemaOutputFormat(DorisOptions option, DorisReadOptions readOptions, @@ -185,7 +191,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { String auditHostAndPorts, boolean multipleSink, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink) { + @Nullable DirtySink<Object> dirtySink, + boolean enableSchemaChange, + @Nullable String schemaChangePolicies) { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; @@ -201,7 +209,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { this.ignoreSingleTableErrors = ignoreSingleTableErrors; this.schemaUpdatePolicy = schemaUpdatePolicy; this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink); - + this.enableSchemaChange = enableSchemaChange; + this.schemaChangePolicies = 
schemaChangePolicies; handleStreamLoadProp(); } @@ -211,7 +220,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { * @return builder */ public static DorisDynamicSchemaOutputFormat.Builder builder() { - return new DorisDynamicSchemaOutputFormat.Builder(); + return new Builder(); } private void handleStreamLoadProp() { @@ -274,9 +283,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } } - if (multipleSink && StringUtils.isNotBlank(dynamicSchemaFormat)) { + if (multipleSink) { jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat); + helper = SchemaChangeHelper.of(jsonDynamicSchemaFormat, options, enableSchemaChange, + enableSchemaChange ? SchemaChangeUtils.deserialize(schemaChangePolicies) : null, databasePattern, + tablePattern, executionOptions.getMaxRetries(), schemaUpdatePolicy, metricData, dirtySinkHelper); } MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) @@ -401,7 +413,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode); if (isDDL) { ddlNum.incrementAndGet(); - // Ignore ddl change for now + helper.process(rowData.getBinary(0), rootNode); return; } String tableIdentifier; @@ -925,6 +937,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private String[] fieldNames; private DirtyOptions dirtyOptions; private DirtySink<Object> dirtySink; + private boolean enableSchemaChange; + private String schemaChangePolicies; public Builder() { this.optionsBuilder = DorisOptions.builder().setTableIdentifier(""); @@ -1021,6 +1035,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { return this; } + public Builder setEnableSchemaChange(boolean enableSchemaChange) { + this.enableSchemaChange = enableSchemaChange; + return this; + } + + public Builder 
setSchemaChangePolicies(String schemaChangePolicies) { + this.schemaChangePolicies = schemaChangePolicies; + return this; + } + @SuppressWarnings({"rawtypes"}) public DorisDynamicSchemaOutputFormat build() { LogicalType[] logicalTypes = null; @@ -1044,7 +1068,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { auditHostAndPorts, multipleSink, dirtyOptions, - dirtySink); + dirtySink, + enableSchemaChange, + schemaChangePolicies); } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java index 802979ffc5..5f6e789bb6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java @@ -22,6 +22,9 @@ import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; +import org.apache.inlong.sort.util.SchemaChangeUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -43,8 +46,12 @@ import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.utils.TableSchemaUtils; import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import 
java.util.Properties; import java.util.Set; @@ -67,6 +74,8 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; +import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_ENABLE; +import static org.apache.inlong.sort.base.Constants.SINK_SCHEMA_CHANGE_POLICIES; /** * This class copy from {@link org.apache.doris.flink.table.DorisDynamicTableFactory} @@ -176,6 +185,35 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory .withDescription("the flush max bytes (includes all append, upsert and delete records), over this number" + " in batch, will flush data. The default value is 10MB."); + private static final Map<SchemaChangeType, List<SchemaChangePolicy>> SUPPORTS_POLICY_MAP = new HashMap<>(); + + static { + SUPPORTS_POLICY_MAP.put(SchemaChangeType.CREATE_TABLE, + Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_TABLE, + Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_TABLE, + Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.TRUNCATE_TABLE, + Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.ADD_COLUMN, + Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.DROP_COLUMN, + Arrays.asList(SchemaChangePolicy.ENABLE, SchemaChangePolicy.IGNORE, 
SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.RENAME_COLUMN, + Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + SUPPORTS_POLICY_MAP.put(SchemaChangeType.CHANGE_COLUMN_TYPE, + Arrays.asList(SchemaChangePolicy.IGNORE, SchemaChangePolicy.LOG, + SchemaChangePolicy.ERROR)); + } + @Override public String factoryIdentifier() { return "doris-inlong"; @@ -223,6 +261,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(INLONG_AUDIT); options.add(FactoryUtil.SINK_PARALLELISM); options.add(AUDIT_KEYS); + options.add(SINK_SCHEMA_CHANGE_ENABLE); + options.add(SINK_SCHEMA_CHANGE_POLICIES); return options; } @@ -309,8 +349,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions() .getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(SchemaUpdateExceptionPolicy.THROW_WITH_STOP); String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null); - validateSinkMultiple(physicalSchema.toPhysicalRowDataType(), - multipleSink, sinkMultipleFormat, databasePattern, tablePattern); + boolean enableSchemaChange = helper.getOptions().get(SINK_SCHEMA_CHANGE_ENABLE); + String schemaChangePolicies = helper.getOptions().getOptional(SINK_SCHEMA_CHANGE_POLICIES).orElse(null); + validateSinkMultiple(physicalSchema.toPhysicalRowDataType(), multipleSink, sinkMultipleFormat, + databasePattern, tablePattern, enableSchemaChange, schemaChangePolicies); String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue()); String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue()); Integer parallelism = helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null); @@ -333,11 +375,13 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory auditHostAndPorts, parallelism, dirtyOptions, - dirtySink); + dirtySink, + enableSchemaChange, + schemaChangePolicies); } private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat, - String databasePattern, String tablePattern) { + String databasePattern, String tablePattern, boolean enableSchemaChange, String schemaChangePolicies) { if (multipleSink) { if (StringUtils.isBlank(databasePattern)) { throw new ValidationException( @@ -367,6 +411,21 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory "Only supports 'BYTES' or 'VARBINARY(n)' of PhysicalDataType " + "when the option 'sink.multiple.enable' is 'true'"); } + if (enableSchemaChange) { + Map<SchemaChangeType, SchemaChangePolicy> policyMap = SchemaChangeUtils + .deserialize(schemaChangePolicies); + for (Entry<SchemaChangeType, SchemaChangePolicy> kv : policyMap.entrySet()) { + List<SchemaChangePolicy> policies = SUPPORTS_POLICY_MAP.get(kv.getKey()); + if (policies == null) { + throw new ValidationException( + String.format("Unsupported type of schemage-change: %s", kv.getKey())); + } + if (!policies.contains(kv.getValue())) { + throw new ValidationException( + String.format("Unsupported policy of schemage-change: %s", kv.getValue())); + } + } + } } } } \ No newline at end of file diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java index cbec6fd682..be7c8b86b3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java @@ -53,6 +53,9 @@ 
public class DorisDynamicTableSink implements DynamicTableSink { private final Integer parallelism; private final DirtyOptions dirtyOptions; private @Nullable final DirtySink<Object> dirtySink; + private final boolean enableSchemaChange; + @Nullable + private final String schemaChangePolicies; public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, @@ -68,7 +71,9 @@ public class DorisDynamicTableSink implements DynamicTableSink { String auditHostAndPorts, Integer parallelism, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink) { + @Nullable DirtySink<Object> dirtySink, + boolean enableSchemaChange, + @Nullable String schemaChangePolicies) { this.options = options; this.readOptions = readOptions; this.executionOptions = executionOptions; @@ -84,6 +89,8 @@ public class DorisDynamicTableSink implements DynamicTableSink { this.parallelism = parallelism; this.dirtyOptions = dirtyOptions; this.dirtySink = dirtySink; + this.enableSchemaChange = enableSchemaChange; + this.schemaChangePolicies = schemaChangePolicies; } @Override @@ -114,7 +121,9 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setIgnoreSingleTableErrors(ignoreSingleTableErrors) .setSchemaUpdatePolicy(schemaUpdatePolicy) .setDirtyOptions(dirtyOptions) - .setDirtySink(dirtySink); + .setDirtySink(dirtySink) + .setEnableSchemaChange(enableSchemaChange) + .setSchemaChangePolicies(schemaChangePolicies); return SinkFunctionProvider.of( new GenericDorisSinkFunction<>(builder.build()), parallelism); } @@ -135,7 +144,9 @@ public class DorisDynamicTableSink implements DynamicTableSink { auditHostAndPorts, parallelism, dirtyOptions, - dirtySink); + dirtySink, + enableSchemaChange, + schemaChangePolicies); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java new file mode 100644 index 0000000000..2c3cc85d35 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/test/java/org/apache/inlong/sort/doris/schema/OperationHelperTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.doris.schema; + +import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; +import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.protocol.ddl.enums.AlterType; +import org.apache.inlong.sort.protocol.ddl.enums.PositionType; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.expressions.Position; +import org.apache.inlong.sort.protocol.ddl.operations.CreateTableOperation; + +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Types; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Test for {@link OperationHelper} + */ +public class OperationHelperTest { + + private final Map<Integer, Column> allTypes2Columns = + ImmutableMap.<Integer, Column>builder() + .put(Types.CHAR, new Column("c", Collections.singletonList("32"), Types.CHAR, + new Position(PositionType.FIRST, null), true, "InLong", "a column")) + .put(Types.VARCHAR, new Column("c", Collections.singletonList("32"), Types.VARCHAR, + new Position(PositionType.FIRST, null), false, "InLong", "a column")) + .put(Types.SMALLINT, new Column("c", Collections.singletonList("8"), Types.SMALLINT, + new Position(PositionType.AFTER, "b"), true, "2023", "a column")) + .put(Types.INTEGER, new Column("c", Collections.singletonList("11"), Types.INTEGER, + new Position(PositionType.AFTER, "b"), true, "2023", "a column")) + .put(Types.BIGINT, new Column("c", Collections.singletonList("16"), Types.BIGINT, + new Position(PositionType.AFTER, "b"), true, "2023", "a column")) + .put(Types.REAL, + new Column("c", Arrays.asList("11", "2"), Types.REAL, new Position(PositionType.AFTER, "b"), + true, "99.99", 
"a column")) + .put(Types.DOUBLE, new Column("c", Arrays.asList("11", "2"), Types.DOUBLE, + new Position(PositionType.AFTER, "b"), true, "99.99", "a column")) + .put(Types.FLOAT, new Column("c", Arrays.asList("11", "2"), Types.FLOAT, + new Position(PositionType.AFTER, "b"), true, "99.99", "a column")) + .put(Types.DECIMAL, new Column("c", Arrays.asList("11", "2"), Types.DECIMAL, + new Position(PositionType.AFTER, "b"), true, "99.99", "a column")) + .put(Types.NUMERIC, new Column("c", Arrays.asList("11", "2"), Types.NUMERIC, + new Position(PositionType.AFTER, "b"), true, "99.99", "a column")) + .put(Types.BIT, + new Column("c", null, Types.BIT, new Position(PositionType.AFTER, "b"), true, "false", + "a column")) + .put(Types.TIME, + new Column("c", null, Types.TIME, new Position(PositionType.AFTER, "b"), true, "10:30", + "a column")) + .put(Types.TIME_WITH_TIMEZONE, + new Column("c", null, Types.TIME_WITH_TIMEZONE, new Position(PositionType.AFTER, "b"), + true, "10:30", "a column")) + .put(Types.TIMESTAMP_WITH_TIMEZONE, + new Column("c", null, Types.TIMESTAMP_WITH_TIMEZONE, new Position(PositionType.AFTER, "b"), + true, "2023-01-01 10:30", "a column")) + .put(Types.TIMESTAMP, new Column("c", null, Types.TIMESTAMP, new Position(PositionType.AFTER, "b"), + true, "2023-01-01 10:30", "a column")) + .put(Types.BINARY, new Column("c", null, Types.BINARY, new Position(PositionType.AFTER, "b"), + true, "this is a BINARY", "a column")) + .put(Types.VARBINARY, new Column("c", null, Types.BINARY, new Position(PositionType.AFTER, "b"), + true, "this is a VARBINARY", "a column")) + .put(Types.BLOB, + new Column("c", null, Types.BLOB, new Position(PositionType.AFTER, "b"), true, + "this is a BLOB", + "a column")) + .put(Types.CLOB, + new Column("c", null, Types.CLOB, new Position(PositionType.AFTER, "b"), true, + "this is a CLOB", + "a column")) + .put(Types.DATE, + new Column("c", null, Types.DATE, new Position(PositionType.AFTER, "b"), true, "2023-01-01", + "a column")) + 
.put(Types.BOOLEAN,
+                            new Column("c", null, Types.BOOLEAN, new Position(PositionType.AFTER, "b"), true, "true",
+                                    "a column"))
+                    .put(Types.LONGNVARCHAR,
+                            new Column("c", null, Types.LONGNVARCHAR, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGNVARCHAR", "a column"))
+                    .put(Types.LONGVARBINARY,
+                            new Column("c", null, Types.LONGVARBINARY, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGVARBINARY", "a column"))
+                    .put(Types.LONGVARCHAR,
+                            new Column("c", null, Types.LONGVARCHAR, new Position(PositionType.AFTER, "b"),
+                                    true, "this is a LONGVARCHAR", "a column"))
+                    .put(Types.ARRAY,
+                            new Column("c", null, Types.ARRAY, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a ARRAY",
+                                    "a column"))
+                    .put(Types.NCHAR,
+                            new Column("c", null, Types.NCHAR, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a NCHAR",
+                                    "a column"))
+                    .put(Types.NCLOB,
+                            new Column("c", null, Types.NCLOB, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a NCLOB",
+                                    "a column"))
+                    .put(Types.TINYINT, new Column("c", Collections.singletonList("1"), Types.TINYINT,
+                            new Position(PositionType.FIRST, null), true, "1", "a column"))
+                    .put(Types.OTHER,
+                            new Column("c", null, Types.OTHER, new Position(PositionType.AFTER, "b"), true,
+                                    "this is a OTHER",
+                                    "a column"))
+                    .build();
+    private final Map<Integer, String> addColumnStatements = // expected ADD COLUMN clause, keyed by the same Types constants as allTypes2Columns
+            ImmutableMap.<Integer, String>builder()
+                    .put(Types.CHAR,
+                            "ADD COLUMN `c` CHAR(32) DEFAULT 'InLong' COMMENT 'a column' FIRST")
+                    .put(Types.VARCHAR,
+                            "ADD COLUMN `c` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a column' FIRST")
+                    .put(Types.SMALLINT,
+                            "ADD COLUMN `c` SMALLINT(8) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.INTEGER,
+                            "ADD COLUMN `c` INT(11) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BIGINT,
+                            "ADD COLUMN `c` BIGINT(16) DEFAULT '2023' COMMENT 'a column' AFTER `b`")
+                    .put(Types.REAL,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DOUBLE,
+                            "ADD COLUMN `c` DOUBLE DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.FLOAT,
+                            "ADD COLUMN `c` FLOAT DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DECIMAL,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NUMERIC,
+                            "ADD COLUMN `c` DECIMAL(11, 2) DEFAULT '99.99' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BIT,
+                            "ADD COLUMN `c` BOOLEAN DEFAULT 'false' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIME,
+                            "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIME_WITH_TIMEZONE,
+                            "ADD COLUMN `c` STRING DEFAULT '10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIMESTAMP_WITH_TIMEZONE,
+                            "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01 10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TIMESTAMP,
+                            "ADD COLUMN `c` DATETIME DEFAULT '2023-01-01 10:30' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a BINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.VARBINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a VARBINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a BLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.CLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a CLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.DATE,
+                            "ADD COLUMN `c` DATE DEFAULT '2023-01-01' COMMENT 'a column' AFTER `b`")
+                    .put(Types.BOOLEAN,
+                            "ADD COLUMN `c` BOOLEAN DEFAULT 'true' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGNVARCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGNVARCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGVARBINARY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGVARBINARY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.LONGVARCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a LONGVARCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.ARRAY,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a ARRAY' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NCHAR,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a NCHAR' COMMENT 'a column' AFTER `b`")
+                    .put(Types.NCLOB,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a NCLOB' COMMENT 'a column' AFTER `b`")
+                    .put(Types.TINYINT,
+                            "ADD COLUMN `c` TINYINT(1) DEFAULT '1' COMMENT 'a column' FIRST")
+                    .put(Types.OTHER,
+                            "ADD COLUMN `c` STRING DEFAULT 'this is a OTHER' COMMENT 'a column' AFTER `b`")
+                    .build();
+    private OperationHelper helper; // assigned in init(); used by every test below
+
+    @Before
+    public void init() { // builds the helper from the "canal-json" dynamic schema format
+        helper = OperationHelper.of(
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat("canal-json"));
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildAddColumnStatement(List)}
+     * <p>
+     * Iterates over every entry of {@code allTypes2Columns}, wraps its column in a single
+     * {@code ADD_COLUMN} {@code AlterColumn} (column as second argument, {@code null} as third)
+     * and asserts that the helper's output equals the statement stored under the same
+     * {@code Types} key in {@code addColumnStatements}. A key that has no entry in
+     * {@code addColumnStatements} makes the expected value {@code null}.
+     */
+    @Test
+    public void testBuildAddColumnStatement() {
+        for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+            Assert.assertEquals(addColumnStatements.get(kv.getKey()),
+                    helper.buildAddColumnStatement(Collections.singletonList(new AlterColumn(
+                            AlterType.ADD_COLUMN, kv.getValue(), null))));
+        }
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildDropColumnStatement(List)}
+     * <p>
+     * Each column of {@code allTypes2Columns} is passed as the third argument of a
+     * {@code DROP_COLUMN} {@code AlterColumn} (the second argument is {@code null}). The
+     * expected statement is the same literal, DROP COLUMN `c`, for every entry, so the
+     * assertion does not depend on the entry's {@code Types} key.
+     */
+    @Test
+    public void testBuildDropColumnStatement() {
+        for (Entry<Integer, Column> kv : allTypes2Columns.entrySet()) {
+            Assert.assertEquals("DROP COLUMN `c`",
+                    helper.buildDropColumnStatement(Collections.singletonList(
+                            new AlterColumn(AlterType.DROP_COLUMN, null, kv.getValue()))));
+        }
+
+    }
+
+    /**
+     * Test for {@link OperationHelper#buildCreateTableStatement(String, String, List, CreateTableOperation)}
+     * <p>
+     * Builds a {@code CreateTableOperation} with the comment "create table auto" and four
+     * {@code Types.VARCHAR} columns named a, b, c and d, each declared with the single
+     * definition value "32". Columns a and b are the primary keys; the expected DDL lists
+     * them in both the UNIQUE KEY and the DISTRIBUTED BY HASH clauses and ends with the
+     * "light_schema_change" = "true" property.
+     * <p>
+     * NOTE(review): the expected DDL renders the declared "32" as VARCHAR(96) and emits
+     * NOT NULL only for a and b, whose fifth {@code Column} constructor argument is
+     * {@code false}. Presumably the helper scales VARCHAR lengths and that argument is the
+     * nullable flag; confirm against {@code OperationHelper} and {@code Column}.
+     */
+    @Test
+    public void testBuildCreateTableStatement() {
+        List<String> primaryKeys = Arrays.asList("a", "b");
+        List<Column> columns = Arrays.asList(new Column("a", Collections.singletonList("32"), Types.VARCHAR,
+                new Position(PositionType.FIRST, null), false, "InLong", "a column"),
+                new Column("b", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), false, "InLong", "a column"),
+                new Column("c", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), true, "InLong", "a column"),
+                new Column("d", Collections.singletonList("32"), Types.VARCHAR,
+                        new Position(PositionType.FIRST, null), true, "InLong", "a column"));
+        CreateTableOperation operation = new CreateTableOperation();
+        operation.setComment("create table auto");
+        operation.setColumns(columns);
+        String database = "inlong_database";
+        String table = "inlong_table";
+        Assert.assertEquals("CREATE TABLE IF NOT EXISTS `inlong_database`.`inlong_table`(\n"
+                + "\t`a` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a column',\n"
+                + "\t`b` VARCHAR(96) NOT NULL DEFAULT 'InLong' COMMENT 'a column',\n"
+                + "\t`c` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column',\n"
+                + "\t`d` VARCHAR(96) DEFAULT 'InLong' COMMENT 'a column'\n"
+                + ")\n"
+                + "UNIQUE KEY(`a`,`b`)\n"
+                + "COMMENT 'create table auto'\n"
+                + "DISTRIBUTED BY HASH(`a`,`b`)\n"
+                + "PROPERTIES (\n"
+                + "\t\"light_schema_change\" = \"true\"\n"
+                + ")",
+                helper.buildCreateTableStatement(database, table, primaryKeys, operation));
+    }
+}