This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 3589528  [Feature]Support kafka type converter (#12)
3589528 is described below

commit 3589528b5052f159aaa27b9efaf55fc28884bd87
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Mon Apr 15 10:06:56 2024 +0800

    [Feature]Support kafka type converter (#12)
---
 pom.xml | 6 +
 .../doris/kafka/connector/cfg/DorisOptions.java | 9 +
 .../connector/cfg/DorisSinkConnectorConfig.java | 2 +
 .../connector/converter/RecordDescriptor.java | 225 +++++++++++++++++++++
 .../kafka/connector/converter/RecordService.java | 107 +++++++---
 .../connector/converter/RecordTypeRegister.java | 113 +++++++++++
 .../connector/converter/type/AbstractDateType.java | 22 ++
 .../converter/type/AbstractTemporalType.java | 51 +++++
 .../connector/converter/type/AbstractTimeType.java | 22 ++
 .../converter/type/AbstractTimestampType.java | 22 ++
 .../connector/converter/type/AbstractType.java | 43 ++++
 .../doris/kafka/connector/converter/type/Type.java | 46 +++++
 .../type/connect/AbstractConnectMapType.java | 45 +++++
 .../type/connect/AbstractConnectSchemaType.java | 32 +++
 .../converter/type/connect/ConnectBooleanType.java | 29 +++
 .../converter/type/connect/ConnectBytesType.java | 60 ++++++
 .../converter/type/connect/ConnectDateType.java | 48 +++++
 .../converter/type/connect/ConnectDecimalType.java | 41 ++++
 .../converter/type/connect/ConnectFloat32Type.java | 34 ++++
 .../converter/type/connect/ConnectFloat64Type.java | 34 ++++
 .../converter/type/connect/ConnectInt16Type.java | 34 ++++
 .../converter/type/connect/ConnectInt32Type.java | 34 ++++
 .../converter/type/connect/ConnectInt64Type.java | 34 ++++
 .../converter/type/connect/ConnectInt8Type.java | 34 ++++
 .../connect/ConnectMapToConnectStringType.java | 35 ++++
 .../converter/type/connect/ConnectStringType.java | 33 +++
 .../converter/type/connect/ConnectTimeType.java | 56 +++++
 .../type/connect/ConnectTimestampType.java | 49 +++++
 .../type/debezium/AbstractDebeziumTimeType.java | 44 ++++
 .../debezium/AbstractDebeziumTimestampType.java | 42 ++++
 .../converter/type/debezium/DateType.java | 49 +++++
 .../converter/type/debezium/MicroTimeType.java | 38 ++++
 .../type/debezium/MicroTimestampType.java | 38 ++++
 .../converter/type/debezium/NanoTimeType.java | 38 ++++
 .../converter/type/debezium/NanoTimestampType.java | 43 ++++
 .../converter/type/debezium/TimeType.java | 38 ++++
 .../converter/type/debezium/TimestampType.java | 39 ++++
 .../type/debezium/VariableScaleDecimalType.java | 58 ++++++
 .../converter/type/debezium/ZonedTimeType.java | 55 +++++
 .../type/debezium/ZonedTimestampType.java | 52 +++++
 .../connector/converter/utils/DateTimeUtils.java | 115 +++++++++++
 .../connector/converter/TestRecordService.java | 85 ++++----
 42 files changed, 1973 insertions(+), 61 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6f8dbab..2ceea3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,12 @@
             <artifactId>kafka-connect-avro-converter</artifactId>
             <version>${confluent.version}</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/io.debezium/debezium-core -->
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>1.9.8.Final</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c47ea1f..6183869 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -58,6 +58,7 @@ public class DorisOptions {
     private final Properties streamLoadProp = new Properties();
     private String labelPrefix;
+    private String databaseTimeZone;
     private LoadModel loadModel;
     private DeliveryGuarantee deliveryGuarantee;
     private ConverterMode converterMode;
@@ -71,6 +72,10 @@ public class DorisOptions {
         this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
         this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
         this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
+        this.databaseTimeZone = DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT;
+        if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) {
+            this.databaseTimeZone = config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
+        }
         this.loadModel =
                 LoadModel.of(
                         config.getOrDefault(
@@ -275,6 +280,10 @@ public class DorisOptions {
         return autoRedirect;
     }
 
+    public String getDatabaseTimeZone() {
+        return databaseTimeZone;
+    }
+
     public boolean isEnableDelete() {
         return enableDelete;
     }
diff --git a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 5ca55ef..58b6a62 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -69,6 +69,8 @@ public class DorisSinkConnectorConfig {
     public static final String REQUEST_CONNECT_TIMEOUT_MS = "request.connect.timeout.ms";
     public static final Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
     public static final Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    public static final String DATABASE_TIME_ZONE = "database.time_zone";
+    public static final String DATABASE_TIME_ZONE_DEFAULT = "UTC";
     public static final String LOAD_MODEL = "load.model";
     public static final String LOAD_MODEL_DEFAULT = LoadModel.STREAM_LOAD.name();
     public static final String AUTO_REDIRECT = "auto.redirect";
diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
new file mode 100644
index 0000000..66a7c21
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.doris.kafka.connector.converter; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +public class RecordDescriptor { + private final SinkRecord record; + private final String topicName; + private final List<String> keyFieldNames; + private final List<String> nonKeyFieldNames; + private final Map<String, FieldDescriptor> fields; + private final boolean flattened; + + private RecordDescriptor( + SinkRecord record, + String topicName, + List<String> keyFieldNames, + List<String> nonKeyFieldNames, + Map<String, FieldDescriptor> fields, + boolean flattened) { + this.record = record; + this.topicName = topicName; + this.keyFieldNames = keyFieldNames; + this.nonKeyFieldNames = nonKeyFieldNames; + this.fields = fields; + this.flattened = flattened; + } + + public String getTopicName() { + return topicName; + } + + public Integer getPartition() { + return record.kafkaPartition(); + } + + public long getOffset() { + return record.kafkaOffset(); + } + + public List<String> getKeyFieldNames() { + return keyFieldNames; + } + + public List<String> getNonKeyFieldNames() { + return nonKeyFieldNames; + } + + public Map<String, FieldDescriptor> getFields() { + return fields; + } + + public boolean isDebeziumSinkRecord() { + return !flattened; + } + + public boolean isTombstone() { + // NOTE + // Debezium TOMBSTONE has both value and valueSchema to null, instead the + // ExtractNewRecordState SMT with delete.handling.mode=none will generate + // a record only with value null that by JDBC connector is treated as a flattened delete. + // See isDelete method. 
+ return record.value() == null && record.valueSchema() == null; + } + + public boolean isDelete() { + if (!isDebeziumSinkRecord()) { + return record.value() == null; + } else if (record.value() != null) { + final Struct value = (Struct) record.value(); + return "d".equals(value.getString("op")); + } + return false; + } + + public Struct getAfterStruct() { + if (isDebeziumSinkRecord()) { + return ((Struct) record.value()).getStruct("after"); + } else { + return ((Struct) record.value()); + } + } + + public Struct getBeforeStruct() { + if (isDebeziumSinkRecord()) { + return ((Struct) record.value()).getStruct("before"); + } else { + return ((Struct) record.value()); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class FieldDescriptor { + private final Schema schema; + private final String name; + private final String schemaTypeName; + private final String schemaName; + + public FieldDescriptor( + Schema schema, String name, String schemaTypeName, String schemaName) { + this.schema = schema; + this.name = name; + this.schemaTypeName = schemaTypeName; + this.schemaName = schemaName; + } + + public String getName() { + return name; + } + + public String getSchemaName() { + return schemaName; + } + + public Schema getSchema() { + return schema; + } + + public String getSchemaTypeName() { + return schemaTypeName; + } + } + + public static class Builder { + + private SinkRecord sinkRecord; + + // Internal build state + private final List<String> keyFieldNames = new ArrayList<>(); + private final List<String> nonKeyFieldNames = new ArrayList<>(); + private final Map<String, FieldDescriptor> allFields = new LinkedHashMap<>(); + + public Builder withSinkRecord(SinkRecord record) { + this.sinkRecord = record; + return this; + } + + public RecordDescriptor build() { + Objects.requireNonNull(sinkRecord, "The sink record must be provided."); + + final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); + readSinkRecordNonKeyData(sinkRecord, flattened); + + return new RecordDescriptor( + sinkRecord, + sinkRecord.topic(), + keyFieldNames, + nonKeyFieldNames, + allFields, + flattened); + } + + private boolean isFlattened(SinkRecord record) { + return record.valueSchema().name() == null + || !record.valueSchema().name().contains("Envelope"); + } + + private boolean isTombstone(SinkRecord record) { + + return record.value() == null && record.valueSchema() == null; + } + + private void readSinkRecordNonKeyData(SinkRecord record, boolean flattened) { + final Schema valueSchema = record.valueSchema(); + if (valueSchema != null) { + if (flattened) { + // In a flattened event type, it's safe to read the field names directly + // from the schema as this isn't a complex Debezium message type. 
+ applyNonKeyFields(valueSchema); + } else { + final Field after = valueSchema.field("after"); + if (after == null) { + throw new ConnectException( + "Received an unexpected message type that does not have an 'after' Debezium block"); + } + applyNonKeyFields(after.schema()); + } + } + } + + private void applyNonKeyFields(Schema schema) { + for (Field field : schema.fields()) { + if (!keyFieldNames.contains(field.name())) { + applyNonKeyField(field.name(), field.schema()); + } + } + } + + private void applyNonKeyField(String name, Schema schema) { + FieldDescriptor fieldDescriptor = + new FieldDescriptor(schema, name, schema.type().name(), schema.name()); + nonKeyFieldNames.add(fieldDescriptor.getName()); + allFields.put(fieldDescriptor.getName(), fieldDescriptor); + } + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java index bd55297..1390761 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java @@ -20,19 +20,22 @@ package org.apache.doris.kafka.connector.converter; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.StringJoiner; import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.type.Type; import org.apache.doris.kafka.connector.exception.DataFormatException; import org.apache.doris.kafka.connector.writer.LoadConstants; import org.apache.doris.kafka.connector.writer.RecordBuffer; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -43,6 +46,7 @@ public class RecordService { private static final ObjectMapper MAPPER = new ObjectMapper(); private final JsonConverter converter; private DorisOptions dorisOptions; + private RecordTypeRegister recordTypeRegister; public RecordService() { this.converter = new JsonConverter(); @@ -54,6 +58,7 @@ public class RecordService { public RecordService(DorisOptions dorisOptions) { this(); this.dorisOptions = dorisOptions; + this.recordTypeRegister = new RecordTypeRegister(dorisOptions); } /** @@ -61,27 +66,34 @@ public class RecordService { * "optional": false, "name": "" }, "payload": { "name": "doris", "__deleted": "true" } } */ public String processStructRecord(SinkRecord record) { - byte[] bytes = - converter.fromConnectData(record.topic(), record.valueSchema(), record.value()); - String recordValue = new String(bytes, StandardCharsets.UTF_8); + String processedRecord; if (ConverterMode.DEBEZIUM_INGESTION == dorisOptions.getConverterMode()) { - try { - Map<String, Object> recordMap = - MAPPER.readValue(recordValue, new TypeReference<Map<String, Object>>() {}); - // delete sign sync - if ("d".equals(recordMap.get("op"))) { - Map<String, Object> beforeValue = (Map<String, Object>) recordMap.get("before"); - beforeValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE); - return MAPPER.writeValueAsString(beforeValue); - } - Map<String, Object> 
afterValue = (Map<String, Object>) recordMap.get("after"); - afterValue.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE); - return MAPPER.writeValueAsString(afterValue); - } catch (JsonProcessingException e) { - LOG.error("parse record failed, cause by parse json error: {}", recordValue); + RecordDescriptor recordDescriptor = buildRecordDescriptor(record); + if (recordDescriptor.isTombstone()) { + return null; } + List<String> nonKeyFieldNames = recordDescriptor.getNonKeyFieldNames(); + if (recordDescriptor.isDelete()) { + processedRecord = + parseFieldValues( + recordDescriptor, + recordDescriptor.getBeforeStruct(), + nonKeyFieldNames, + true); + } else { + processedRecord = + parseFieldValues( + recordDescriptor, + recordDescriptor.getAfterStruct(), + nonKeyFieldNames, + false); + } + } else { + byte[] bytes = + converter.fromConnectData(record.topic(), record.valueSchema(), record.value()); + processedRecord = new String(bytes, StandardCharsets.UTF_8); } - return recordValue; + return processedRecord; } /** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */ @@ -114,6 +126,43 @@ public class RecordService { return record.value().toString(); } + private String parseFieldValues( + RecordDescriptor record, Struct source, List<String> fields, boolean isDelete) { + Map<String, Object> filedMapping = new LinkedHashMap<>(); + String filedResult = null; + final Map<String, Type> typeRegistry = recordTypeRegister.getTypeRegistry(); + for (String fieldName : fields) { + final RecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName); + String fieldSchemaName = field.getSchemaName(); + String fieldSchemaTypeName = field.getSchemaTypeName(); + Object value = + field.getSchema().isOptional() + ? source.getWithoutDefault(fieldName) + : source.get(fieldName); + Type type = + Objects.nonNull(fieldSchemaName) + ? typeRegistry.get(fieldSchemaName) + : typeRegistry.get(fieldSchemaTypeName); + Object convertValue = type.getValue(value); + if (Objects.nonNull(convertValue) && !type.isNumber()) { + filedMapping.put(fieldName, convertValue.toString()); + } else { + filedMapping.put(fieldName, convertValue); + } + } + try { + if (isDelete) { + filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_TRUE); + } else { + filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, LoadConstants.DORIS_DEL_FALSE); + } + filedResult = MAPPER.writeValueAsString(filedMapping); + } catch (JsonProcessingException e) { + LOG.error("parse record failed, cause by parse json error: {}", filedMapping); + } + return filedResult; + } + /** * Given a single Record from put API, process it and convert it into a Json String. 
* @@ -121,14 +170,26 @@ public class RecordService { * @return Json String */ public String getProcessedRecord(SinkRecord record) { + String processedRecord; if (record.value() instanceof Struct) { - return processStructRecord(record); + processedRecord = processStructRecord(record); } else if (record.value() instanceof List) { - return processListRecord(record); + processedRecord = processListRecord(record); } else if (record.value() instanceof Map) { - return processMapRecord(record); + processedRecord = processMapRecord(record); } else { - return processStringRecord(record); + processedRecord = record.value().toString(); + } + return processedRecord; + } + + private RecordDescriptor buildRecordDescriptor(SinkRecord record) { + RecordDescriptor recordDescriptor; + try { + recordDescriptor = RecordDescriptor.builder().withSinkRecord(record).build(); + } catch (Exception e) { + throw new ConnectException("Failed to process a sink record", e); } + return recordDescriptor; } } diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java new file mode 100644 index 0000000..e78909d --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter; + +import java.util.HashMap; +import java.util.Map; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.apache.doris.kafka.connector.converter.type.Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectBooleanType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectBytesType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectDateType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectDecimalType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat32Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat64Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt16Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt32Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt64Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt8Type; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectMapToConnectStringType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimeType; +import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.DateType; +import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimeType; +import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimeType; +import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.TimeType; +import org.apache.doris.kafka.connector.converter.type.debezium.TimestampType; +import org.apache.doris.kafka.connector.converter.type.debezium.VariableScaleDecimalType; +import org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimeType; +import org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RecordTypeRegister { + + private static final Logger LOG = LoggerFactory.getLogger(RecordTypeRegister.class); + + private final Map<String, Type> typeRegistry = new HashMap<>(); + private final DorisOptions dorisOptions; + + public RecordTypeRegister(DorisOptions dorisOptions) { + this.dorisOptions = dorisOptions; + registerTypes(); + } + + protected void registerTypes() { + // Supported common Debezium data types + registerType(DateType.INSTANCE); + registerType(TimeType.INSTANCE); + registerType(MicroTimeType.INSTANCE); + registerType(TimestampType.INSTANCE); + registerType(MicroTimestampType.INSTANCE); + registerType(NanoTimeType.INSTANCE); + registerType(NanoTimestampType.INSTANCE); + registerType(ZonedTimeType.INSTANCE); + registerType(ZonedTimestampType.INSTANCE); + registerType(VariableScaleDecimalType.INSTANCE); + + // Supported connect data types + registerType(ConnectBooleanType.INSTANCE); + registerType(ConnectBytesType.INSTANCE); + registerType(ConnectDateType.INSTANCE); + registerType(ConnectDecimalType.INSTANCE); + registerType(ConnectFloat32Type.INSTANCE); + registerType(ConnectFloat64Type.INSTANCE); + registerType(ConnectInt8Type.INSTANCE); + registerType(ConnectInt16Type.INSTANCE); + registerType(ConnectInt32Type.INSTANCE); + 
registerType(ConnectInt64Type.INSTANCE); + registerType(ConnectStringType.INSTANCE); + registerType(ConnectTimestampType.INSTANCE); + registerType(ConnectTimeType.INSTANCE); + registerType(ConnectMapToConnectStringType.INSTANCE); + } + + protected void registerType(Type type) { + type.configure(dorisOptions); + for (String key : type.getRegistrationKeys()) { + final Type existing = typeRegistry.put(key, type); + if (existing != null) { + LOG.debug( + "Type replaced [{}]: {} -> {}", + key, + existing.getClass().getName(), + type.getClass().getName()); + } else { + LOG.debug("Type registered [{}]: {}", key, type.getClass().getName()); + } + } + } + + public Map<String, Type> getTypeRegistry() { + return typeRegistry; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java new file mode 100644 index 0000000..de32cd6 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type; + +/** An abstract base class for all temporal date implementations of {@link Type}. */ +public abstract class AbstractDateType extends AbstractTemporalType {} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java new file mode 100644 index 0000000..4b50a79 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type; + +import java.time.ZoneId; +import java.util.TimeZone; +import org.apache.doris.kafka.connector.cfg.DorisOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** An abstract base class for all temporal implementations of {@link Type}. */ +public abstract class AbstractTemporalType extends AbstractType { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTemporalType.class); + + private TimeZone databaseTimeZone; + + @Override + public void configure(DorisOptions dorisOptions) { + final String databaseTimeZone = dorisOptions.getDatabaseTimeZone(); + try { + this.databaseTimeZone = TimeZone.getTimeZone(ZoneId.of(databaseTimeZone)); + } catch (Exception e) { + LOGGER.error( + "Failed to resolve time zone '{}', please specify a correct time zone value", + databaseTimeZone, + e); + throw e; + } + } + + protected TimeZone getDatabaseTimeZone() { + return databaseTimeZone; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java new file mode 100644 index 0000000..533f1e1 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type; + +/** An abstract temporal implementation of {@link Type} for {@code TIME} based columns. */ +public abstract class AbstractTimeType extends AbstractTemporalType {} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java new file mode 100644 index 0000000..3d50376 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type; + +/** An abstract temporal implementation of {@link Type} for {@code TIMESTAMP} based columns. */ +public abstract class AbstractTimestampType extends AbstractTemporalType {} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java new file mode 100644 index 0000000..d915a89 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type; + +import org.apache.doris.kafka.connector.cfg.DorisOptions; + +/** An abstract implementation of {@link Type}, which all types should extend. */ +public abstract class AbstractType implements Type { + + @Override + public void configure(DorisOptions dorisOptions) {} + + @Override + public Object getValue(Object sourceValue) { + return sourceValue; + } + + @Override + public boolean isNumber() { + return false; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java new file mode 100644 index 0000000..c284f0e --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type; + +import org.apache.doris.kafka.connector.cfg.DorisOptions; + +/** + * A type indicates the type of each column of kafka record, including various column types of + * debezium and connect. 
+ */ +public interface Type { + + /** Allows a type to perform initialization/configuration tasks based on user configs. */ + void configure(DorisOptions dorisOptions); + + /** + * Returns the names that this type will be mapped as. + * + * <p>For example, when creating a custom mapping for {@code io.debezium.data.Bits}, a type + * could be registered using the {@code LOGICAL_NAME} of the schema if the type is to be used + * when a schema name is identified; otherwise it could be registered as the raw column type + * when column type propagation is enabled. + */ + String[] getRegistrationKeys(); + + /** Get the actual converted value based on the column type. */ + Object getValue(Object sourceValue); + + boolean isNumber(); +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java new file mode 100644 index 0000000..53c64d1 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.connect.errors.ConnectException; + +/** + * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for {@code MAP} + * schema types. + */ +public abstract class AbstractConnectMapType extends AbstractConnectSchemaType { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"MAP"}; + } + + protected String mapToJsonString(Object value) { + try { + return MAPPER.writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new ConnectException("Failed to deserialize MAP data to JSON", e); + } + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java new file mode 100644 index 0000000..d544f75 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import org.apache.doris.kafka.connector.converter.type.AbstractType; + +/** + * An abstract implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} that + * all Kafka Connect based schema types should be derived. + * + * <p>This abstract implementation is used as a marker object to designate types that are operating + * on the raw {@link org.apache.kafka.connect.data.Schema.Type} values rather than custom schema + * types that are contributed by Kafka Connect, such as Date or Time, or other third parties such as + * Debezium. + */ +public abstract class AbstractConnectSchemaType extends AbstractType {} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java new file mode 100644 index 0000000..18c5af3 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectBooleanType extends AbstractConnectSchemaType { + + public static final ConnectBooleanType INSTANCE = new ConnectBooleanType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"BOOLEAN"}; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java new file mode 100644 index 0000000..6c2701c --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import java.nio.ByteBuffer; + +public class ConnectBytesType extends AbstractConnectSchemaType { + + public static final ConnectBytesType INSTANCE = new ConnectBytesType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"BYTES"}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + return bytesToHexString(getByteArrayFromValue(sourceValue)); + } + + private byte[] getByteArrayFromValue(Object value) { + byte[] byteArray = null; + if (value instanceof ByteBuffer) { + final ByteBuffer buffer = ((ByteBuffer) value).slice(); + byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + } else if (value instanceof byte[]) { + byteArray = (byte[]) value; + } + return byteArray; + } + + /** Convert hexadecimal byte array to string */ + private String bytesToHexString(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X", b)); + } + return sb.toString(); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java new file mode 100644 index 0000000..4106f8d --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import org.apache.doris.kafka.connector.converter.type.AbstractDateType; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.errors.ConnectException; + +public class ConnectDateType extends AbstractDateType { + + public static final ConnectDateType INSTANCE = new ConnectDateType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Date.LOGICAL_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof java.util.Date) { + return DateTimeUtils.toLocalDateFromDate((java.util.Date) sourceValue); + } + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java new file mode 100644 index 0000000..8625883 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import org.apache.doris.kafka.connector.converter.type.AbstractType; +import org.apache.kafka.connect.data.Decimal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectDecimalType extends AbstractType { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectDecimalType.class); + + public static final ConnectDecimalType INSTANCE = new ConnectDecimalType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Decimal.LOGICAL_NAME}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java new file mode 100644 index 0000000..98b6936 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectFloat32Type extends AbstractConnectSchemaType { + + public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"FLOAT32"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java new file mode 100644 index 0000000..f050c15 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectFloat64Type extends AbstractConnectSchemaType { + + public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"FLOAT64"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java new file mode 100644 index 0000000..573813b --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectInt16Type extends AbstractConnectSchemaType { + + public static final ConnectInt16Type INSTANCE = new ConnectInt16Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"INT16"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java new file mode 100644 index 0000000..50dd6c7 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectInt32Type extends AbstractConnectSchemaType { + + public static final ConnectInt32Type INSTANCE = new ConnectInt32Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"INT32"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java new file mode 100644 index 0000000..c08abb6 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectInt64Type extends AbstractConnectSchemaType { + + public static final ConnectInt64Type INSTANCE = new ConnectInt64Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"INT64"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java new file mode 100644 index 0000000..55c82cf --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +public class ConnectInt8Type extends AbstractConnectSchemaType { + + public static final ConnectInt8Type INSTANCE = new ConnectInt8Type(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"INT8"}; + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java new file mode 100644 index 0000000..cac2624 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import java.util.Map; + +public class ConnectMapToConnectStringType extends AbstractConnectMapType { + + public static final ConnectMapToConnectStringType INSTANCE = + new ConnectMapToConnectStringType(); + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue instanceof Map) { + sourceValue = mapToJsonString(sourceValue); + } + return ConnectStringType.INSTANCE.getValue(sourceValue); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java new file mode 100644 index 0000000..0353020 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.connect; + +/** + * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} that supports + * {@code STRING} connect schema types. + */ +public class ConnectStringType extends AbstractConnectSchemaType { + + public static final ConnectStringType INSTANCE = new ConnectStringType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {"STRING"}; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java new file mode 100644 index 0000000..de3be44 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Date; +import org.apache.doris.kafka.connector.converter.type.AbstractTimeType; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.errors.ConnectException; + +public class ConnectTimeType extends AbstractTimeType { + + public static final ConnectTimeType INSTANCE = new ConnectTimeType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Time.LOGICAL_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof Date) { + + final LocalTime localTime = DateTimeUtils.toLocalTimeFromUtcDate((Date) sourceValue); + final LocalDateTime localDateTime = localTime.atDate(LocalDate.now()); + return localDateTime.toLocalTime(); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java new file mode 100644 index 0000000..8af71b9 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.connect; + +import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.ConnectException; + +public class ConnectTimestampType extends AbstractTimestampType { + + public static final ConnectTimestampType INSTANCE = new ConnectTimestampType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Timestamp.LOGICAL_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof java.util.Date) { + return DateTimeUtils.toLocalDateTimeFromDate((java.util.Date) sourceValue); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java new file mode 100644 index 0000000..c0140b8 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import java.time.LocalDate; +import java.time.LocalTime; +import org.apache.doris.kafka.connector.converter.type.AbstractTimeType; +import org.apache.kafka.connect.errors.ConnectException; + +public abstract class AbstractDebeziumTimeType extends AbstractTimeType { + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof Number) { + final LocalTime localTime = getLocalTime((Number) sourceValue); + return localTime.atDate(LocalDate.now()); + } + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } + + protected abstract LocalTime getLocalTime(Number value); +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java new file mode 100644 index 0000000..0d17dd1 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import java.time.LocalDateTime; +import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType; +import org.apache.kafka.connect.errors.ConnectException; + +public abstract class AbstractDebeziumTimestampType extends AbstractTimestampType { + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof Number) { + return getLocalDateTime(((Number) sourceValue).longValue()); + } + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } + + protected abstract LocalDateTime getLocalDateTime(long value); +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java new file mode 100644 index 0000000..912f0a4 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.Date; +import org.apache.doris.kafka.connector.converter.type.AbstractDateType; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; +import org.apache.kafka.connect.errors.ConnectException; + +public class DateType extends AbstractDateType { + + public static final DateType INSTANCE = new DateType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Date.SCHEMA_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof Number) { + return DateTimeUtils.toLocalDateOfEpochDays(((Number) sourceValue).longValue()); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java new file mode 100644 index 0000000..36eeceb --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.MicroTime; +import java.time.LocalTime; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; + +public class MicroTimeType extends AbstractDebeziumTimeType { + + public static final MicroTimeType INSTANCE = new MicroTimeType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {MicroTime.SCHEMA_NAME}; + } + + @Override + protected LocalTime getLocalTime(Number value) { + return DateTimeUtils.toLocalTimeFromDurationMicroseconds(value.longValue()); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java new file mode 100644 index 0000000..b8c71a2 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.MicroTimestamp; +import java.time.LocalDateTime; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; + +public class MicroTimestampType extends AbstractDebeziumTimestampType { + + public static final MicroTimestampType INSTANCE = new MicroTimestampType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {MicroTimestamp.SCHEMA_NAME}; + } + + @Override + protected LocalDateTime getLocalDateTime(long value) { + return DateTimeUtils.toLocalDateTimeFromInstantEpochMicros(value); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java new file mode 100644 index 0000000..9519e64 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.NanoTime; +import java.time.LocalTime; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; + +public class NanoTimeType extends AbstractDebeziumTimeType { + + public static final NanoTimeType INSTANCE = new NanoTimeType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {NanoTime.SCHEMA_NAME}; + } + + @Override + protected LocalTime getLocalTime(Number value) { + return DateTimeUtils.toLocalTimeFromDurationNanoseconds(value.longValue()); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java new file mode 100644 index 0000000..eec06c8 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTimestamp; +import java.time.LocalDateTime; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; + +/** + * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for {@link + * NanoTimestamp} values. + */ +public class NanoTimestampType extends AbstractDebeziumTimestampType { + + public static final NanoTimestampType INSTANCE = new NanoTimestampType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {NanoTimestamp.SCHEMA_NAME}; + } + + @Override + protected LocalDateTime getLocalDateTime(long value) { + return DateTimeUtils.toLocalDateTimeFromInstantEpochNanos(value); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java new file mode 100644 index 0000000..83e95d9 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.Time; +import java.time.LocalTime; +import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils; + +public class TimeType extends AbstractDebeziumTimeType { + + public static final TimeType INSTANCE = new TimeType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Time.SCHEMA_NAME}; + } + + @Override + protected LocalTime getLocalTime(Number value) { + return DateTimeUtils.toLocalTimeFromDurationMilliseconds(value.longValue()); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java new file mode 100644 index 0000000..e5547b1 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +public class TimestampType extends AbstractDebeziumTimestampType { + + public static final TimestampType INSTANCE = new TimestampType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {Timestamp.SCHEMA_NAME}; + } + + @Override + protected LocalDateTime getLocalDateTime(long value) { + return LocalDateTime.ofInstant(Instant.ofEpochMilli(value), ZoneOffset.UTC); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java new file mode 100644 index 0000000..e38fe41 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.data.VariableScaleDecimal; +import java.math.BigDecimal; +import java.util.Optional; +import org.apache.doris.kafka.connector.converter.type.AbstractType; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; + +public class VariableScaleDecimalType extends AbstractType { + + public static final VariableScaleDecimalType INSTANCE = new VariableScaleDecimalType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {VariableScaleDecimal.LOGICAL_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof Struct) { + Optional<BigDecimal> bigDecimalValue = + VariableScaleDecimal.toLogical((Struct) sourceValue).getDecimalValue(); + return bigDecimalValue.get(); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } + + @Override + public boolean isNumber() { + return true; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java new file mode 100644 index 0000000..c3528d9 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.ZonedTime; +import java.time.LocalDate; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import org.apache.doris.kafka.connector.converter.type.AbstractTimeType; +import org.apache.kafka.connect.errors.ConnectException; + +public class ZonedTimeType extends AbstractTimeType { + + public static final ZonedTimeType INSTANCE = new ZonedTimeType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {ZonedTime.SCHEMA_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof String) { + final ZonedDateTime zdt = + OffsetTime.parse((String) sourceValue, ZonedTime.FORMATTER) + .atDate(LocalDate.now()) + .toZonedDateTime(); + return zdt.toOffsetDateTime(); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java new file mode 100644 index 0000000..b03a400 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.type.debezium; + +import io.debezium.time.ZonedTimestamp; +import java.time.ZonedDateTime; +import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType; +import org.apache.kafka.connect.errors.ConnectException; + +public class ZonedTimestampType extends AbstractTimestampType { + + public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); + + @Override + public String[] getRegistrationKeys() { + return new String[] {ZonedTimestamp.SCHEMA_NAME}; + } + + @Override + public Object getValue(Object sourceValue) { + if (sourceValue == null) { + return null; + } + if (sourceValue instanceof String) { + final ZonedDateTime zdt = + ZonedDateTime.parse((String) sourceValue, ZonedTimestamp.FORMATTER) + .withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + return zdt.toOffsetDateTime(); + } + + throw new ConnectException( + String.format( + "Unexpected %s value '%s' with type '%s'", + getClass().getSimpleName(), sourceValue, sourceValue.getClass().getName())); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java new file mode 100644 index 0000000..09ee9d0 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.doris.kafka.connector.converter.utils; + +import io.debezium.time.Conversions; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +public class DateTimeUtils { + + private DateTimeUtils() {} + + public static Instant toInstantFromNanos(long epochNanos) { + final long epochSeconds = TimeUnit.NANOSECONDS.toSeconds(epochNanos); + final long adjustment = + TimeUnit.NANOSECONDS.toNanos(epochNanos % TimeUnit.SECONDS.toNanos(1)); + return Instant.ofEpochSecond(epochSeconds, adjustment); + } + + public static ZonedDateTime toZonedDateTimeFromDate(Date date, TimeZone timeZone) { + return toZonedDateTimeFromDate(date, timeZone.toZoneId()); + } + + public static ZonedDateTime toZonedDateTimeFromDate(Date date, ZoneId zoneId) { + return date.toInstant().atZone(zoneId); + } + + public static ZonedDateTime toZonedDateTimeFromInstantEpochMicros(long epochMicros) { + return Conversions.toInstantFromMicros(epochMicros).atZone(ZoneOffset.UTC); + } + + public static ZonedDateTime toZonedDateTimeFromInstantEpochNanos(long epochNanos) { + return ZonedDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC); + } + + public static LocalDate toLocalDateOfEpochDays(long epochDays) { + return LocalDate.ofEpochDay(epochDays); + } + + public static LocalDate toLocalDateFromDate(Date date) { + return toLocalDateFromInstantEpochMillis(date.getTime()); + } + + public static LocalDate toLocalDateFromInstantEpochMillis(long epochMillis) { + return LocalDate.ofEpochDay(Duration.ofMillis(epochMillis).toDays()); + } + + public static LocalTime toLocalTimeFromDurationMilliseconds(long durationMillis) { + return LocalTime.ofNanoOfDay(Duration.of(durationMillis, ChronoUnit.MILLIS).toNanos()); + } + + public static LocalTime toLocalTimeFromDurationMicroseconds(long durationMicros) { + return LocalTime.ofNanoOfDay(Duration.of(durationMicros, ChronoUnit.MICROS).toNanos()); + } + + public static LocalTime toLocalTimeFromDurationNanoseconds(long durationNanos) { + return LocalTime.ofNanoOfDay(Duration.of(durationNanos, ChronoUnit.NANOS).toNanos()); + } + + public static LocalTime toLocalTimeFromUtcDate(Date date) { + return date.toInstant().atOffset(ZoneOffset.UTC).toLocalTime(); + } + + public static LocalDateTime toLocalDateTimeFromDate(Date date) { + return toLocalDateTimeFromInstantEpochMillis(date.getTime()); + } + + public static LocalDateTime toLocalDateTimeFromInstantEpochMillis(long epochMillis) { + return LocalDateTime.ofInstant( + Conversions.toInstantFromMillis(epochMillis), ZoneOffset.UTC); + } + + public static LocalDateTime toLocalDateTimeFromInstantEpochMicros(long epochMicros) { + return LocalDateTime.ofInstant( + Conversions.toInstantFromMicros(epochMicros), ZoneOffset.UTC); + } + + public static LocalDateTime toLocalDateTimeFromInstantEpochNanos(long epochNanos) { + return LocalDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC); + } + + public static Timestamp toTimestampFromMillis(long epochMilliseconds) { + final Instant instant = Conversions.toInstantFromMillis(epochMilliseconds); + final Timestamp ts = new Timestamp(instant.toEpochMilli()); + ts.setNanos(instant.getNano()); + return ts; + } +} diff --git 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java index 0804980..498feb0 100644 --- a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java +++ b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java @@ -59,6 +59,44 @@ public class TestRecordService { jsonConverter.configure(config, false); } + /** + * The mysql table schema is as follows. + * + * <p>CREATE TABLE example_table ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50), age INT, + * email VARCHAR(100), birth_date DATE, integer_column INT, float_column FLOAT, decimal_column + * DECIMAL(10,2), datetime_column DATETIME, date_column DATE, time_column TIME, text_column + * TEXT, varchar_column VARCHAR(255), binary_column BINARY(10), blob_column BLOB, is_active + * TINYINT(1) ); + */ + @Test + public void processMysqlDebeziumStructRecord() throws IOException { + String topic = "normal.wdl_test.example_table"; + // no delete value + String noDeleteValue = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] + String expectedNoDeleteValue = + "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D [...] + buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue); + + // delete value + String deleteValue = + "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i [...] + String expectedDeleteValue = + "{\"id\":8,\"name\":\"Jfohn Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D [...] 
+ buildProcessStructRecord(topic, deleteValue, expectedDeleteValue); + } + + private void buildProcessStructRecord(String topic, String sourceValue, String target) + throws IOException { + SchemaAndValue noDeleteSchemaValue = + jsonConverter.toConnectData(topic, sourceValue.getBytes(StandardCharsets.UTF_8)); + SinkRecord noDeleteSinkRecord = + TestRecordBuffer.newSinkRecord( + noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema()); + String processResult = recordService.processStructRecord(noDeleteSinkRecord); + Assert.assertEquals(target, processResult); + } + @Test public void processStructRecord() throws IOException { props.remove("converter.mode"); @@ -68,56 +106,27 @@ public class TestRecordService { // no delete value String noDeleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...] - SchemaAndValue noDeleteSchemaValue = - jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8)); - SinkRecord noDeleteSinkRecord = - TestRecordBuffer.newSinkRecord( - noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema()); - String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord); - Assert.assertEquals( - "{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}", - noDeleteResult); - - // delete value - String deleteValue = - "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...] 
- SchemaAndValue deleteSchemaValue = - jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8)); - SinkRecord record2 = - TestRecordBuffer.newSinkRecord( - deleteSchemaValue.value(), 1, deleteSchemaValue.schema()); - String s2 = recordService.processStructRecord(record2); - Assert.assertEquals( - "{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712545844000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5627,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"d\",\"ts_ms\":1712545844948,\"transaction\":null}", - s2); + String expectedNoDeleteValue = + "{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}"; + buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue); } @Test - public void processStructRecordWithDebeziumSchema() { + public void processStructRecordWithDebeziumSchema() throws IOException { String topic = "normal.wdl_test.test_sink_normal"; // no delete value String noDeleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...] - SchemaAndValue noDeleteSchemaValue = - jsonConverter.toConnectData(topic, noDeleteValue.getBytes(StandardCharsets.UTF_8)); - SinkRecord noDeleteSinkRecord = - TestRecordBuffer.newSinkRecord( - noDeleteSchemaValue.value(), 8, noDeleteSchemaValue.schema()); - String noDeleteResult = recordService.processStructRecord(noDeleteSinkRecord); - Assert.assertEquals( - "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}", noDeleteResult); + String expectedNoDeleteValue = + "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}"; + buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue); // delete value String deleteValue = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd [...] 
- SchemaAndValue deleteSchemaValue = - jsonConverter.toConnectData(topic, deleteValue.getBytes(StandardCharsets.UTF_8)); - SinkRecord record2 = - TestRecordBuffer.newSinkRecord( - deleteSchemaValue.value(), 1, deleteSchemaValue.schema()); - String s2 = recordService.processStructRecord(record2); - Assert.assertEquals("{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}", s2); + String expectedDeleteValue = "{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}"; + buildProcessStructRecord(topic, deleteValue, expectedDeleteValue); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org