This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 415f37a  [Improve]The change basis of table schema is changed to parse data column field (#17)
415f37a is described below

commit 415f37a7aa2f6bf27c969c8363499edd0399cfd0
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu May 16 14:05:32 2024 +0800

    [Improve]The change basis of table schema is changed to parse data column field (#17)
---
 .../connector/jdbc/util}/DateTimeUtils.java        |   6 +-
 .../debezium/connector/jdbc/util/SchemaUtils.java  |  58 +++++
 .../doris/kafka/connector/cfg/DorisOptions.java    |  17 +-
 .../connector/cfg/DorisSinkConnectorConfig.java    |   5 +-
 .../connector/converter/RecordDescriptor.java      |  67 ++---
 .../kafka/connector/converter/RecordService.java   | 148 ++++++++++-
 .../schema/SchemaChangeManager.java                |  84 ++++--
 .../SchemaEvolutionMode.java}                      |  24 +-
 .../connector/converter/type/AbstractDateType.java |  11 +-
 .../connector/converter/type/AbstractTimeType.java |  28 +-
 .../converter/type/AbstractTimestampType.java      |  23 +-
 .../connector/converter/type/AbstractType.java     |  23 ++
 .../doris/kafka/connector/converter/type/Type.java |   3 +
 .../converter/type/connect/ConnectBooleanType.java |   8 +
 .../converter/type/connect/ConnectBytesType.java   |   7 +
 .../converter/type/connect/ConnectDateType.java    |   2 +-
 .../converter/type/connect/ConnectDecimalType.java |  13 +
 .../converter/type/connect/ConnectFloat32Type.java |   8 +
 .../converter/type/connect/ConnectFloat64Type.java |   8 +
 .../converter/type/connect/ConnectInt16Type.java   |   8 +
 .../converter/type/connect/ConnectInt32Type.java   |   8 +
 .../converter/type/connect/ConnectInt64Type.java   |   8 +
 .../converter/type/connect/ConnectInt8Type.java    |   8 +
 .../connect/ConnectMapToConnectStringType.java     |   6 +
 .../converter/type/connect/ConnectStringType.java  |  19 ++
 .../converter/type/connect/ConnectTimeType.java    |   2 +-
 .../type/connect/ConnectTimestampType.java         |   2 +-
 .../converter/type/debezium/DateType.java          |   2 +-
 .../converter/type/debezium/MicroTimeType.java     |   2 +-
 .../type/debezium/MicroTimestampType.java          |   2 +-
 .../converter/type/debezium/NanoTimeType.java      |   2 +-
 .../converter/type/debezium/NanoTimestampType.java |   2 +-
 .../converter/type/debezium/TimeType.java          |   2 +-
 .../type/debezium/VariableScaleDecimalType.java    |  12 +
 .../type/doris}/DorisType.java                     |   5 +-
 .../type/doris/DorisTypeProperties.java}           |   8 +-
 .../kafka/connector/dialect/mysql/MysqlType.java   | 213 ---------------
 .../connector/service/DorisDefaultSinkService.java |  26 +-
 .../kafka/connector/utils/ConfigCheckUtils.java    |  29 ---
 .../writer/schema/DebeziumSchemaChange.java        | 289 ---------------------
 .../writer/schema/SchemaChangeHelper.java          | 159 ------------
 .../connector/converter/TestRecordService.java     |  99 ++++++-
 .../connector/writer/TestDebeziumSchemaChange.java | 133 ----------
 .../kafka/connector/writer/TestRecordBuffer.java   |  11 +-
 44 files changed, 640 insertions(+), 960 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
similarity index 95%
rename from src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
rename to src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
index 09ee9d0..941254d 100644
--- a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
+++ b/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java
@@ -15,8 +15,12 @@
  * KIND, either express or implied.
See the License for the * specific language governing permissions and limitations * under the License. + * + * Copied from + * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/DateTimeUtils.java */ -package org.apache.doris.kafka.connector.converter.utils; + +package io.debezium.connector.jdbc.util; import io.debezium.time.Conversions; import java.sql.Timestamp; diff --git a/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java new file mode 100644 index 0000000..178507c --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Copied from + * https://github.com/debezium/debezium-connector-jdbc/blob/main/src/main/java/io/debezium/connector/jdbc/util/SchemaUtils.java + */ + +package io.debezium.connector.jdbc.util; + +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.connect.data.Schema; + +public class SchemaUtils { + private static final String SCHEMA_PARAMETER_COLUMN_TYPE = "__debezium.source.column.type"; + private static final String SCHEMA_PARAMETER_COLUMN_LENGTH = "__debezium.source.column.length"; + private static final String SCHEMA_PARAMETER_COLUMN_PRECISION = + "__debezium.source.column.scale"; + private static final String SCHEMA_PARAMETER_COLUMN_NAME = "__debezium.source.column.name"; + + public static Optional<String> getSourceColumnType(Schema schema) { + return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_TYPE); + } + + public static Optional<String> getSourceColumnLength(Schema schema) { + return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_LENGTH); + } + + public static Optional<String> getSourceColumnPrecision(Schema schema) { + return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_PRECISION); + } + + public static Optional<String> getSourceColumnName(Schema schema) { + return getSchemaParameter(schema, SCHEMA_PARAMETER_COLUMN_NAME); + } + + public static Optional<String> getSchemaParameter(Schema schema, String parameterName) { + if (!Objects.isNull(schema.parameters())) { + return Optional.ofNullable(schema.parameters().get(parameterName)); + } + return Optional.empty(); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java index e9f1297..4596f69 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; import 
org.apache.doris.kafka.connector.converter.ConverterMode; +import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; import org.apache.doris.kafka.connector.writer.load.LoadModel; @@ -44,7 +45,6 @@ public class DorisOptions { private final String password; private final String database; private final Map<String, String> topicMap; - private final String schemaTopic; private final int fileSize; private final int recordNum; private long flushTime; @@ -62,6 +62,7 @@ public class DorisOptions { private LoadModel loadModel; private DeliveryGuarantee deliveryGuarantee; private ConverterMode converterMode; + private SchemaEvolutionMode schemaEvolutionMode; public DorisOptions(Map<String, String> config) { this.name = config.get(DorisSinkConnectorConfig.NAME); @@ -91,6 +92,11 @@ public class DorisOptions { config.getOrDefault( DorisSinkConnectorConfig.CONVERT_MODE, DorisSinkConnectorConfig.CONVERT_MODE_DEFAULT)); + this.schemaEvolutionMode = + SchemaEvolutionMode.of( + config.getOrDefault( + DorisSinkConnectorConfig.SCHEMA_EVOLUTION, + DorisSinkConnectorConfig.SCHEMA_EVOLUTION_DEFAULT)); this.fileSize = Integer.parseInt(config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES)); this.recordNum = @@ -105,7 +111,6 @@ public class DorisOptions { this.flushTime = DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN; } this.topicMap = getTopicToTableMap(config); - this.schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC); enableCustomJMX = DorisSinkConnectorConfig.JMX_OPT_DEFAULT; if (config.containsKey(DorisSinkConnectorConfig.JMX_OPT)) { @@ -281,6 +286,10 @@ public class DorisOptions { return this.converterMode; } + public SchemaEvolutionMode getSchemaEvolutionMode() { + return this.schemaEvolutionMode; + } + public boolean isAutoRedirect() { return autoRedirect; } @@ -293,10 +302,6 @@ public class DorisOptions { return enableDelete; } - public String getSchemaTopic() { - return schemaTopic; - } - /** * parse topic to table map * diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java index 94ea08e..5c33da4 100644 --- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java +++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.Map; import org.apache.doris.kafka.connector.DorisSinkConnector; import org.apache.doris.kafka.connector.converter.ConverterMode; +import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; import org.apache.doris.kafka.connector.utils.ConfigCheckUtils; import org.apache.doris.kafka.connector.writer.DeliveryGuarantee; import org.apache.doris.kafka.connector.writer.load.LoadModel; @@ -78,9 +79,11 @@ public class DorisSinkConnectorConfig { public static final String DELIVERY_GUARANTEE_DEFAULT = DeliveryGuarantee.AT_LEAST_ONCE.name(); public static final String CONVERT_MODE = "converter.mode"; public static final String CONVERT_MODE_DEFAULT = ConverterMode.NORMAL.getName(); - public static final String SCHEMA_TOPIC = "schema.topic"; + // Prefix for Doris StreamLoad specific properties. 
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + public static final String SCHEMA_EVOLUTION = "schema.evolution"; + public static final String SCHEMA_EVOLUTION_DEFAULT = SchemaEvolutionMode.NONE.getName(); // metrics public static final String JMX_OPT = "jmx"; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java index 11d097f..c4f7243 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java @@ -23,7 +23,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import org.apache.doris.kafka.connector.dialect.mysql.MysqlType; +import org.apache.doris.kafka.connector.converter.type.Type; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -119,27 +119,34 @@ public class RecordDescriptor { public static class FieldDescriptor { private final Schema schema; private final String name; + private final Map<String, Type> typeRegistry; + private final Type type; + private final String typeName; private final String schemaTypeName; private final String schemaName; private String comment; private String defaultValue; - public FieldDescriptor( - Schema schema, String name, String schemaTypeName, String schemaName) { + public FieldDescriptor(Schema schema, String name, Map<String, Type> typeRegistry) { this.schema = schema; this.name = name; - this.schemaTypeName = schemaTypeName; - this.schemaName = schemaName; + this.typeRegistry = typeRegistry; + this.schemaName = schema.name(); + this.schemaTypeName = schema.type().name(); + this.type = + Objects.nonNull(schema.name()) + ? 
typeRegistry.get(schema.name()) + : typeRegistry.get(schema.type().name()); + this.typeName = type.getTypeName(schema); } public FieldDescriptor( Schema schema, String name, - String schemaTypeName, - String schemaName, + Map<String, Type> typeRegistry, String comment, String defaultValue) { - this(schema, name, schemaTypeName, schemaName); + this(schema, name, typeRegistry); this.comment = comment; this.defaultValue = defaultValue; } @@ -148,6 +155,14 @@ public class RecordDescriptor { return name; } + public Type getType() { + return type; + } + + public String getTypeName() { + return typeName; + } + public String getSchemaName() { return schemaName; } @@ -172,7 +187,7 @@ public class RecordDescriptor { public static class Builder { private SinkRecord sinkRecord; - private Struct tableChange; + private Map<String, Type> typeRegistry; // Internal build state private final List<String> keyFieldNames = new ArrayList<>(); @@ -184,8 +199,8 @@ public class RecordDescriptor { return this; } - public Builder withTableChange(Struct tableChange) { - this.tableChange = tableChange; + public Builder withTypeRegistry(Map<String, Type> typeRegistry) { + this.typeRegistry = typeRegistry; return this; } @@ -193,11 +208,7 @@ public class RecordDescriptor { Objects.requireNonNull(sinkRecord, "The sink record must be provided."); final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); - if (Objects.nonNull(tableChange)) { - readTableChangeData(tableChange); - } else { - readSinkRecordNonKeyData(sinkRecord, flattened); - } + readSinkRecordNonKeyData(sinkRecord, flattened); return new RecordDescriptor( sinkRecord, @@ -208,27 +219,6 @@ public class RecordDescriptor { flattened); } - private void readTableChangeData(final Struct tableChange) { - Struct tableChangeTable = tableChange.getStruct("table"); - List<Object> tableChangeColumns = tableChangeTable.getArray("columns"); - for (Object column : tableChangeColumns) { - Struct columnStruct = (Struct) column; - Schema schema = columnStruct.schema(); - String name = columnStruct.getString("name"); - String typeName = columnStruct.getString("typeName"); - Integer length = columnStruct.getInt32("length"); - Integer scale = columnStruct.getInt32("scale"); - String dorisType = MysqlType.toDorisType(typeName, length, scale); - String comment = columnStruct.getString("comment"); - String defaultValue = columnStruct.getString("defaultValueExpression"); - nonKeyFieldNames.add(name); - allFields.put( - name, - new FieldDescriptor( - schema, name, dorisType, schema.name(), comment, defaultValue)); - } - } - private boolean isFlattened(SinkRecord record) { return record.valueSchema().name() == null || !record.valueSchema().name().contains("Envelope"); @@ -266,8 +256,7 @@ public class RecordDescriptor { } private void applyNonKeyField(String name, Schema schema) { - FieldDescriptor fieldDescriptor = - new FieldDescriptor(schema, name, schema.type().name(), schema.name()); + FieldDescriptor fieldDescriptor = new FieldDescriptor(schema, name, typeRegistry); nonKeyFieldNames.add(fieldDescriptor.getName()); allFields.put(fieldDescriptor.getName(), fieldDescriptor); } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index 1390761..8bc2480 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -21,17 +21,32 @@ package 
org.apache.doris.kafka.connector.converter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.debezium.util.Strings; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.StringJoiner; +import java.util.stream.Collectors; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager; +import org.apache.doris.kafka.connector.converter.schema.SchemaEvolutionMode; import org.apache.doris.kafka.connector.converter.type.Type; import org.apache.doris.kafka.connector.exception.DataFormatException; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.exception.SchemaChangeException; +import org.apache.doris.kafka.connector.model.ColumnDescriptor; +import org.apache.doris.kafka.connector.model.TableDescriptor; +import org.apache.doris.kafka.connector.model.doris.Schema; +import org.apache.doris.kafka.connector.service.DorisSystemService; +import org.apache.doris.kafka.connector.service.RestService; import org.apache.doris.kafka.connector.writer.LoadConstants; import org.apache.doris.kafka.connector.writer.RecordBuffer; import org.apache.kafka.connect.data.Struct; @@ -43,10 +58,14 @@ import org.slf4j.LoggerFactory; public class RecordService { private static final Logger LOG = LoggerFactory.getLogger(RecordService.class); + public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue"; private static final ObjectMapper MAPPER = new ObjectMapper(); private final JsonConverter converter; + private DorisSystemService dorisSystemService; + private SchemaChangeManager schemaChangeManager; private DorisOptions dorisOptions; private RecordTypeRegister recordTypeRegister; + private Set<RecordDescriptor.FieldDescriptor> missingFields; public RecordService() { this.converter = new JsonConverter(); @@ -59,6 +78,8 @@ public class RecordService { this(); this.dorisOptions = dorisOptions; this.recordTypeRegister = new RecordTypeRegister(dorisOptions); + this.dorisSystemService = new DorisSystemService(dorisOptions); + this.schemaChangeManager = new SchemaChangeManager(dorisOptions); } /** @@ -68,10 +89,14 @@ public class RecordService { public String processStructRecord(SinkRecord record) { String processedRecord; if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) { + validate(record); RecordDescriptor recordDescriptor = buildRecordDescriptor(record); if (recordDescriptor.isTombstone()) { return null; } + String tableName = dorisOptions.getTopicMapTable(recordDescriptor.getTopicName()); + checkAndApplyTableChangesIfNeeded(tableName, recordDescriptor); + List<String> nonKeyFieldNames = recordDescriptor.getNonKeyFieldNames(); if (recordDescriptor.isDelete()) { processedRecord = @@ -96,6 +121,101 @@ public class RecordService { return processedRecord; } + private void validate(SinkRecord record) { + if (isSchemaChange(record)) { + LOG.warn( + "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic."); + throw new DorisException( + "Schema change records are not supported by JDBC connector. 
Adjust `topics` or `topics.regex` to exclude schema change topic."); + } + } + + private static boolean isSchemaChange(SinkRecord record) { + return record.valueSchema() != null + && !Strings.isNullOrEmpty(record.valueSchema().name()) + && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE); + } + + private void checkAndApplyTableChangesIfNeeded( + String tableName, RecordDescriptor recordDescriptor) { + if (!hasTable(tableName)) { + // TODO Table does not exist, lets attempt to create it. + } else { + // Table exists, lets attempt to alter it if necessary. + alterTableIfNeeded(tableName, recordDescriptor); + } + } + + private boolean hasTable(String tableName) { + return dorisSystemService.tableExists(dorisOptions.getDatabase(), tableName); + } + + private void alterTableIfNeeded(String tableName, RecordDescriptor record) { + // Resolve table metadata from the database + final TableDescriptor table = obtainTableSchema(tableName); + + missingFields = resolveMissingFields(record, table); + if (missingFields.isEmpty()) { + // There are no missing fields, simply return + // TODO should we check column type changes or default value changes? + return; + } + + LOG.info( + "Find some miss columns in {} table, try to alter add this columns={}.", + tableName, + missingFields.stream() + .map(RecordDescriptor.FieldDescriptor::getName) + .collect(Collectors.toList())); + if (SchemaEvolutionMode.NONE.equals(dorisOptions.getSchemaEvolutionMode())) { + LOG.warn( + "Table '{}' cannot be altered because schema evolution is disabled.", + tableName); + throw new SchemaChangeException( + "Cannot alter table " + table + " because schema evolution is disabled"); + } + for (RecordDescriptor.FieldDescriptor missingField : missingFields) { + schemaChangeManager.addColumnDDL(tableName, missingField); + } + } + + private Set<RecordDescriptor.FieldDescriptor> resolveMissingFields( + RecordDescriptor record, TableDescriptor table) { + Set<RecordDescriptor.FieldDescriptor> missingFields = new HashSet<>(); + for (Map.Entry<String, RecordDescriptor.FieldDescriptor> entry : + record.getFields().entrySet()) { + String filedName = entry.getKey(); + if (!table.hasColumn(filedName)) { + missingFields.add(entry.getValue()); + } + } + return missingFields; + } + + private TableDescriptor obtainTableSchema(String tableName) { + // TODO when the table structure is obtained from doris for first time, it should be + // obtained in the cache later. 
+ Schema schema = + RestService.getSchema(dorisOptions, dorisOptions.getDatabase(), tableName, LOG); + List<ColumnDescriptor> columnDescriptors = new ArrayList<>(); + schema.getProperties() + .forEach( + column -> { + ColumnDescriptor columnDescriptor = + ColumnDescriptor.builder() + .columnName(column.getName()) + .typeName(column.getType()) + .comment(column.getComment()) + .build(); + columnDescriptors.add(columnDescriptor); + }); + return TableDescriptor.builder() + .tableName(tableName) + .type(schema.getKeysType()) + .columns(columnDescriptors) + .build(); + } + /** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */ public String processListRecord(SinkRecord record) { try { @@ -130,19 +250,13 @@ public class RecordService { RecordDescriptor record, Struct source, List<String> fields, boolean isDelete) { Map<String, Object> filedMapping = new LinkedHashMap<>(); String filedResult = null; - final Map<String, Type> typeRegistry = recordTypeRegister.getTypeRegistry(); for (String fieldName : fields) { final RecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName); - String fieldSchemaName = field.getSchemaName(); - String fieldSchemaTypeName = field.getSchemaTypeName(); + Type type = field.getType(); Object value = field.getSchema().isOptional() ? source.getWithoutDefault(fieldName) : source.get(fieldName); - Type type = - Objects.nonNull(fieldSchemaName) - ? typeRegistry.get(fieldSchemaName) - : typeRegistry.get(fieldSchemaTypeName); Object convertValue = type.getValue(value); if (Objects.nonNull(convertValue) && !type.isNumber()) { filedMapping.put(fieldName, convertValue.toString()); @@ -186,10 +300,28 @@ public class RecordService { private RecordDescriptor buildRecordDescriptor(SinkRecord record) { RecordDescriptor recordDescriptor; try { - recordDescriptor = RecordDescriptor.builder().withSinkRecord(record).build(); + recordDescriptor = + RecordDescriptor.builder() + .withSinkRecord(record) + .withTypeRegistry(recordTypeRegister.getTypeRegistry()) + .build(); } catch (Exception e) { throw new ConnectException("Failed to process a sink record", e); } return recordDescriptor; } + + public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) { + this.schemaChangeManager = schemaChangeManager; + } + + @VisibleForTesting + public void setDorisSystemService(DorisSystemService dorisSystemService) { + this.dorisSystemService = dorisSystemService; + } + + @VisibleForTesting + public Set<RecordDescriptor.FieldDescriptor> getMissingFields() { + return missingFields; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java similarity index 69% rename from src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java rename to src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java index 1ee9c1e..376edf9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeManager.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.doris.kafka.connector.writer.schema; +package org.apache.doris.kafka.connector.converter.schema; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; @@ -28,8 +28,8 @@ import java.util.Map; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.RecordDescriptor; import org.apache.doris.kafka.connector.exception.SchemaChangeException; -import org.apache.doris.kafka.connector.utils.HttpGetWithEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -44,8 +44,7 @@ import org.slf4j.LoggerFactory; public class SchemaChangeManager implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); - private static final String CHECK_SCHEMA_CHANGE_API = - "http://%s/api/enable_light_schema_change/%s/%s"; + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private final ObjectMapper objectMapper = new ObjectMapper(); private final DorisOptions dorisOptions; @@ -54,29 +53,6 @@ public class SchemaChangeManager implements Serializable { this.dorisOptions = dorisOptions; } - public static Map<String, Object> buildRequestParam(boolean dropColumn, String columnName) { - Map<String, Object> params = new HashMap<>(); - params.put("isDropColumn", dropColumn); - params.put("columnName", columnName); - return params; - } - - /** check ddl can do light schema change. */ - public boolean checkSchemaChange(String database, String table, Map<String, Object> params) - throws IllegalArgumentException, IOException { - if (params.isEmpty()) { - return false; - } - String requestUrl = - String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getHttpUrl(), database, table); - HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); - httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(params))); - String responseEntity = ""; - Map<String, Object> responseMap = handleResponse(httpGet, responseEntity); - return handleSchemaChange(responseMap, responseEntity); - } - private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) { String code = responseMap.getOrDefault("code", "-1").toString(); if (code.equals("0")) { @@ -86,6 +62,60 @@ public class SchemaChangeManager implements Serializable { } } + public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) { + try { + String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field); + boolean status = execute(addColumnDDL, dorisOptions.getDatabase()); + LOG.info( + "Add missing column for {} table, ddl={}, status={}", + tableName, + addColumnDDL, + status); + } catch (Exception e) { + LOG.warn("Failed to add column for {}, cause by: ", tableName, e); + throw new SchemaChangeException( + "Failed to add column for " + tableName + ", cause by:", e); + } + } + + public static String buildAddColumnDDL( + String database, String tableName, RecordDescriptor.FieldDescriptor field) { + String name = field.getName(); + String typeName = field.getTypeName(); + String comment = field.getComment(); + String defaultValue = 
field.getDefaultValue(); + + String addDDL = + String.format( + ADD_DDL, + identifier(database) + "." + identifier(tableName), + identifier(name), + typeName); + if (defaultValue != null) { + addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue); + } + if (StringUtils.isNotEmpty(comment)) { + addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'"; + } + return addDDL; + } + + private static String quoteComment(String comment) { + return comment.replaceAll("'", "\\\\'"); + } + + private static String identifier(String name) { + return "`" + name + "`"; + } + + private static String quoteDefaultValue(String defaultValue) { + // DEFAULT current_timestamp not need quote + if (defaultValue.equalsIgnoreCase("current_timestamp")) { + return defaultValue; + } + return "'" + defaultValue + "'"; + } + /** execute sql in doris. */ public boolean execute(String ddl, String database) throws IOException, IllegalArgumentException { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java similarity index 67% copy from src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java copy to src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java index 55c82cf..d9b6a9b 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaEvolutionMode.java @@ -16,19 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.doris.kafka.connector.converter.type.connect; -public class ConnectInt8Type extends AbstractConnectSchemaType { +package org.apache.doris.kafka.connector.converter.schema; - public static final ConnectInt8Type INSTANCE = new ConnectInt8Type(); +public enum SchemaEvolutionMode { + NONE("none"), - @Override - public String[] getRegistrationKeys() { - return new String[] {"INT8"}; + BASIC("basic"); + + private final String name; + + SchemaEvolutionMode(String name) { + this.name = name; + } + + public static SchemaEvolutionMode of(String name) { + return SchemaEvolutionMode.valueOf(name.toUpperCase()); } - @Override - public boolean isNumber() { - return true; + public String getName() { + return name; } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java index de32cd6..ab25931 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java @@ -18,5 +18,14 @@ */ package org.apache.doris.kafka.connector.converter.type; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + /** An abstract base class for all temporal date implementations of {@link Type}. 
*/ -public abstract class AbstractDateType extends AbstractTemporalType {} +public abstract class AbstractDateType extends AbstractTemporalType { + + @Override + public String getTypeName(Schema schema) { + return DorisType.DATE; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java index 533f1e1..79e0105 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java @@ -18,5 +18,31 @@ */ package org.apache.doris.kafka.connector.converter.type; +import java.util.Optional; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties; +import org.apache.kafka.connect.data.Schema; + /** An abstract temporal implementation of {@link Type} for {@code TIME} based columns. */ -public abstract class AbstractTimeType extends AbstractTemporalType {} +public abstract class AbstractTimeType extends AbstractTemporalType { + + @Override + public String getTypeName(Schema schema) { + // NOTE: + // The MySQL connector does not use the __debezium.source.column.scale parameter to pass + // the time column's precision but instead uses the __debezium.source.column.length key + // which differs from all other connector implementations. + // + final int precision = getTimePrecision(schema); + return String.format( + "%s(%s)", + DorisType.DATETIME, + Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION)); + } + + protected int getTimePrecision(Schema schema) { + final String length = getSourceColumnLength(schema).orElse("0"); + final Optional<String> scale = getSourceColumnPrecision(schema); + return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length)); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java index 3d50376..0b8d45d 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java @@ -18,5 +18,26 @@ */ package org.apache.doris.kafka.connector.converter.type; +import java.util.Optional; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties; +import org.apache.kafka.connect.data.Schema; + /** An abstract temporal implementation of {@link Type} for {@code TIMESTAMP} based columns. 
*/ -public abstract class AbstractTimestampType extends AbstractTemporalType {} +public abstract class AbstractTimestampType extends AbstractTemporalType { + + @Override + public String getTypeName(Schema schema) { + final int precision = getTimePrecision(schema); + return String.format( + "%s(%s)", + DorisType.DATETIME, + Math.min(precision, DorisTypeProperties.MAX_SUPPORTED_DATE_TIME_PRECISION)); + } + + protected int getTimePrecision(Schema schema) { + final String length = getSourceColumnLength(schema).orElse("0"); + final Optional<String> scale = getSourceColumnPrecision(schema); + return scale.map(Integer::parseInt).orElseGet(() -> Integer.parseInt(length)); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java index d915a89..650e792 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java @@ -18,7 +18,11 @@ */ package org.apache.doris.kafka.connector.converter.type; +import io.debezium.connector.jdbc.util.SchemaUtils; +import java.util.Objects; +import java.util.Optional; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.kafka.connect.data.Schema; /** An abstract implementation of {@link Type}, which all types should extend. */ public abstract class AbstractType implements Type { @@ -40,4 +44,23 @@ public abstract class AbstractType implements Type { public String toString() { return getClass().getSimpleName(); } + + protected Optional<String> getSchemaParameter(Schema schema, String parameterName) { + if (!Objects.isNull(schema.parameters())) { + return Optional.ofNullable(schema.parameters().get(parameterName)); + } + return Optional.empty(); + } + + protected Optional<String> getSourceColumnType(Schema schema) { + return SchemaUtils.getSourceColumnType(schema); + } + + protected Optional<String> getSourceColumnLength(Schema schema) { + return SchemaUtils.getSourceColumnLength(schema); + } + + protected Optional<String> getSourceColumnPrecision(Schema schema) { + return SchemaUtils.getSourceColumnPrecision(schema); + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java index c284f0e..698e838 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java @@ -19,6 +19,7 @@ package org.apache.doris.kafka.connector.converter.type; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.kafka.connect.data.Schema; /** * A type indicates the type of each column of kafka record, including various column types of @@ -42,5 +43,7 @@ public interface Type { /** Get the actual converted value based on the column type. 
*/ Object getValue(Object sourceValue); + String getTypeName(Schema schema); + boolean isNumber(); } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java index 18c5af3..dfac2d5 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectBooleanType extends AbstractConnectSchemaType { public static final ConnectBooleanType INSTANCE = new ConnectBooleanType(); @@ -26,4 +29,9 @@ public class ConnectBooleanType extends AbstractConnectSchemaType { public String[] getRegistrationKeys() { return new String[] {"BOOLEAN"}; } + + @Override + public String getTypeName(Schema schema) { + return DorisType.BOOLEAN; + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java index 6c2701c..fbd07a9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java @@ -19,6 +19,8 @@ package org.apache.doris.kafka.connector.converter.type.connect; import java.nio.ByteBuffer; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; public class ConnectBytesType extends AbstractConnectSchemaType { @@ -37,6 +39,11 @@ public class ConnectBytesType extends AbstractConnectSchemaType { return bytesToHexString(getByteArrayFromValue(sourceValue)); } + @Override + public String getTypeName(Schema schema) { + return DorisType.STRING; + } + private byte[] getByteArrayFromValue(Object value) { byte[] byteArray = null; if (value instanceof ByteBuffer) { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java index 4106f8d..acac4af 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java @@ -18,8 +18,8 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import io.debezium.connector.jdbc.util.DateTimeUtils; import org.apache.doris.kafka.connector.converter.type.AbstractDateType; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.errors.ConnectException; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java index 8625883..a0ab7c9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java @@ -19,7 +19,9 @@ package org.apache.doris.kafka.connector.converter.type.connect; import 
org.apache.doris.kafka.connector.converter.type.AbstractType; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +36,17 @@ public class ConnectDecimalType extends AbstractType { return new String[] {Decimal.LOGICAL_NAME}; } + @Override + public String getTypeName(Schema schema) { + int scale = Integer.parseInt(getSchemaParameter(schema, "scale").orElse("0")); + int precision = + Integer.parseInt( + getSchemaParameter(schema, "connect.decimal.precision").orElse("0")); + return precision <= 38 + ? String.format("%s(%s,%s)", DorisType.DECIMAL, precision, Math.max(scale, 0)) + : DorisType.STRING; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java index 98b6936..fc75ba9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectFloat32Type extends AbstractConnectSchemaType { public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type(); @@ -27,6 +30,11 @@ public class ConnectFloat32Type extends AbstractConnectSchemaType { return new String[] {"FLOAT32"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.FLOAT; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java index f050c15..3a74391 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectFloat64Type extends AbstractConnectSchemaType { public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type(); @@ -27,6 +30,11 @@ public class ConnectFloat64Type extends AbstractConnectSchemaType { return new String[] {"FLOAT64"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.DOUBLE; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java index 573813b..6a61c77 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import 
org.apache.kafka.connect.data.Schema; + public class ConnectInt16Type extends AbstractConnectSchemaType { public static final ConnectInt16Type INSTANCE = new ConnectInt16Type(); @@ -27,6 +30,11 @@ public class ConnectInt16Type extends AbstractConnectSchemaType { return new String[] {"INT16"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.SMALLINT; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java index 50dd6c7..e11ad5f 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectInt32Type extends AbstractConnectSchemaType { public static final ConnectInt32Type INSTANCE = new ConnectInt32Type(); @@ -27,6 +30,11 @@ public class ConnectInt32Type extends AbstractConnectSchemaType { return new String[] {"INT32"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.INT; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java index c08abb6..a322da6 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectInt64Type extends AbstractConnectSchemaType { public static final ConnectInt64Type INSTANCE = new ConnectInt64Type(); @@ -27,6 +30,11 @@ public class ConnectInt64Type extends AbstractConnectSchemaType { return new String[] {"INT64"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.BIGINT; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java index 55c82cf..5c3fae6 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java @@ -18,6 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; + public class ConnectInt8Type extends AbstractConnectSchemaType { public static final ConnectInt8Type INSTANCE = new ConnectInt8Type(); @@ -27,6 +30,11 @@ public class ConnectInt8Type extends AbstractConnectSchemaType { return new String[] {"INT8"}; } + @Override + public String getTypeName(Schema schema) { + return DorisType.TINYINT; + } + @Override public boolean isNumber() { return true; diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java index cac2624..707dd1c 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java @@ -19,12 +19,18 @@ package org.apache.doris.kafka.connector.converter.type.connect; import java.util.Map; +import org.apache.kafka.connect.data.Schema; public class ConnectMapToConnectStringType extends AbstractConnectMapType { public static final ConnectMapToConnectStringType INSTANCE = new ConnectMapToConnectStringType(); + @Override + public String getTypeName(Schema schema) { + return ConnectStringType.INSTANCE.getTypeName(schema); + } + @Override public Object getValue(Object sourceValue) { if (sourceValue instanceof Map) { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java index 0353020..bda5478 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java @@ -18,6 +18,10 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.doris.kafka.connector.converter.type.doris.DorisTypeProperties; +import org.apache.kafka.connect.data.Schema; + /** * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} that supports * {@code STRING} connect schema types. @@ -26,8 +30,23 @@ public class ConnectStringType extends AbstractConnectSchemaType { public static final ConnectStringType INSTANCE = new ConnectStringType(); + @Override + public String getTypeName(Schema schema) { + int columnLength = getColumnLength(schema); + if (columnLength > 0) { + return columnLength * 3 > DorisTypeProperties.MAX_VARCHAR_SIZE + ? 
DorisType.STRING + : String.format("%s(%s)", DorisType.VARCHAR, columnLength * 3); + } + return DorisType.STRING; + } + @Override public String[] getRegistrationKeys() { return new String[] {"STRING"}; } + + private int getColumnLength(Schema schema) { + return Integer.parseInt(getSourceColumnLength(schema).orElse("0")); + } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java index de3be44..c2e1698 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java @@ -18,12 +18,12 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import io.debezium.connector.jdbc.util.DateTimeUtils; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Date; import org.apache.doris.kafka.connector.converter.type.AbstractTimeType; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.errors.ConnectException; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java index 8af71b9..2de8c42 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java @@ -18,8 +18,8 @@ */ package org.apache.doris.kafka.connector.converter.type.connect; +import io.debezium.connector.jdbc.util.DateTimeUtils; import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.ConnectException; diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java index 912f0a4..a5589f3 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java @@ -18,9 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.Date; import org.apache.doris.kafka.connector.converter.type.AbstractDateType; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; import org.apache.kafka.connect.errors.ConnectException; public class DateType extends AbstractDateType { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java index 36eeceb..b2a1381 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java @@ -18,9 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.MicroTime; import java.time.LocalTime; -import 
org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; public class MicroTimeType extends AbstractDebeziumTimeType { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java index b8c71a2..cb8e3c9 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java @@ -18,9 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.MicroTimestamp; import java.time.LocalDateTime; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; public class MicroTimestampType extends AbstractDebeziumTimestampType { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java index 9519e64..abcc05e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java @@ -18,9 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.NanoTime; import java.time.LocalTime; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; public class NanoTimeType extends AbstractDebeziumTimeType { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java index eec06c8..a7c08d0 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java @@ -18,10 +18,10 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.MicroTimestamp; import io.debezium.time.NanoTimestamp; import java.time.LocalDateTime; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; /** * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for {@link diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java index 83e95d9..be1d329 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java @@ -18,9 +18,9 @@ */ package org.apache.doris.kafka.connector.converter.type.debezium; +import io.debezium.connector.jdbc.util.DateTimeUtils; import io.debezium.time.Time; import java.time.LocalTime; -import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; public class TimeType extends AbstractDebeziumTimeType { diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java index e38fe41..1cbff47 100644 --- 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java @@ -22,6 +22,8 @@ import io.debezium.data.VariableScaleDecimal; import java.math.BigDecimal; import java.util.Optional; import org.apache.doris.kafka.connector.converter.type.AbstractType; +import org.apache.doris.kafka.connector.converter.type.doris.DorisType; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -51,6 +53,16 @@ public class VariableScaleDecimalType extends AbstractType { getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); } + @Override + public String getTypeName(Schema schema) { + // The data passed by VariableScaleDecimal data types does not provide adequate information + // to + // resolve the precision and scale for the data type, so instead we're going to default to + // the + // maximum double-based data types for the dialect, using DOUBLE. + return DorisType.DOUBLE; + } + @Override public boolean isNumber() { return true; diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java similarity index 89% rename from src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java rename to src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java index 89b416a..c89be76 100644 --- a/src/main/java/org/apache/doris/kafka/connector/dialect/DorisType.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisType.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.doris.kafka.connector.dialect; +package org.apache.doris.kafka.connector.converter.type.doris; public class DorisType { public static final String BOOLEAN = "BOOLEAN"; @@ -29,11 +29,8 @@ public class DorisType { public static final String FLOAT = "FLOAT"; public static final String DOUBLE = "DOUBLE"; public static final String DECIMAL = "DECIMAL"; - public static final String DECIMAL_V3 = "DECIMALV3"; public static final String DATE = "DATE"; - public static final String DATE_V2 = "DATEV2"; public static final String DATETIME = "DATETIME"; - public static final String DATETIME_V2 = "DATETIMEV2"; public static final String CHAR = "CHAR"; public static final String VARCHAR = "VARCHAR"; public static final String STRING = "STRING"; diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java similarity index 83% rename from src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java rename to src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java index 62e4cab..b20ae8e 100644 --- a/src/main/java/org/apache/doris/kafka/connector/dialect/DialectProperties.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/doris/DorisTypeProperties.java @@ -17,12 +17,16 @@ * under the License. */ -package org.apache.doris.kafka.connector.dialect; +package org.apache.doris.kafka.connector.converter.type.doris; -public class DialectProperties { +public class DorisTypeProperties { /* Max precision of datetime type of Doris. 
*/ public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6; public static final int TIMESTAMP_TYPE_MAX_PRECISION = 9; + + public static final int MAX_VARCHAR_SIZE = 65533; + + public static final int MAX_CHAR_SIZE = 255; } diff --git a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java b/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java deleted file mode 100644 index 663aadf..0000000 --- a/src/main/java/org/apache/doris/kafka/connector/dialect/mysql/MysqlType.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.doris.kafka.connector.dialect.mysql; - -import static org.apache.doris.kafka.connector.dialect.DialectProperties.MAX_SUPPORTED_DATE_TIME_PRECISION; -import static org.apache.doris.kafka.connector.dialect.DialectProperties.TIMESTAMP_TYPE_MAX_PRECISION; - -import com.google.common.base.Preconditions; -import org.apache.doris.kafka.connector.dialect.DorisType; - -public class MysqlType { - - // MySQL driver returns width of timestamp types instead of precision. - // 19 characters are used for zero-precision timestamps while others - // require 19 + precision + 1 characters with the additional character - // required for the decimal separator. 
- private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19; - private static final String BIT = "BIT"; - private static final String BOOLEAN = "BOOLEAN"; - private static final String BOOL = "BOOL"; - private static final String TINYINT = "TINYINT"; - private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED"; - private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL"; - private static final String SMALLINT = "SMALLINT"; - private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; - private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL"; - private static final String MEDIUMINT = "MEDIUMINT"; - private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; - private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL"; - private static final String INT = "INT"; - private static final String INT_UNSIGNED = "INT UNSIGNED"; - private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL"; - private static final String BIGINT = "BIGINT"; - private static final String SERIAL = "SERIAL"; - private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; - private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL"; - private static final String REAL = "REAL"; - private static final String REAL_UNSIGNED = "REAL UNSIGNED"; - private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL"; - private static final String FLOAT = "FLOAT"; - private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED"; - private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL"; - private static final String DOUBLE = "DOUBLE"; - private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; - private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL"; - private static final String DOUBLE_PRECISION = "DOUBLE PRECISION"; - private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED"; - private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = - "DOUBLE PRECISION UNSIGNED ZEROFILL"; - private static final String NUMERIC = "NUMERIC"; - private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED"; - private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL"; - private static final String FIXED = "FIXED"; - private static final String FIXED_UNSIGNED = "FIXED UNSIGNED"; - private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL"; - private static final String DECIMAL = "DECIMAL"; - private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; - private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL"; - private static final String CHAR = "CHAR"; - private static final String VARCHAR = "VARCHAR"; - private static final String TINYTEXT = "TINYTEXT"; - private static final String MEDIUMTEXT = "MEDIUMTEXT"; - private static final String TEXT = "TEXT"; - private static final String LONGTEXT = "LONGTEXT"; - private static final String DATE = "DATE"; - private static final String TIME = "TIME"; - private static final String DATETIME = "DATETIME"; - private static final String TIMESTAMP = "TIMESTAMP"; - private static final String YEAR = "YEAR"; - private static final String BINARY = "BINARY"; - private static final String VARBINARY = "VARBINARY"; - private static final String TINYBLOB = "TINYBLOB"; - private static final String MEDIUMBLOB = "MEDIUMBLOB"; - private static final String BLOB = "BLOB"; - private 
static final String LONGBLOB = "LONGBLOB"; - private static final String JSON = "JSON"; - private static final String ENUM = "ENUM"; - private static final String SET = "SET"; - - public static String toDorisType(String type, Integer length, Integer scale) { - switch (type.toUpperCase()) { - case BIT: - case BOOLEAN: - case BOOL: - return DorisType.BOOLEAN; - case TINYINT: - return DorisType.TINYINT; - case TINYINT_UNSIGNED: - case TINYINT_UNSIGNED_ZEROFILL: - case SMALLINT: - return DorisType.SMALLINT; - case SMALLINT_UNSIGNED: - case SMALLINT_UNSIGNED_ZEROFILL: - case INT: - case MEDIUMINT: - case YEAR: - return DorisType.INT; - case INT_UNSIGNED: - case INT_UNSIGNED_ZEROFILL: - case MEDIUMINT_UNSIGNED: - case MEDIUMINT_UNSIGNED_ZEROFILL: - case BIGINT: - return DorisType.BIGINT; - case BIGINT_UNSIGNED: - case BIGINT_UNSIGNED_ZEROFILL: - return DorisType.LARGEINT; - case FLOAT: - case FLOAT_UNSIGNED: - case FLOAT_UNSIGNED_ZEROFILL: - return DorisType.FLOAT; - case REAL: - case REAL_UNSIGNED: - case REAL_UNSIGNED_ZEROFILL: - case DOUBLE: - case DOUBLE_UNSIGNED: - case DOUBLE_UNSIGNED_ZEROFILL: - case DOUBLE_PRECISION: - case DOUBLE_PRECISION_UNSIGNED: - case DOUBLE_PRECISION_UNSIGNED_ZEROFILL: - return DorisType.DOUBLE; - case NUMERIC: - case NUMERIC_UNSIGNED: - case NUMERIC_UNSIGNED_ZEROFILL: - case FIXED: - case FIXED_UNSIGNED: - case FIXED_UNSIGNED_ZEROFILL: - case DECIMAL: - case DECIMAL_UNSIGNED: - case DECIMAL_UNSIGNED_ZEROFILL: - return length != null && length <= 38 - ? String.format( - "%s(%s,%s)", - DorisType.DECIMAL_V3, - length, - scale != null && scale >= 0 ? scale : 0) - : DorisType.STRING; - case DATE: - return DorisType.DATE_V2; - case DATETIME: - case TIMESTAMP: - // default precision is 0 - // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html - if (length == null - || length <= 0 - || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) { - return String.format("%s(%s)", DorisType.DATETIME_V2, 0); - } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) { - // Timestamp with a fraction of seconds. - // For example, 2024-01-01 01:01:01.1 - // The decimal point will occupy 1 character. - // Thus,the length of the timestamp is 21. - return String.format( - "%s(%s)", - DorisType.DATETIME_V2, - Math.min( - length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1, - MAX_SUPPORTED_DATE_TIME_PRECISION)); - } else if (length <= TIMESTAMP_TYPE_MAX_PRECISION) { - // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9. - return String.format( - "%s(%s)", - DorisType.DATETIME_V2, - Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION)); - } else { - throw new UnsupportedOperationException( - "Unsupported length: " - + length - + " for MySQL TIMESTAMP/DATETIME types"); - } - case CHAR: - case VARCHAR: - Preconditions.checkNotNull(length); - return length * 3 > 65533 - ? 
DorisType.STRING - : String.format("%s(%s)", DorisType.VARCHAR, length * 3); - case TINYTEXT: - case TEXT: - case MEDIUMTEXT: - case LONGTEXT: - case ENUM: - case TIME: - case TINYBLOB: - case BLOB: - case MEDIUMBLOB: - case LONGBLOB: - case BINARY: - case VARBINARY: - case SET: - return DorisType.STRING; - case JSON: - return DorisType.JSONB; - default: - throw new UnsupportedOperationException("Unsupported MySQL Type: " + type); - } - } -} diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java index 29810e4..1022344 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java @@ -23,7 +23,6 @@ import com.codahale.metrics.MetricRegistry; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import org.apache.doris.kafka.connector.DorisSinkTask; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.connection.ConnectionProvider; @@ -35,7 +34,6 @@ import org.apache.doris.kafka.connector.writer.CopyIntoWriter; import org.apache.doris.kafka.connector.writer.DorisWriter; import org.apache.doris.kafka.connector.writer.StreamLoadWriter; import org.apache.doris.kafka.connector.writer.load.LoadModel; -import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -88,23 +86,15 @@ public class DorisDefaultSinkService implements DorisSinkService { if (writer.containsKey(nameIndex)) { LOG.info("already start task"); } else { - DorisWriter dorisWriter; String topic = topicPartition.topic(); int partition = topicPartition.partition(); - String schemaChangeTopic = dorisOptions.getSchemaTopic(); - if (Objects.nonNull(schemaChangeTopic) && schemaChangeTopic.equals(topic)) { - dorisWriter = - new DebeziumSchemaChange( - topic, partition, dorisOptions, conn, connectMonitor); - } else { - LoadModel loadModel = dorisOptions.getLoadModel(); - dorisWriter = - LoadModel.COPY_INTO.equals(loadModel) - ? new CopyIntoWriter( - topic, partition, dorisOptions, conn, connectMonitor) - : new StreamLoadWriter( - topic, partition, dorisOptions, conn, connectMonitor); - } + LoadModel loadModel = dorisOptions.getLoadModel(); + DorisWriter dorisWriter = + LoadModel.COPY_INTO.equals(loadModel) + ? 
new CopyIntoWriter( + topic, partition, dorisOptions, conn, connectMonitor) + : new StreamLoadWriter( + topic, partition, dorisOptions, conn, connectMonitor); writer.put(nameIndex, dorisWriter); metricsJmxReporter.start(); } @@ -129,7 +119,7 @@ public class DorisDefaultSinkService implements DorisSinkService { // check all sink writer to see if they need to be flushed for (DorisWriter writer : writer.values()) { // Time based flushing - if (!(writer instanceof DebeziumSchemaChange) && writer.shouldFlush()) { + if (writer.shouldFlush()) { writer.flushBuffer(); } } diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java index 84d3f90..2356f7d 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java @@ -19,13 +19,9 @@ package org.apache.doris.kafka.connector.utils; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig; import org.apache.doris.kafka.connector.exception.ArgumentsException; import org.apache.doris.kafka.connector.exception.DorisException; @@ -77,31 +73,6 @@ public class ConfigCheckUtils { configIsValid = false; } - String schemaTopic = config.get(DorisSinkConnectorConfig.SCHEMA_TOPIC); - if (StringUtils.isNotEmpty(schemaTopic)) { - schemaTopic = schemaTopic.trim(); - if (!topics.isEmpty()) { - List<String> topicList = - Arrays.stream(topics.split(",")).collect(Collectors.toList()); - if (!topicList.contains(schemaTopic)) { - LOG.error( - "schema.topic is not included in topics list, please add! " - + " schema.topic={}, topics={}", - schemaTopic, - topics); - configIsValid = false; - } - } - if (!topicsRegex.isEmpty() && !topicsRegex.equals(schemaTopic)) { - LOG.error( - "topics.regex must equals schema.topic. please check again!" - + " topics.regex={}, schema.topic={}", - topicsRegex, - schemaTopic); - configIsValid = false; - } - } - if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP) && parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)) == null) { diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java deleted file mode 100644 index 1eeab0e..0000000 --- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.doris.kafka.connector.writer.schema; - -import com.google.common.annotations.VisibleForTesting; -import io.debezium.data.Envelope; -import io.debezium.util.Strings; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.doris.kafka.connector.cfg.DorisOptions; -import org.apache.doris.kafka.connector.connection.ConnectionProvider; -import org.apache.doris.kafka.connector.converter.RecordDescriptor; -import org.apache.doris.kafka.connector.exception.SchemaChangeException; -import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; -import org.apache.doris.kafka.connector.model.ColumnDescriptor; -import org.apache.doris.kafka.connector.model.TableDescriptor; -import org.apache.doris.kafka.connector.model.doris.Schema; -import org.apache.doris.kafka.connector.service.DorisSystemService; -import org.apache.doris.kafka.connector.service.RestService; -import org.apache.doris.kafka.connector.writer.DorisWriter; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.sink.SinkRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DebeziumSchemaChange extends DorisWriter { - private static final Logger LOG = LoggerFactory.getLogger(DebeziumSchemaChange.class); - public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue"; - public static final String TABLE_CHANGES = "tableChanges"; - public static final String TABLE_CHANGES_TYPE = "type"; - private final Map<String, String> topic2TableMap; - private SchemaChangeManager schemaChangeManager; - private DorisSystemService dorisSystemService; - private Set<String> sinkTableSet; - private List<String> ddlSqlList; - - public DebeziumSchemaChange( - String topic, - int partition, - DorisOptions dorisOptions, - ConnectionProvider connectionProvider, - DorisConnectMonitor connectMonitor) { - super(topic, partition, dorisOptions, connectionProvider, connectMonitor); - this.schemaChange = true; - this.sinkTableSet = new HashSet<>(); - this.dorisSystemService = new DorisSystemService(dorisOptions); - this.topic2TableMap = dorisOptions.getTopicMap(); - this.schemaChangeManager = new SchemaChangeManager(dorisOptions); - init(); - } - - @Override - public void fetchOffset() { - // do nothing - } - - private void init() { - Set<Map.Entry<String, String>> entrySet = topic2TableMap.entrySet(); - for (Map.Entry<String, String> entry : entrySet) { - sinkTableSet.add(entry.getValue()); - } - } - - @Override - public void insert(SinkRecord record) { - if (!validate(record)) { - processedOffset.set(record.kafkaOffset()); - return; - } - schemaChange(record); - } - - private boolean validate(final SinkRecord record) { - if (!isSchemaChange(record)) { - LOG.warn( - "Current topic={}, the message does not contain schema change change information, please check schema.topic", - dorisOptions.getSchemaTopic()); - throw new SchemaChangeException( - "The message does not contain schema change change information, please check schema.topic"); - } - - tableName = resolveTableName(record); - if (tableName == null) { - LOG.warn( - "Ignored to write record from topic '{}' partition '{}' offset '{}'. 
No resolvable table name", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset()); - return false; - } - - if (!sinkTableSet.contains(tableName)) { - LOG.warn( - "The " - + tableName - + " is not defined and requires synchronized data. If you need to synchronize the table data, please configure it in 'doris.topic2table.map'"); - return false; - } - - Struct recordStruct = (Struct) (record.value()); - if (isTruncate(recordStruct)) { - LOG.warn("Truncate {} table is not supported", tableName); - return false; - } - - List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES); - Struct tableChange = (Struct) tableChanges.get(0); - if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE)) - || "CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) { - LOG.warn( - "CREATE and DROP {} tables are currently not supported. Please create or drop them manually.", - tableName); - return false; - } - return true; - } - - @Override - public void commit(int partition) { - // do nothing - } - - private void schemaChange(final SinkRecord record) { - Struct recordStruct = (Struct) (record.value()); - List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES); - Struct tableChange = (Struct) tableChanges.get(0); - RecordDescriptor recordDescriptor = - RecordDescriptor.builder() - .withSinkRecord(record) - .withTableChange(tableChange) - .build(); - tableChange(tableName, recordDescriptor); - } - - private boolean isTruncate(final Struct record) { - // Generally the truncate corresponding tableChanges is empty - return record.getArray(TABLE_CHANGES).isEmpty(); - } - - private static boolean isSchemaChange(SinkRecord record) { - return record.valueSchema() != null - && !Strings.isNullOrEmpty(record.valueSchema().name()) - && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE); - } - - private String resolveTableName(SinkRecord record) { - if (isTombstone(record)) { - LOG.warn( - "Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic '{}', partition '{}', offset '{}'", - record.topic(), - record.kafkaPartition(), - record.kafkaOffset()); - return null; - } - Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE); - return source.getString("table"); - } - - private void alterTableIfNeeded(String tableName, RecordDescriptor record) { - LOG.debug("Attempting to alter table '{}'.", tableName); - if (!hasTable(tableName)) { - LOG.error("Table '{}' does not exist and cannot be altered.", tableName); - throw new SchemaChangeException("Could not find table: " + tableName); - } - final TableDescriptor dorisTableDescriptor = obtainTableSchema(tableName); - SchemaChangeHelper.compareSchema(dorisTableDescriptor, record.getFields()); - ddlSqlList = SchemaChangeHelper.generateDDLSql(dorisOptions.getDatabase(), tableName); - doSchemaChange(dorisOptions.getDatabase(), tableName); - } - - /** Obtain table schema from doris. 
*/ - private TableDescriptor obtainTableSchema(String tableName) { - Schema schema = RestService.getSchema(dorisOptions, dbName, tableName, LOG); - List<ColumnDescriptor> columnDescriptors = new ArrayList<>(); - schema.getProperties() - .forEach( - column -> { - ColumnDescriptor columnDescriptor = - ColumnDescriptor.builder() - .columnName(column.getName()) - .typeName(column.getType()) - .comment(column.getComment()) - .build(); - columnDescriptors.add(columnDescriptor); - }); - return TableDescriptor.builder() - .tableName(tableName) - .type(schema.getKeysType()) - .columns(columnDescriptors) - .build(); - } - - private boolean hasTable(String tableName) { - return dorisSystemService.tableExists(dbName, tableName); - } - - private void tableChange(String tableName, RecordDescriptor recordDescriptor) { - if (!hasTable(tableName)) { - // TODO Table does not exist, automatically created it. - LOG.error("{} Table does not exist, please create manually.", tableName); - } else { - // Table exists, lets attempt to alter it if necessary. - alterTableIfNeeded(tableName, recordDescriptor); - } - processedOffset.set(recordDescriptor.getOffset()); - } - - private boolean doSchemaChange(String database, String tableName) { - boolean status = false; - if (ddlSqlList.isEmpty()) { - LOG.info("Schema change ddl is empty, not need do schema change."); - return false; - } - try { - List<SchemaChangeHelper.DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - SchemaChangeHelper.DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(database, tableName, ddlSchema); - status = - doSchemaChange - && schemaChangeManager.execute(ddlSql, dorisOptions.getDatabase()); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); - } - } catch (Exception e) { - LOG.warn("schema change error :", e); - } - return status; - } - - private boolean checkSchemaChange( - String database, String table, SchemaChangeHelper.DDLSchema ddlSchema) - throws IllegalArgumentException, IOException { - Map<String, Object> param = - SchemaChangeManager.buildRequestParam( - ddlSchema.isDropColumn(), ddlSchema.getColumnName()); - return schemaChangeManager.checkSchemaChange(database, table, param); - } - - public long getOffset() { - committedOffset.set(processedOffset.get()); - return committedOffset.get() + 1; - } - - private boolean isTombstone(SinkRecord record) { - return record.value() == null; - } - - @VisibleForTesting - public void setSinkTableSet(Set<String> sinkTableSet) { - this.sinkTableSet = sinkTableSet; - } - - @VisibleForTesting - public void setDorisSystemService(DorisSystemService dorisSystemService) { - this.dorisSystemService = dorisSystemService; - } - - @VisibleForTesting - public List<String> getDdlSqlList() { - return ddlSqlList; - } - - @VisibleForTesting - public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) { - this.schemaChangeManager = schemaChangeManager; - } -} diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java b/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java deleted file mode 100644 index abf424c..0000000 --- a/src/main/java/org/apache/doris/kafka/connector/writer/schema/SchemaChangeHelper.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.doris.kafka.connector.writer.schema; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import org.apache.commons.compress.utils.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.doris.kafka.connector.converter.RecordDescriptor; -import org.apache.doris.kafka.connector.model.ColumnDescriptor; -import org.apache.doris.kafka.connector.model.TableDescriptor; - -public class SchemaChangeHelper { - private static final List<ColumnDescriptor> addColumnDescriptors = Lists.newArrayList(); - // Used to determine whether the column in the doris table can undergo schema change - private static final List<DDLSchema> ddlSchemas = Lists.newArrayList(); - private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; - - // TODO support drop column - // Dropping a column is a dangerous behavior and may result in an accidental deletion. - // There are some problems in the current implementation: each alter column operation will read - // the table structure - // in doris and compare the schema with the topic message. - // When there are more columns in the doris table than in the upstream table, - // these redundant columns in doris will be dropped, regardless of these redundant columns, is - // what you need. - // Therefore, the operation of dropping a column behavior currently requires the user to do it - // himself. - private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; - - /** - * Compare kafka upstream table structure with doris table structure. If kafka field does not - * contain the structure of dorisTable, then need to add this field. - * - * @param dorisTable read from the table schema of doris. - * @param fields table structure from kafka upstream data source. 
- */ - public static void compareSchema( - TableDescriptor dorisTable, Map<String, RecordDescriptor.FieldDescriptor> fields) { - // Determine whether fields need to be added to doris table - addColumnDescriptors.clear(); - Collection<ColumnDescriptor> dorisTableColumns = dorisTable.getColumns(); - Set<String> dorisTableColumnNames = - dorisTableColumns.stream() - .map(ColumnDescriptor::getColumnName) - .collect(Collectors.toSet()); - Set<Map.Entry<String, RecordDescriptor.FieldDescriptor>> fieldsEntries = fields.entrySet(); - for (Map.Entry<String, RecordDescriptor.FieldDescriptor> fieldEntry : fieldsEntries) { - String fieldName = fieldEntry.getKey(); - if (!dorisTableColumnNames.contains(fieldName)) { - RecordDescriptor.FieldDescriptor fieldDescriptor = fieldEntry.getValue(); - ColumnDescriptor columnDescriptor = - new ColumnDescriptor.Builder() - .columnName(fieldDescriptor.getName()) - .typeName(fieldDescriptor.getSchemaTypeName()) - .defaultValue(fieldDescriptor.getDefaultValue()) - .comment(fieldDescriptor.getComment()) - .build(); - addColumnDescriptors.add(columnDescriptor); - } - } - } - - public static List<String> generateDDLSql(String database, String table) { - ddlSchemas.clear(); - List<String> ddlList = Lists.newArrayList(); - for (ColumnDescriptor columnDescriptor : addColumnDescriptors) { - ddlList.add(buildAddColumnDDL(database, table, columnDescriptor)); - ddlSchemas.add(new DDLSchema(columnDescriptor.getColumnName(), false)); - } - return ddlList; - } - - public static List<DDLSchema> getDdlSchemas() { - return ddlSchemas; - } - - private static String buildDropColumnDDL(String database, String tableName, String columName) { - return String.format( - DROP_DDL, - identifier(database) + "." + identifier(tableName), - identifier(columName)); - } - - private static String buildAddColumnDDL( - String database, String tableName, ColumnDescriptor columnDescriptor) { - String columnName = columnDescriptor.getColumnName(); - String columnType = columnDescriptor.getTypeName(); - String defaultValue = columnDescriptor.getDefaultValue(); - String comment = columnDescriptor.getComment(); - String addDDL = - String.format( - ADD_DDL, - identifier(database) + "." 
+ identifier(tableName), - identifier(columnName), - columnType); - if (defaultValue != null) { - addDDL = addDDL + " DEFAULT " + quoteDefaultValue(defaultValue); - } - if (StringUtils.isNotEmpty(comment)) { - addDDL = addDDL + " COMMENT '" + quoteComment(comment) + "'"; - } - return addDDL; - } - - private static String identifier(String name) { - return "`" + name + "`"; - } - - private static String quoteDefaultValue(String defaultValue) { - // DEFAULT current_timestamp not need quote - if (defaultValue.equalsIgnoreCase("current_timestamp")) { - return defaultValue; - } - return "'" + defaultValue + "'"; - } - - private static String quoteComment(String comment) { - return comment.replaceAll("'", "\\\\'"); - } - - public static class DDLSchema { - private final String columnName; - private final boolean isDropColumn; - - public DDLSchema(String columnName, boolean isDropColumn) { - this.columnName = columnName; - this.isDropColumn = isDropColumn; - } - - public String getColumnName() { - return columnName; - } - - public boolean isDropColumn() { - return isDropColumn; - } - } -} diff --git a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java index 3cff4fa..f80a737 100644 --- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java +++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java @@ -19,6 +19,13 @@ package org.apache.doris.kafka.connector.converter; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -30,20 +37,31 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.schema.SchemaChangeManager; +import org.apache.doris.kafka.connector.exception.DorisException; +import org.apache.doris.kafka.connector.model.doris.Schema; +import org.apache.doris.kafka.connector.service.DorisSystemService; +import org.apache.doris.kafka.connector.service.RestService; import org.apache.doris.kafka.connector.writer.TestRecordBuffer; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; public class TestRecordService { + private final ObjectMapper objectMapper = new ObjectMapper(); private RecordService recordService; private Properties props = new Properties(); private JsonConverter jsonConverter = new JsonConverter(); + private MockedStatic<RestService> mockRestService; @Before public void init() throws IOException { @@ -54,9 +72,21 @@ public class TestRecordService { props.load(stream); props.put("task_id", "1"); props.put("converter.mode", "debezium_ingestion"); + props.put("schema.evolution", "basic"); + props.put( + "doris.topic2table.map", + 
"avro_schema.wdl_test.example_table:example_table,normal.wdl_test.test_sink_normal:test_sink_normal"); recordService = new RecordService(new DorisOptions((Map) props)); HashMap<String, String> config = new HashMap<>(); jsonConverter.configure(config, false); + mockRestService = mockStatic(RestService.class); + + SchemaChangeManager mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class); + DorisSystemService mockDorisSystemService = mock(DorisSystemService.class); + doNothing().when(mockSchemaChangeManager).addColumnDDL(anyString(), any()); + when(mockDorisSystemService.tableExists(anyString(), anyString())).thenReturn(true); + recordService.setDorisSystemService(mockDorisSystemService); + recordService.setSchemaChangeManager(mockSchemaChangeManager); } /** @@ -70,7 +100,19 @@ public class TestRecordService { */ @Test public void processMysqlDebeziumStructRecord() throws IOException { - String topic = "normal.wdl_test.example_table"; + String schemaStr = + "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV [...] + Schema schema = null; + try { + schema = objectMapper.readValue(schemaStr, Schema.class); + } catch (JsonProcessingException e) { + throw new DorisException(e); + } + mockRestService + .when(() -> RestService.getSchema(any(), any(), any(), any())) + .thenReturn(schema); + + String topic = "avro_schema.wdl_test.example_table"; // no delete value String noDeleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] @@ -86,13 +128,48 @@ public class TestRecordService { buildProcessStructRecord(topic, deleteValue, expectedDeleteValue); } + @Test + public void processMysqlDebeziumStructRecordAlter() throws IOException { + String schemaStr = + "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"BIGINT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"age\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"email\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"},{\"name\":\"birth_date\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV [...] 
+ Schema schema = null; + try { + schema = objectMapper.readValue(schemaStr, Schema.class); + } catch (JsonProcessingException e) { + throw new DorisException(e); + } + mockRestService + .when(() -> RestService.getSchema(any(), any(), any(), any())) + .thenReturn(schema); + + String topic = "avro_schema.wdl_test.example_table"; + String topicMsg = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] + SchemaAndValue schemaValue = + jsonConverter.toConnectData(topic, topicMsg.getBytes(StandardCharsets.UTF_8)); + SinkRecord noDeleteSinkRecord = + TestRecordBuffer.newSinkRecord(topic, schemaValue.value(), 8, schemaValue.schema()); + recordService.processStructRecord(noDeleteSinkRecord); + + // Compare the results of schema change + Map<String, String> resultFields = new HashMap<>(); + resultFields.put("time_column", "DATETIME(0)"); + resultFields.put("blob_column", "STRING"); + Set<RecordDescriptor.FieldDescriptor> missingFields = recordService.getMissingFields(); + for (RecordDescriptor.FieldDescriptor missingField : missingFields) { + Assert.assertTrue(resultFields.containsKey(missingField.getName())); + Assert.assertEquals( + resultFields.get(missingField.getName()), missingField.getTypeName()); + } + } + private void buildProcessStructRecord(String topic, String sourceValue, String target) throws IOException { SchemaAndValue noDeleteSchemaValue = jsonConverter.toConnectData(topic, sourceValue.getBytes(StandardCharsets.UTF_8)); SinkRecord noDeleteSinkRecord = TestRecordBuffer.newSinkRecord( - noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema()); + topic, noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema()); String processResult = recordService.processStructRecord(noDeleteSinkRecord); Assert.assertEquals(target, processResult); } @@ -113,6 +190,18 @@ public class TestRecordService { @Test public void processStructRecordWithDebeziumSchema() throws IOException { + String schemaStr = + "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"name\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"VARCHAR\"}],\"status\":200}"; + Schema schema = null; + try { + schema = objectMapper.readValue(schemaStr, Schema.class); + } catch (JsonProcessingException e) { + throw new DorisException(e); + } + mockRestService + .when(() -> RestService.getSchema(any(), any(), any(), any())) + .thenReturn(schema); + String topic = "normal.wdl_test.test_sink_normal"; // no delete value @@ -171,4 +260,10 @@ public class TestRecordService { String s = recordService.processStringRecord(record); Assert.assertEquals("doris", s); } + + @After + public void close() { + mockRestService.close(); + ; + } } diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java deleted file mode 100644 index c95af44..0000000 --- a/src/test/java/org/apache/doris/kafka/connector/writer/TestDebeziumSchemaChange.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.doris.kafka.connector.writer; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.apache.doris.kafka.connector.cfg.DorisOptions; -import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider; -import org.apache.doris.kafka.connector.exception.DorisException; -import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor; -import org.apache.doris.kafka.connector.model.doris.Schema; -import org.apache.doris.kafka.connector.service.DorisSystemService; -import org.apache.doris.kafka.connector.service.RestService; -import org.apache.doris.kafka.connector.writer.schema.DebeziumSchemaChange; -import org.apache.doris.kafka.connector.writer.schema.SchemaChangeManager; -import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.json.JsonConverter; -import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.MockedStatic; -import org.mockito.Mockito; - -public class TestDebeziumSchemaChange { - private final ObjectMapper objectMapper = new ObjectMapper(); - private final JsonConverter jsonConverter = new JsonConverter(); - private final HashSet<String> sinkTableSet = new HashSet<>(); - private DebeziumSchemaChange debeziumSchemaChange; - private DorisOptions dorisOptions; - private String topic; - private MockedStatic<RestService> mockRestService; - - @Before - public void init() throws IOException { - InputStream stream = - this.getClass() - .getClassLoader() - .getResourceAsStream("doris-connector-sink.properties"); - Properties props = new Properties(); - props.load(stream); - props.put("task_id", "1"); - props.put("name", "sink-connector-test"); - topic = "normal"; - dorisOptions = new DorisOptions((Map) props); - DorisConnectMonitor dorisConnectMonitor = mock(DorisConnectMonitor.class); - DorisSystemService mockDorisSystemService = mock(DorisSystemService.class); - jsonConverter.configure(new HashMap<>(), false); - mockRestService = mockStatic(RestService.class); - SchemaChangeManager mockSchemaChangeManager = Mockito.mock(SchemaChangeManager.class); - Mockito.when( - mockSchemaChangeManager.checkSchemaChange( - 
Mockito.any(), Mockito.any(), Mockito.any())) - .thenReturn(true); - debeziumSchemaChange = - new DebeziumSchemaChange( - topic, - 0, - dorisOptions, - new JdbcConnectionProvider(dorisOptions), - dorisConnectMonitor); - when(mockDorisSystemService.tableExists(anyString(), anyString())).thenReturn(true); - - sinkTableSet.add("normal_time"); - debeziumSchemaChange.setSchemaChangeManager(mockSchemaChangeManager); - debeziumSchemaChange.setSinkTableSet(sinkTableSet); - debeziumSchemaChange.setDorisSystemService(mockDorisSystemService); - } - - @Test - public void testAlterSchemaChange() { - String alterTopicMsg = - "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"}, [...] - SchemaAndValue schemaAndValue = - jsonConverter.toConnectData(topic, alterTopicMsg.getBytes(StandardCharsets.UTF_8)); - SinkRecord sinkRecord = - TestRecordBuffer.newSinkRecord(schemaAndValue.value(), 8, schemaAndValue.schema()); - String normalTimeSchemaStr = - "{\"keysType\":\"UNIQUE_KEYS\",\"properties\":[{\"name\":\"id\",\"aggregation_type\":\"\",\"comment\":\"\",\"type\":\"INT\"},{\"name\":\"timestamp_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"datetime_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATETIMEV2\"},{\"name\":\"date_test\",\"aggregation_type\":\"NONE\",\"comment\":\"\",\"type\":\"DATEV2\"}],\"status\":200}"; - Schema normalTimeSchema = null; - try { - normalTimeSchema = objectMapper.readValue(normalTimeSchemaStr, Schema.class); - } catch (JsonProcessingException e) { - throw new DorisException(e); - } - mockRestService - .when(() -> RestService.getSchema(any(), any(), any(), any())) - .thenReturn(normalTimeSchema); - - debeziumSchemaChange.insert(sinkRecord); - List<String> ddlSqlList = debeziumSchemaChange.getDdlSqlList(); - Assert.assertEquals( - ddlSqlList.get(0), - "ALTER TABLE `test_db`.`normal_time` ADD COLUMN `time_test` STRING DEFAULT '12:00'"); - } - - @After - public void close() { - mockRestService.close(); - } -} diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java index 3dd2292..de2e654 100644 --- a/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java +++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestRecordBuffer.java @@ -52,16 +52,11 @@ public class TestRecordBuffer { return record; } - public static SinkRecord newSinkRecord(Object value, long offset, Schema valueSchema) { + public static SinkRecord newSinkRecord( + String topic, Object value, long offset, Schema valueSchema) { SinkRecord record = new SinkRecord( - "topic", - 0, - Schema.OPTIONAL_STRING_SCHEMA, - "key", - valueSchema, - value, - offset); + topic, 0, Schema.OPTIONAL_STRING_SCHEMA, "key", valueSchema, value, offset); return record; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
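The net effect of the new string mapping (ConnectStringType#getTypeName together with the MAX_VARCHAR_SIZE constant added in DorisTypeProperties) is easiest to see in isolation. The sketch below is illustrative only, not the connector's actual API: the class and method names are invented, and the guard condition is assumed to mirror the length * 3 > 65533 check that the deleted MysqlType used; the patch itself only shows the VARCHAR(length * 3) and STRING branches and the 65533 ceiling. This record-driven mapping is what replaces the old schema.topic flow: with schema.evolution=basic (as set in the updated TestRecordService), missing columns are derived from the record's own fields rather than from a separate Debezium schema-change topic.

```java
/**
 * Illustrative sketch of the Doris string-type mapping introduced by this patch.
 * Names here are hypothetical; only the *3 widening and the 65533 VARCHAR ceiling
 * are taken from the diff (ConnectStringType / DorisTypeProperties).
 */
public final class StringTypeMappingSketch {

    // Mirrors DorisTypeProperties.MAX_VARCHAR_SIZE from the patch.
    private static final int MAX_VARCHAR_SIZE = 65533;

    /**
     * Maps a source column length to a Doris column type. The guard is an assumption
     * based on the deleted MysqlType mapping (length * 3 > 65533 -> STRING); an unknown
     * length (0, the orElse default of getColumnLength) also falls back to STRING.
     */
    static String toDorisStringType(int sourceColumnLength) {
        if (sourceColumnLength <= 0 || sourceColumnLength * 3 > MAX_VARCHAR_SIZE) {
            return "STRING";
        }
        // Widened by 3x, presumably to leave room for multi-byte UTF-8 characters.
        return String.format("VARCHAR(%d)", sourceColumnLength * 3);
    }

    public static void main(String[] args) {
        System.out.println(toDorisStringType(50));    // VARCHAR(150)
        System.out.println(toDorisStringType(0));     // STRING (length unknown)
        System.out.println(toDorisStringType(30000)); // STRING (90000 > 65533)
    }
}
```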