This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3589528  [Feature]Support kafka type converter (#12)
3589528 is described below

commit 3589528b5052f159aaa27b9efaf55fc28884bd87
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Mon Apr 15 10:06:56 2024 +0800

    [Feature]Support kafka type converter (#12)
---
 pom.xml                                            |   6 +
 .../doris/kafka/connector/cfg/DorisOptions.java    |   9 +
 .../connector/cfg/DorisSinkConnectorConfig.java    |   2 +
 .../connector/converter/RecordDescriptor.java      | 225 +++++++++++++++++++++
 .../kafka/connector/converter/RecordService.java   | 107 +++++++---
 .../connector/converter/RecordTypeRegister.java    | 113 +++++++++++
 .../connector/converter/type/AbstractDateType.java |  22 ++
 .../converter/type/AbstractTemporalType.java       |  51 +++++
 .../connector/converter/type/AbstractTimeType.java |  22 ++
 .../converter/type/AbstractTimestampType.java      |  22 ++
 .../connector/converter/type/AbstractType.java     |  43 ++++
 .../doris/kafka/connector/converter/type/Type.java |  46 +++++
 .../type/connect/AbstractConnectMapType.java       |  45 +++++
 .../type/connect/AbstractConnectSchemaType.java    |  32 +++
 .../converter/type/connect/ConnectBooleanType.java |  29 +++
 .../converter/type/connect/ConnectBytesType.java   |  60 ++++++
 .../converter/type/connect/ConnectDateType.java    |  48 +++++
 .../converter/type/connect/ConnectDecimalType.java |  41 ++++
 .../converter/type/connect/ConnectFloat32Type.java |  34 ++++
 .../converter/type/connect/ConnectFloat64Type.java |  34 ++++
 .../converter/type/connect/ConnectInt16Type.java   |  34 ++++
 .../converter/type/connect/ConnectInt32Type.java   |  34 ++++
 .../converter/type/connect/ConnectInt64Type.java   |  34 ++++
 .../converter/type/connect/ConnectInt8Type.java    |  34 ++++
 .../connect/ConnectMapToConnectStringType.java     |  35 ++++
 .../converter/type/connect/ConnectStringType.java  |  33 +++
 .../converter/type/connect/ConnectTimeType.java    |  56 +++++
 .../type/connect/ConnectTimestampType.java         |  49 +++++
 .../type/debezium/AbstractDebeziumTimeType.java    |  44 ++++
 .../debezium/AbstractDebeziumTimestampType.java    |  42 ++++
 .../converter/type/debezium/DateType.java          |  49 +++++
 .../converter/type/debezium/MicroTimeType.java     |  38 ++++
 .../type/debezium/MicroTimestampType.java          |  38 ++++
 .../converter/type/debezium/NanoTimeType.java      |  38 ++++
 .../converter/type/debezium/NanoTimestampType.java |  43 ++++
 .../converter/type/debezium/TimeType.java          |  38 ++++
 .../converter/type/debezium/TimestampType.java     |  39 ++++
 .../type/debezium/VariableScaleDecimalType.java    |  58 ++++++
 .../converter/type/debezium/ZonedTimeType.java     |  55 +++++
 .../type/debezium/ZonedTimestampType.java          |  52 +++++
 .../connector/converter/utils/DateTimeUtils.java   | 115 +++++++++++
 .../connector/converter/TestRecordService.java     |  85 ++++----
 42 files changed, 1973 insertions(+), 61 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6f8dbab..2ceea3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -250,6 +250,12 @@
             <artifactId>kafka-connect-avro-converter</artifactId>
             <version>${confluent.version}</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/io.debezium/debezium-core -->
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>1.9.8.Final</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
index c47ea1f..6183869 100644
--- a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
+++ b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisOptions.java
@@ -58,6 +58,7 @@ public class DorisOptions {
     private final Properties streamLoadProp = new Properties();
 
     private String labelPrefix;
+    private String databaseTimeZone;
     private LoadModel loadModel;
     private DeliveryGuarantee deliveryGuarantee;
     private ConverterMode converterMode;
@@ -71,6 +72,10 @@ public class DorisOptions {
         this.password = config.get(DorisSinkConnectorConfig.DORIS_PASSWORD);
         this.database = config.get(DorisSinkConnectorConfig.DORIS_DATABASE);
         this.taskId = Integer.parseInt(config.get(ConfigCheckUtils.TASK_ID));
+        this.databaseTimeZone = 
DorisSinkConnectorConfig.DATABASE_TIME_ZONE_DEFAULT;
+        if (config.containsKey(DorisSinkConnectorConfig.DATABASE_TIME_ZONE)) {
+            this.databaseTimeZone = 
config.get(DorisSinkConnectorConfig.DATABASE_TIME_ZONE);
+        }
         this.loadModel =
                 LoadModel.of(
                         config.getOrDefault(
@@ -275,6 +280,10 @@ public class DorisOptions {
         return autoRedirect;
     }
 
+    public String getDatabaseTimeZone() {
+        return databaseTimeZone;
+    }
+
     public boolean isEnableDelete() {
         return enableDelete;
     }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
index 5ca55ef..58b6a62 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java
@@ -69,6 +69,8 @@ public class DorisSinkConnectorConfig {
     public static final String REQUEST_CONNECT_TIMEOUT_MS = 
"request.connect.timeout.ms";
     public static final Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 
1000;
     public static final Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 
* 1000;
+    public static final String DATABASE_TIME_ZONE = "database.time_zone";
+    public static final String DATABASE_TIME_ZONE_DEFAULT = "UTC";
     public static final String LOAD_MODEL = "load.model";
     public static final String LOAD_MODEL_DEFAULT = 
LoadModel.STREAM_LOAD.name();
     public static final String AUTO_REDIRECT = "auto.redirect";
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
new file mode 100644
index 0000000..66a7c21
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordDescriptor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class RecordDescriptor {
+    private final SinkRecord record;
+    private final String topicName;
+    private final List<String> keyFieldNames;
+    private final List<String> nonKeyFieldNames;
+    private final Map<String, FieldDescriptor> fields;
+    private final boolean flattened;
+
+    private RecordDescriptor(
+            SinkRecord record,
+            String topicName,
+            List<String> keyFieldNames,
+            List<String> nonKeyFieldNames,
+            Map<String, FieldDescriptor> fields,
+            boolean flattened) {
+        this.record = record;
+        this.topicName = topicName;
+        this.keyFieldNames = keyFieldNames;
+        this.nonKeyFieldNames = nonKeyFieldNames;
+        this.fields = fields;
+        this.flattened = flattened;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public Integer getPartition() {
+        return record.kafkaPartition();
+    }
+
+    public long getOffset() {
+        return record.kafkaOffset();
+    }
+
+    public List<String> getKeyFieldNames() {
+        return keyFieldNames;
+    }
+
+    public List<String> getNonKeyFieldNames() {
+        return nonKeyFieldNames;
+    }
+
+    public Map<String, FieldDescriptor> getFields() {
+        return fields;
+    }
+
+    public boolean isDebeziumSinkRecord() {
+        return !flattened;
+    }
+
+    public boolean isTombstone() {
+        // NOTE
+        // A Debezium tombstone has both value and valueSchema set to null.
+        // The ExtractNewRecordState SMT with delete.handling.mode=none
+        // instead emits a record whose value alone is null, which
+        // isDelete() treats as a flattened delete. See the isDelete method.
+        return record.value() == null && record.valueSchema() == null;
+    }
+
+    public boolean isDelete() {
+        if (!isDebeziumSinkRecord()) {
+            return record.value() == null;
+        } else if (record.value() != null) {
+            final Struct value = (Struct) record.value();
+            return "d".equals(value.getString("op"));
+        }
+        return false;
+    }
+
+    public Struct getAfterStruct() {
+        if (isDebeziumSinkRecord()) {
+            return ((Struct) record.value()).getStruct("after");
+        } else {
+            return ((Struct) record.value());
+        }
+    }
+
+    public Struct getBeforeStruct() {
+        if (isDebeziumSinkRecord()) {
+            return ((Struct) record.value()).getStruct("before");
+        } else {
+            return ((Struct) record.value());
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class FieldDescriptor {
+        private final Schema schema;
+        private final String name;
+        private final String schemaTypeName;
+        private final String schemaName;
+
+        public FieldDescriptor(
+                Schema schema, String name, String schemaTypeName, String 
schemaName) {
+            this.schema = schema;
+            this.name = name;
+            this.schemaTypeName = schemaTypeName;
+            this.schemaName = schemaName;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public String getSchemaName() {
+            return schemaName;
+        }
+
+        public Schema getSchema() {
+            return schema;
+        }
+
+        public String getSchemaTypeName() {
+            return schemaTypeName;
+        }
+    }
+
+    public static class Builder {
+
+        private SinkRecord sinkRecord;
+
+        // Internal build state
+        private final List<String> keyFieldNames = new ArrayList<>();
+        private final List<String> nonKeyFieldNames = new ArrayList<>();
+        private final Map<String, FieldDescriptor> allFields = new 
LinkedHashMap<>();
+
+        public Builder withSinkRecord(SinkRecord record) {
+            this.sinkRecord = record;
+            return this;
+        }
+
+        public RecordDescriptor build() {
+            Objects.requireNonNull(sinkRecord, "The sink record must be 
provided.");
+
+            final boolean flattened = !isTombstone(sinkRecord) && 
isFlattened(sinkRecord);
+            readSinkRecordNonKeyData(sinkRecord, flattened);
+
+            return new RecordDescriptor(
+                    sinkRecord,
+                    sinkRecord.topic(),
+                    keyFieldNames,
+                    nonKeyFieldNames,
+                    allFields,
+                    flattened);
+        }
+
+        private boolean isFlattened(SinkRecord record) {
+            return record.valueSchema().name() == null
+                    || !record.valueSchema().name().contains("Envelope");
+        }
+
+        private boolean isTombstone(SinkRecord record) {
+
+            return record.value() == null && record.valueSchema() == null;
+        }
+
+        private void readSinkRecordNonKeyData(SinkRecord record, boolean 
flattened) {
+            final Schema valueSchema = record.valueSchema();
+            if (valueSchema != null) {
+                if (flattened) {
+                    // In a flattened event type, it's safe to read the field names directly
+                    // from the schema as this isn't a complex Debezium message type.
+                    applyNonKeyFields(valueSchema);
+                } else {
+                    final Field after = valueSchema.field("after");
+                    if (after == null) {
+                        throw new ConnectException(
+                                "Received an unexpected message type that does 
not have an 'after' Debezium block");
+                    }
+                    applyNonKeyFields(after.schema());
+                }
+            }
+        }
+
+        private void applyNonKeyFields(Schema schema) {
+            for (Field field : schema.fields()) {
+                if (!keyFieldNames.contains(field.name())) {
+                    applyNonKeyField(field.name(), field.schema());
+                }
+            }
+        }
+
+        private void applyNonKeyField(String name, Schema schema) {
+            FieldDescriptor fieldDescriptor =
+                    new FieldDescriptor(schema, name, schema.type().name(), 
schema.name());
+            nonKeyFieldNames.add(fieldDescriptor.getName());
+            allFields.put(fieldDescriptor.getName(), fieldDescriptor);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
index bd55297..1390761 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java
@@ -20,19 +20,22 @@
 package org.apache.doris.kafka.connector.converter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.StringJoiner;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.type.Type;
 import org.apache.doris.kafka.connector.exception.DataFormatException;
 import org.apache.doris.kafka.connector.writer.LoadConstants;
 import org.apache.doris.kafka.connector.writer.RecordBuffer;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -43,6 +46,7 @@ public class RecordService {
     private static final ObjectMapper MAPPER = new ObjectMapper();
     private final JsonConverter converter;
     private DorisOptions dorisOptions;
+    private RecordTypeRegister recordTypeRegister;
 
     public RecordService() {
         this.converter = new JsonConverter();
@@ -54,6 +58,7 @@ public class RecordService {
     public RecordService(DorisOptions dorisOptions) {
         this();
         this.dorisOptions = dorisOptions;
+        this.recordTypeRegister = new RecordTypeRegister(dorisOptions);
     }
 
     /**
@@ -61,27 +66,34 @@ public class RecordService {
      * "optional": false, "name": "" }, "payload": { "name": "doris", 
"__deleted": "true" } }
      */
     public String processStructRecord(SinkRecord record) {
-        byte[] bytes =
-                converter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
-        String recordValue = new String(bytes, StandardCharsets.UTF_8);
+        String processedRecord;
         if (ConverterMode.DEBEZIUM_INGESTION == 
dorisOptions.getConverterMode()) {
-            try {
-                Map<String, Object> recordMap =
-                        MAPPER.readValue(recordValue, new 
TypeReference<Map<String, Object>>() {});
-                // delete sign sync
-                if ("d".equals(recordMap.get("op"))) {
-                    Map<String, Object> beforeValue = (Map<String, Object>) 
recordMap.get("before");
-                    beforeValue.put(LoadConstants.DORIS_DELETE_SIGN, 
LoadConstants.DORIS_DEL_TRUE);
-                    return MAPPER.writeValueAsString(beforeValue);
-                }
-                Map<String, Object> afterValue = (Map<String, Object>) 
recordMap.get("after");
-                afterValue.put(LoadConstants.DORIS_DELETE_SIGN, 
LoadConstants.DORIS_DEL_FALSE);
-                return MAPPER.writeValueAsString(afterValue);
-            } catch (JsonProcessingException e) {
-                LOG.error("parse record failed, cause by parse json error: 
{}", recordValue);
+            RecordDescriptor recordDescriptor = buildRecordDescriptor(record);
+            if (recordDescriptor.isTombstone()) {
+                return null;
             }
+            List<String> nonKeyFieldNames = 
recordDescriptor.getNonKeyFieldNames();
+            if (recordDescriptor.isDelete()) {
+                processedRecord =
+                        parseFieldValues(
+                                recordDescriptor,
+                                recordDescriptor.getBeforeStruct(),
+                                nonKeyFieldNames,
+                                true);
+            } else {
+                processedRecord =
+                        parseFieldValues(
+                                recordDescriptor,
+                                recordDescriptor.getAfterStruct(),
+                                nonKeyFieldNames,
+                                false);
+            }
+        } else {
+            byte[] bytes =
+                    converter.fromConnectData(record.topic(), 
record.valueSchema(), record.value());
+            processedRecord = new String(bytes, StandardCharsets.UTF_8);
         }
-        return recordValue;
+        return processedRecord;
     }
 
     /** process list record from kafka [{"name":"doris1"},{"name":"doris2"}] */
@@ -114,6 +126,43 @@ public class RecordService {
         return record.value().toString();
     }
 
+    private String parseFieldValues(
+            RecordDescriptor record, Struct source, List<String> fields, 
boolean isDelete) {
+        Map<String, Object> filedMapping = new LinkedHashMap<>();
+        String filedResult = null;
+        final Map<String, Type> typeRegistry = 
recordTypeRegister.getTypeRegistry();
+        for (String fieldName : fields) {
+            final RecordDescriptor.FieldDescriptor field = 
record.getFields().get(fieldName);
+            String fieldSchemaName = field.getSchemaName();
+            String fieldSchemaTypeName = field.getSchemaTypeName();
+            Object value =
+                    field.getSchema().isOptional()
+                            ? source.getWithoutDefault(fieldName)
+                            : source.get(fieldName);
+            Type type =
+                    Objects.nonNull(fieldSchemaName)
+                            ? typeRegistry.get(fieldSchemaName)
+                            : typeRegistry.get(fieldSchemaTypeName);
+            Object convertValue = type.getValue(value);
+            if (Objects.nonNull(convertValue) && !type.isNumber()) {
+                filedMapping.put(fieldName, convertValue.toString());
+            } else {
+                filedMapping.put(fieldName, convertValue);
+            }
+        }
+        try {
+            if (isDelete) {
+                filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, 
LoadConstants.DORIS_DEL_TRUE);
+            } else {
+                filedMapping.put(LoadConstants.DORIS_DELETE_SIGN, 
LoadConstants.DORIS_DEL_FALSE);
+            }
+            filedResult = MAPPER.writeValueAsString(filedMapping);
+        } catch (JsonProcessingException e) {
+            LOG.error("parse record failed, cause by parse json error: {}", 
filedMapping);
+        }
+        return filedResult;
+    }
+
     /**
      * Given a single Record from put API, process it and convert it into a 
Json String.
      *
@@ -121,14 +170,26 @@ public class RecordService {
      * @return Json String
      */
     public String getProcessedRecord(SinkRecord record) {
+        String processedRecord;
         if (record.value() instanceof Struct) {
-            return processStructRecord(record);
+            processedRecord = processStructRecord(record);
         } else if (record.value() instanceof List) {
-            return processListRecord(record);
+            processedRecord = processListRecord(record);
         } else if (record.value() instanceof Map) {
-            return processMapRecord(record);
+            processedRecord = processMapRecord(record);
         } else {
-            return processStringRecord(record);
+            processedRecord = record.value().toString();
+        }
+        return processedRecord;
+    }
+
+    private RecordDescriptor buildRecordDescriptor(SinkRecord record) {
+        RecordDescriptor recordDescriptor;
+        try {
+            recordDescriptor = 
RecordDescriptor.builder().withSinkRecord(record).build();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to process a sink record", e);
         }
+        return recordDescriptor;
     }
 }
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
new file mode 100644
index 0000000..e78909d
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/RecordTypeRegister.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.apache.doris.kafka.connector.converter.type.Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectBooleanType;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectBytesType;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectDateType;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectDecimalType;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat32Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectFloat64Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt16Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt32Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectInt64Type;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectInt8Type;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectMapToConnectStringType;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectStringType;
+import org.apache.doris.kafka.connector.converter.type.connect.ConnectTimeType;
+import 
org.apache.doris.kafka.connector.converter.type.connect.ConnectTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.DateType;
+import org.apache.doris.kafka.connector.converter.type.debezium.MicroTimeType;
+import 
org.apache.doris.kafka.connector.converter.type.debezium.MicroTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.NanoTimeType;
+import 
org.apache.doris.kafka.connector.converter.type.debezium.NanoTimestampType;
+import org.apache.doris.kafka.connector.converter.type.debezium.TimeType;
+import org.apache.doris.kafka.connector.converter.type.debezium.TimestampType;
+import 
org.apache.doris.kafka.connector.converter.type.debezium.VariableScaleDecimalType;
+import org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimeType;
+import 
org.apache.doris.kafka.connector.converter.type.debezium.ZonedTimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecordTypeRegister {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RecordTypeRegister.class);
+
+    private final Map<String, Type> typeRegistry = new HashMap<>();
+    private final DorisOptions dorisOptions;
+
+    public RecordTypeRegister(DorisOptions dorisOptions) {
+        this.dorisOptions = dorisOptions;
+        registerTypes();
+    }
+
+    protected void registerTypes() {
+        // Supported common Debezium data types
+        registerType(DateType.INSTANCE);
+        registerType(TimeType.INSTANCE);
+        registerType(MicroTimeType.INSTANCE);
+        registerType(TimestampType.INSTANCE);
+        registerType(MicroTimestampType.INSTANCE);
+        registerType(NanoTimeType.INSTANCE);
+        registerType(NanoTimestampType.INSTANCE);
+        registerType(ZonedTimeType.INSTANCE);
+        registerType(ZonedTimestampType.INSTANCE);
+        registerType(VariableScaleDecimalType.INSTANCE);
+
+        // Supported connect data types
+        registerType(ConnectBooleanType.INSTANCE);
+        registerType(ConnectBytesType.INSTANCE);
+        registerType(ConnectDateType.INSTANCE);
+        registerType(ConnectDecimalType.INSTANCE);
+        registerType(ConnectFloat32Type.INSTANCE);
+        registerType(ConnectFloat64Type.INSTANCE);
+        registerType(ConnectInt8Type.INSTANCE);
+        registerType(ConnectInt16Type.INSTANCE);
+        registerType(ConnectInt32Type.INSTANCE);
+        registerType(ConnectInt64Type.INSTANCE);
+        registerType(ConnectStringType.INSTANCE);
+        registerType(ConnectTimestampType.INSTANCE);
+        registerType(ConnectTimeType.INSTANCE);
+        registerType(ConnectMapToConnectStringType.INSTANCE);
+    }
+
+    protected void registerType(Type type) {
+        type.configure(dorisOptions);
+        for (String key : type.getRegistrationKeys()) {
+            final Type existing = typeRegistry.put(key, type);
+            if (existing != null) {
+                LOG.debug(
+                        "Type replaced [{}]: {} -> {}",
+                        key,
+                        existing.getClass().getName(),
+                        type.getClass().getName());
+            } else {
+                LOG.debug("Type registered [{}]: {}", key, 
type.getClass().getName());
+            }
+        }
+    }
+
+    public Map<String, Type> getTypeRegistry() {
+        return typeRegistry;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
new file mode 100644
index 0000000..de32cd6
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractDateType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/** An abstract base class for all temporal date implementations of {@link Type}. */
+public abstract class AbstractDateType extends AbstractTemporalType {}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
new file mode 100644
index 0000000..4b50a79
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTemporalType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import java.time.ZoneId;
+import java.util.TimeZone;
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An abstract base class for all temporal implementations of {@link Type}. */
+public abstract class AbstractTemporalType extends AbstractType {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractTemporalType.class);
+
+    private TimeZone databaseTimeZone;
+
+    @Override
+    public void configure(DorisOptions dorisOptions) {
+        final String databaseTimeZone = dorisOptions.getDatabaseTimeZone();
+        try {
+            this.databaseTimeZone = 
TimeZone.getTimeZone(ZoneId.of(databaseTimeZone));
+        } catch (Exception e) {
+            LOGGER.error(
+                    "Failed to resolve time zone '{}', please specify a 
correct time zone value",
+                    databaseTimeZone,
+                    e);
+            throw e;
+        }
+    }
+
+    protected TimeZone getDatabaseTimeZone() {
+        return databaseTimeZone;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
new file mode 100644
index 0000000..533f1e1
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimeType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/**
+ * An abstract temporal implementation of {@link Type} for {@code TIME} based columns.
+ *
+ * <p>Declares no members of its own; everything is inherited from {@link AbstractTemporalType}.
+ */
+public abstract class AbstractTimeType extends AbstractTemporalType {}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
new file mode 100644
index 0000000..3d50376
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractTimestampType.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+/**
+ * An abstract temporal implementation of {@link Type} for {@code TIMESTAMP} based columns.
+ *
+ * <p>Declares no members of its own; everything is inherited from {@link AbstractTemporalType}.
+ */
+public abstract class AbstractTimestampType extends AbstractTemporalType {}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
new file mode 100644
index 0000000..d915a89
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/AbstractType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+
+/**
+ * An abstract implementation of {@link Type}, which all types should extend.
+ *
+ * <p>Supplies defaults for every {@link Type} method except {@code getRegistrationKeys()}, which
+ * concrete types must provide themselves.
+ */
+public abstract class AbstractType implements Type {
+
+    /** No-op by default; types that need user configuration override this. */
+    @Override
+    public void configure(DorisOptions dorisOptions) {}
+
+    /** Returns {@code sourceValue} unchanged; types that need a conversion override this. */
+    @Override
+    public Object getValue(Object sourceValue) {
+        return sourceValue;
+    }
+
+    /** Returns {@code false} by default; numeric types override this to return {@code true}. */
+    @Override
+    public boolean isNumber() {
+        return false;
+    }
+
+    /** Returns the simple class name of the concrete type. */
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
new file mode 100644
index 0000000..c284f0e
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/converter/type/Type.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type;
+
+import org.apache.doris.kafka.connector.cfg.DorisOptions;
+
+/**
+ * A type indicates the type of each column of a Kafka record, including the various column types
+ * of Debezium and Kafka Connect.
+ */
+public interface Type {
+
+    /**
+     * Allows a type to perform initialization/configuration tasks based on user configs.
+     *
+     * @param dorisOptions the user-supplied connector options
+     */
+    void configure(DorisOptions dorisOptions);
+
+    /**
+     * Returns the names that this type will be mapped as.
+     *
+     * <p>For example, when creating a custom mapping for {@code io.debezium.data.Bits}, a type
+     * could be registered using the {@code LOGICAL_NAME} of the schema if the type is to be used
+     * when a schema name is identified; otherwise it could be registered as the raw column type
+     * when column type propagation is enabled.
+     *
+     * @return the keys under which this type is registered
+     */
+    String[] getRegistrationKeys();
+
+    /**
+     * Get the actual converted value based on the column type.
+     *
+     * @param sourceValue the value to convert
+     * @return the converted value
+     */
+    Object getValue(Object sourceValue);
+
+    /**
+     * Indicates whether this type is numeric.
+     *
+     * @return {@code true} for numeric types, {@code false} otherwise
+     */
+    boolean isNumber();
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
new file mode 100644
index 0000000..53c64d1
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectMapType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * An abstract implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for
+ * {@code MAP} schema types.
+ *
+ * <p>Provides the {@code MAP} registration key and a helper that renders a value as a JSON string
+ * for subclasses to use.
+ */
+public abstract class AbstractConnectMapType extends AbstractConnectSchemaType {
+
+    /** Shared mapper used to render values as JSON. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Returns the single registration key {@code "MAP"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"MAP"};
+    }
+
+    /**
+     * Serializes the given value to its JSON string representation.
+     *
+     * @param value the value to serialize
+     * @return the JSON text produced by the mapper
+     * @throws ConnectException if the value cannot be written as JSON; the original {@link
+     *     JsonProcessingException} is preserved as the cause
+     */
+    protected String mapToJsonString(Object value) {
+        try {
+            return MAPPER.writeValueAsString(value);
+        } catch (JsonProcessingException e) {
+            // This direction is serialization (object -> JSON), so the message says so.
+            throw new ConnectException("Failed to serialize MAP data to JSON", e);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
new file mode 100644
index 0000000..d544f75
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/AbstractConnectSchemaType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+
+/**
+ * An abstract implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} from
+ * which all Kafka Connect based schema types should derive.
+ *
+ * <p>This abstract implementation is used as a marker object to designate types that operate on
+ * the raw {@link org.apache.kafka.connect.data.Schema.Type} values rather than on custom schema
+ * types contributed by Kafka Connect, such as Date or Time, or by third parties such as Debezium.
+ */
+public abstract class AbstractConnectSchemaType extends AbstractType {}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
new file mode 100644
index 0000000..18c5af3
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBooleanType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code BOOLEAN} key.
+ *
+ * <p>Overrides nothing else, so values pass through unchanged and the type is not numeric.
+ */
+public class ConnectBooleanType extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectBooleanType INSTANCE = new ConnectBooleanType();
+
+    /** Returns the single registration key {@code "BOOLEAN"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"BOOLEAN"};
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
new file mode 100644
index 0000000..6c2701c
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectBytesType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code BYTES} key that renders binary values as upper-case hexadecimal strings.
+ */
+public class ConnectBytesType extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectBytesType INSTANCE = new ConnectBytesType();
+
+    /** Lookup table mapping a 4-bit value to its upper-case hexadecimal digit. */
+    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
+
+    /** Returns the single registration key {@code "BYTES"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"BYTES"};
+    }
+
+    /**
+     * Converts a binary value into an upper-case hexadecimal string (two digits per byte).
+     *
+     * @param sourceValue a {@code byte[]} or {@link ByteBuffer}, or {@code null}
+     * @return the hexadecimal string, or {@code null} when {@code sourceValue} is {@code null}
+     * @throws org.apache.kafka.connect.errors.ConnectException if the value is neither a {@code
+     *     byte[]} nor a {@link ByteBuffer}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        return bytesToHexString(getByteArrayFromValue(sourceValue));
+    }
+
+    /** Extracts the bytes of a non-null value, rejecting unsupported value types explicitly. */
+    private byte[] getByteArrayFromValue(Object value) {
+        if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+        if (value instanceof ByteBuffer) {
+            // Read from a slice so the position of the caller's buffer is left untouched.
+            final ByteBuffer buffer = ((ByteBuffer) value).slice();
+            final byte[] byteArray = new byte[buffer.remaining()];
+            buffer.get(byteArray);
+            return byteArray;
+        }
+        // Previously this fell through with null and failed later with a NullPointerException.
+        throw new org.apache.kafka.connect.errors.ConnectException(
+                String.format(
+                        "Unexpected %s value '%s' with type '%s'",
+                        getClass().getSimpleName(), value, value.getClass().getName()));
+    }
+
+    /** Converts a byte array to its upper-case hexadecimal string representation. */
+    private String bytesToHexString(byte[] bytes) {
+        final StringBuilder sb = new StringBuilder(bytes.length * 2);
+        for (byte b : bytes) {
+            // Mask after shifting so negative bytes yield the same digits as "%02X".
+            sb.append(HEX_DIGITS[(b >> 4) & 0x0F]).append(HEX_DIGITS[b & 0x0F]);
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
new file mode 100644
index 0000000..4106f8d
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDateType.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the Kafka Connect {@link Date} logical name.
+ */
+public class ConnectDateType extends AbstractDateType {
+
+    /** Shared instance of this type. */
+    public static final ConnectDateType INSTANCE = new ConnectDateType();
+
+    /** Returns the single registration key {@link Date#LOGICAL_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Date.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a {@link java.util.Date} by delegating to {@code
+     * DateTimeUtils.toLocalDateFromDate}.
+     *
+     * @param sourceValue a {@link java.util.Date}, or {@code null}
+     * @return the converted value, or {@code null} when {@code sourceValue} is {@code null}
+     * @throws ConnectException if the value is of any other type
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof java.util.Date)) {
+            final String message =
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName());
+            throw new ConnectException(message);
+        }
+        final java.util.Date date = (java.util.Date) sourceValue;
+        return DateTimeUtils.toLocalDateFromDate(date);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
new file mode 100644
index 0000000..8625883
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectDecimalType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.kafka.connect.data.Decimal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the Kafka Connect {@link Decimal} logical name.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectDecimalType extends AbstractType {
+
+    // NOTE(review): LOGGER is not referenced anywhere in this class; confirm whether it is needed.
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConnectDecimalType.class);
+
+    /** Shared instance of this type. */
+    public static final ConnectDecimalType INSTANCE = new ConnectDecimalType();
+
+    /** Returns the single registration key {@link Decimal#LOGICAL_NAME}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {Decimal.LOGICAL_NAME};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
new file mode 100644
index 0000000..98b6936
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat32Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code FLOAT32} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectFloat32Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectFloat32Type INSTANCE = new ConnectFloat32Type();
+
+    /** Returns the single registration key {@code "FLOAT32"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"FLOAT32"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
new file mode 100644
index 0000000..f050c15
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectFloat64Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code FLOAT64} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectFloat64Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectFloat64Type INSTANCE = new ConnectFloat64Type();
+
+    /** Returns the single registration key {@code "FLOAT64"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"FLOAT64"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
new file mode 100644
index 0000000..573813b
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt16Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code INT16} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectInt16Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt16Type INSTANCE = new ConnectInt16Type();
+
+    /** Returns the single registration key {@code "INT16"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT16"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
new file mode 100644
index 0000000..50dd6c7
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt32Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code INT32} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectInt32Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt32Type INSTANCE = new ConnectInt32Type();
+
+    /** Returns the single registration key {@code "INT32"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT32"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
new file mode 100644
index 0000000..c08abb6
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt64Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code INT64} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectInt64Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt64Type INSTANCE = new ConnectInt64Type();
+
+    /** Returns the single registration key {@code "INT64"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT64"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
new file mode 100644
index 0000000..55c82cf
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectInt8Type.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} registered
+ * under the {@code INT8} key.
+ *
+ * <p>Values pass through unchanged (inherited {@code getValue}) and the type reports itself as
+ * numeric.
+ */
+public class ConnectInt8Type extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectInt8Type INSTANCE = new ConnectInt8Type();
+
+    /** Returns the single registration key {@code "INT8"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"INT8"};
+    }
+
+    /** Always {@code true}: this type is numeric. */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
new file mode 100644
index 0000000..cac2624
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectMapToConnectStringType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.util.Map;
+
+/**
+ * A {@code MAP} type that renders {@link Map} values as JSON text and then hands the result to
+ * {@link ConnectStringType}.
+ */
+public class ConnectMapToConnectStringType extends AbstractConnectMapType {
+
+    /** Shared instance of this type. */
+    public static final ConnectMapToConnectStringType INSTANCE =
+            new ConnectMapToConnectStringType();
+
+    /**
+     * Converts {@link Map} values to a JSON string; any other value (including {@code null}) is
+     * forwarded as-is. The result is passed through {@link ConnectStringType#getValue(Object)}.
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        final Object stringCandidate =
+                (sourceValue instanceof Map) ? mapToJsonString(sourceValue) : sourceValue;
+        return ConnectStringType.INSTANCE.getValue(stringCandidate);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
new file mode 100644
index 0000000..0353020
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectStringType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} that supports
+ * {@code STRING} connect schema types.
+ *
+ * <p>Overrides nothing else, so values pass through unchanged and the type is not numeric.
+ */
+public class ConnectStringType extends AbstractConnectSchemaType {
+
+    /** Shared instance of this type. */
+    public static final ConnectStringType INSTANCE = new ConnectStringType();
+
+    /** Returns the single registration key {@code "STRING"}. */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {"STRING"};
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
new file mode 100644
index 0000000..de3be44
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimeType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Date;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for values of the Kafka Connect {@link Time} logical type; it is registered under
+ * {@link Time#LOGICAL_NAME}.
+ */
+public class ConnectTimeType extends AbstractTimeType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final ConnectTimeType INSTANCE = new ConnectTimeType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link Time#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Time.LOGICAL_NAME};
+        return keys;
+    }
+
+    /**
+     * Converts a Connect time value into a {@link LocalTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the {@link LocalTime} derived from
+     *     the given {@link Date} via {@code DateTimeUtils.toLocalTimeFromUtcDate}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Date}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Date)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final LocalTime timeOfDay = DateTimeUtils.toLocalTimeFromUtcDate((Date) sourceValue);
+        // NOTE(review): attaching today's date and dropping it again appears to yield the same
+        // LocalTime; the round trip is kept unchanged here to preserve existing behavior.
+        final LocalDateTime anchoredToToday = LocalDate.now().atTime(timeOfDay);
+        return anchoredToToday.toLocalTime();
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
new file mode 100644
index 0000000..8af71b9
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/connect/ConnectTimestampType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.connect;
+
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for values of the Kafka Connect {@link Timestamp} logical type; it is registered under
+ * {@link Timestamp#LOGICAL_NAME}.
+ */
+public class ConnectTimestampType extends AbstractTimestampType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final ConnectTimestampType INSTANCE = new ConnectTimestampType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link Timestamp#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Timestamp.LOGICAL_NAME};
+        return keys;
+    }
+
+    /**
+     * Converts a Connect timestamp value by delegating to {@code
+     * DateTimeUtils.toLocalDateTimeFromDate}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise whatever {@code
+     *     DateTimeUtils.toLocalDateTimeFromDate} produces for the given {@code java.util.Date}
+     * @throws ConnectException if the value is neither {@code null} nor a {@code java.util.Date}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof java.util.Date)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final java.util.Date timestamp = (java.util.Date) sourceValue;
+        return DateTimeUtils.toLocalDateTimeFromDate(timestamp);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
new file mode 100644
index 0000000..c0140b8
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimeType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import java.time.LocalDate;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Base class for time types whose source value arrives as a {@link Number}. Subclasses decide how
+ * that number maps onto a {@link LocalTime} by implementing {@link #getLocalTime(Number)}.
+ */
+public abstract class AbstractDebeziumTimeType extends AbstractTimeType {
+
+    /**
+     * Converts a numeric time value into a date-time anchored to the current date.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the {@code java.time.LocalDateTime}
+     *     formed by combining {@link #getLocalTime(Number)} with {@link LocalDate#now()}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final LocalTime timeOfDay = getLocalTime((Number) sourceValue);
+        // The time is resolved first, then paired with today's date.
+        return LocalDate.now().atTime(timeOfDay);
+    }
+
+    /**
+     * Maps the numeric source value onto a time of day; the unit of the number is defined by the
+     * implementing subclass.
+     *
+     * @param value the non-null numeric source value
+     * @return the corresponding {@link LocalTime}
+     */
+    protected abstract LocalTime getLocalTime(Number value);
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
new file mode 100644
index 0000000..0d17dd1
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/AbstractDebeziumTimestampType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Base class for timestamp types whose source value arrives as a {@link Number}. Subclasses decide
+ * how the number's {@code long} value maps onto a {@link LocalDateTime} by implementing {@link
+ * #getLocalDateTime(long)}.
+ */
+public abstract class AbstractDebeziumTimestampType extends AbstractTimestampType {
+
+    /**
+     * Converts a numeric timestamp value into a {@link LocalDateTime}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the result of {@link
+     *     #getLocalDateTime(long)} applied to the number's {@code longValue()}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final long rawValue = ((Number) sourceValue).longValue();
+        return getLocalDateTime(rawValue);
+    }
+
+    /**
+     * Maps the numeric source value onto a date-time; the unit of the number is defined by the
+     * implementing subclass.
+     *
+     * @param value the source value as a {@code long}
+     * @return the corresponding {@link LocalDateTime}
+     */
+    protected abstract LocalDateTime getLocalDateTime(long value);
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
new file mode 100644
index 0000000..912f0a4
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/DateType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Date;
+import org.apache.doris.kafka.connector.converter.type.AbstractDateType;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/** Converter for values of the Debezium {@link Date} type; registered under {@link Date#SCHEMA_NAME}. */
+public class DateType extends AbstractDateType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final DateType INSTANCE = new DateType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link Date#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Date.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Converts a numeric date value by delegating to {@code DateTimeUtils.toLocalDateOfEpochDays}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise whatever {@code
+     *     DateTimeUtils.toLocalDateOfEpochDays} produces for the number's {@code longValue()}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Number}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof Number)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final long epochDays = ((Number) sourceValue).longValue();
+        return DateTimeUtils.toLocalDateOfEpochDays(epochDays);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
new file mode 100644
index 0000000..36eeceb
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTime;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for values of the Debezium {@link MicroTime} type; registered under {@link
+ * MicroTime#SCHEMA_NAME}.
+ */
+public class MicroTimeType extends AbstractDebeziumTimeType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final MicroTimeType INSTANCE = new MicroTimeType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link MicroTime#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {MicroTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Resolves the time of day via {@code DateTimeUtils.toLocalTimeFromDurationMicroseconds}.
+     *
+     * @param value the numeric source value; its {@code longValue()} is what gets converted
+     * @return the {@link LocalTime} produced by the utility method
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long micros = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationMicroseconds(micros);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
new file mode 100644
index 0000000..b8c71a2
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/MicroTimestampType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTimestamp;
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for values of the Debezium {@link MicroTimestamp} type; registered under {@link
+ * MicroTimestamp#SCHEMA_NAME}.
+ */
+public class MicroTimestampType extends AbstractDebeziumTimestampType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final MicroTimestampType INSTANCE = new MicroTimestampType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link MicroTimestamp#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {MicroTimestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Resolves the date-time via {@code DateTimeUtils.toLocalDateTimeFromInstantEpochMicros}.
+     *
+     * @param value the numeric source value as a {@code long}
+     * @return the {@link LocalDateTime} produced by the utility method
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        final LocalDateTime converted = DateTimeUtils.toLocalDateTimeFromInstantEpochMicros(value);
+        return converted;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
new file mode 100644
index 0000000..9519e64
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.NanoTime;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * Converter for values of the Debezium {@link NanoTime} type; registered under {@link
+ * NanoTime#SCHEMA_NAME}.
+ */
+public class NanoTimeType extends AbstractDebeziumTimeType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final NanoTimeType INSTANCE = new NanoTimeType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link NanoTime#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {NanoTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Resolves the time of day via {@code DateTimeUtils.toLocalTimeFromDurationNanoseconds}.
+     *
+     * @param value the numeric source value; its {@code longValue()} is what gets converted
+     * @return the {@link LocalTime} produced by the utility method
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long nanos = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationNanoseconds(nanos);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
new file mode 100644
index 0000000..eec06c8
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/NanoTimestampType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTimestamp;
+import java.time.LocalDateTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/**
+ * An implementation of {@link org.apache.doris.kafka.connector.converter.type.Type} for {@link
+ * NanoTimestamp} values, the nanosecond-precision counterpart of {@link MicroTimestamp}.
+ */
+public class NanoTimestampType extends AbstractDebeziumTimestampType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final NanoTimestampType INSTANCE = new NanoTimestampType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link NanoTimestamp#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {NanoTimestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Resolves the date-time via {@code DateTimeUtils.toLocalDateTimeFromInstantEpochNanos}.
+     *
+     * @param value the numeric source value as a {@code long}
+     * @return the {@link LocalDateTime} produced by the utility method
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        final LocalDateTime converted = DateTimeUtils.toLocalDateTimeFromInstantEpochNanos(value);
+        return converted;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
new file mode 100644
index 0000000..83e95d9
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimeType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Time;
+import java.time.LocalTime;
+import org.apache.doris.kafka.connector.converter.utils.DateTimeUtils;
+
+/** Converter for values of the Debezium {@link Time} type; registered under {@link Time#SCHEMA_NAME}. */
+public class TimeType extends AbstractDebeziumTimeType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final TimeType INSTANCE = new TimeType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link Time#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Time.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Resolves the time of day via {@code DateTimeUtils.toLocalTimeFromDurationMilliseconds}.
+     *
+     * @param value the numeric source value; its {@code longValue()} is what gets converted
+     * @return the {@link LocalTime} produced by the utility method
+     */
+    @Override
+    protected LocalTime getLocalTime(Number value) {
+        final long millis = value.longValue();
+        return DateTimeUtils.toLocalTimeFromDurationMilliseconds(millis);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
new file mode 100644
index 0000000..e5547b1
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/TimestampType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Converter for values of the Debezium {@link Timestamp} type; registered under {@link
+ * Timestamp#SCHEMA_NAME}.
+ */
+public class TimestampType extends AbstractDebeziumTimestampType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final TimestampType INSTANCE = new TimestampType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link Timestamp#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {Timestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Interprets the value as milliseconds since the epoch and renders it as a UTC date-time.
+     *
+     * @param value epoch milliseconds, as passed to {@link Instant#ofEpochMilli(long)}
+     * @return the {@link LocalDateTime} of that instant at {@link ZoneOffset#UTC}
+     */
+    @Override
+    protected LocalDateTime getLocalDateTime(long value) {
+        final Instant instant = Instant.ofEpochMilli(value);
+        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
new file mode 100644
index 0000000..e38fe41
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/VariableScaleDecimalType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.data.VariableScaleDecimal;
+import java.math.BigDecimal;
+import java.util.Optional;
+import org.apache.doris.kafka.connector.converter.type.AbstractType;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for values of the Debezium {@link VariableScaleDecimal} type; registered under {@link
+ * VariableScaleDecimal#LOGICAL_NAME}.
+ */
+public class VariableScaleDecimalType extends AbstractType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final VariableScaleDecimalType INSTANCE = new VariableScaleDecimalType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link VariableScaleDecimal#LOGICAL_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        return new String[] {VariableScaleDecimal.LOGICAL_NAME};
+    }
+
+    /**
+     * Converts a variable-scale decimal {@link Struct} into a {@link BigDecimal}.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the {@link BigDecimal} obtained
+     *     from {@code VariableScaleDecimal.toLogical(...).getDecimalValue()}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link Struct}, or if
+     *     the struct yields no decimal value
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (sourceValue instanceof Struct) {
+            final Optional<BigDecimal> bigDecimalValue =
+                    VariableScaleDecimal.toLogical((Struct) sourceValue).getDecimalValue();
+            // Avoid an unchecked Optional.get(): an empty Optional would otherwise surface as a
+            // bare NoSuchElementException with no hint about the offending value.
+            return bigDecimalValue.orElseThrow(
+                    () ->
+                            new ConnectException(
+                                    String.format(
+                                            "%s value '%s' has no decimal representation",
+                                            getClass().getSimpleName(), sourceValue)));
+        }
+
+        throw new ConnectException(
+                String.format(
+                        "Unexpected %s value '%s' with type '%s'",
+                        getClass().getSimpleName(),
+                        sourceValue,
+                        sourceValue.getClass().getName()));
+    }
+
+    /**
+     * Marks this type as numeric.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isNumber() {
+        return true;
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
new file mode 100644
index 0000000..c3528d9
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimeType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.ZonedTime;
+import java.time.LocalDate;
+import java.time.OffsetTime;
+import java.time.ZonedDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimeType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for values of the Debezium {@link ZonedTime} type; registered under {@link
+ * ZonedTime#SCHEMA_NAME}.
+ */
+public class ZonedTimeType extends AbstractTimeType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final ZonedTimeType INSTANCE = new ZonedTimeType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link ZonedTime#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {ZonedTime.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Parses a textual offset time and anchors it to the current date.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise a {@code java.time.OffsetDateTime}
+     *     built from the text parsed with {@link ZonedTime#FORMATTER} and {@link LocalDate#now()}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link String}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof String)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        final OffsetTime parsed = OffsetTime.parse((String) sourceValue, ZonedTime.FORMATTER);
+        final ZonedDateTime anchoredToToday = parsed.atDate(LocalDate.now()).toZonedDateTime();
+        return anchoredToToday.toOffsetDateTime();
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
new file mode 100644
index 0000000..b03a400
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/type/debezium/ZonedTimestampType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.type.debezium;
+
+import io.debezium.time.ZonedTimestamp;
+import java.time.ZonedDateTime;
+import org.apache.doris.kafka.connector.converter.type.AbstractTimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * Converter for values of the Debezium {@link ZonedTimestamp} type; registered under {@link
+ * ZonedTimestamp#SCHEMA_NAME}.
+ */
+public class ZonedTimestampType extends AbstractTimestampType {
+
+    /** Shared, pre-built instance of this type. */
+    public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
+
+    /**
+     * Lists the keys this type is registered under.
+     *
+     * @return a new single-element array containing {@link ZonedTimestamp#SCHEMA_NAME}
+     */
+    @Override
+    public String[] getRegistrationKeys() {
+        final String[] keys = {ZonedTimestamp.SCHEMA_NAME};
+        return keys;
+    }
+
+    /**
+     * Parses a textual zoned timestamp and shifts it to the zone returned by {@code
+     * getDatabaseTimeZone()}, keeping the same instant.
+     *
+     * @param sourceValue the raw field value; may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise a {@code java.time.OffsetDateTime}
+     *     for the text parsed with {@link ZonedTimestamp#FORMATTER}
+     * @throws ConnectException if the value is neither {@code null} nor a {@link String}
+     */
+    @Override
+    public Object getValue(Object sourceValue) {
+        if (sourceValue == null) {
+            return null;
+        }
+        if (!(sourceValue instanceof String)) {
+            throw new ConnectException(
+                    String.format(
+                            "Unexpected %s value '%s' with type '%s'",
+                            getClass().getSimpleName(),
+                            sourceValue,
+                            sourceValue.getClass().getName()));
+        }
+
+        // Parse first; the database time zone is only looked up once the text is valid.
+        final ZonedDateTime parsed =
+                ZonedDateTime.parse((String) sourceValue, ZonedTimestamp.FORMATTER);
+        final ZonedDateTime inDatabaseZone =
+                parsed.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
+        return inDatabaseZone.toOffsetDateTime();
+    }
+}
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
new file mode 100644
index 0000000..09ee9d0
--- /dev/null
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/utils/DateTimeUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.doris.kafka.connector.converter.utils;
+
+import io.debezium.time.Conversions;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers that turn epoch-based numbers and legacy {@link Date} values into
+ * {@code java.time} objects. All epoch-based conversions in this class interpret their input
+ * relative to 1970-01-01T00:00:00Z and, where a zone is needed and none is passed in, use UTC.
+ */
+public class DateTimeUtils {
+
+    private DateTimeUtils() {}
+
+    /**
+     * Converts nanoseconds since the epoch into an {@link Instant}.
+     *
+     * @param epochNanos nanoseconds since the epoch; may be negative
+     * @return the corresponding instant
+     */
+    public static Instant toInstantFromNanos(long epochNanos) {
+        final long epochSeconds = TimeUnit.NANOSECONDS.toSeconds(epochNanos);
+        // The remainder keeps the sign of epochNanos; Instant.ofEpochSecond accepts a
+        // negative nanosecond adjustment and normalizes it.
+        final long adjustment =
+                TimeUnit.NANOSECONDS.toNanos(epochNanos % TimeUnit.SECONDS.toNanos(1));
+        return Instant.ofEpochSecond(epochSeconds, adjustment);
+    }
+
+    /**
+     * Views a {@link Date} as a {@link ZonedDateTime} in the given legacy time zone.
+     *
+     * @param date the date to convert
+     * @param timeZone the zone to express the result in
+     * @return the same instant in {@code timeZone}
+     */
+    public static ZonedDateTime toZonedDateTimeFromDate(Date date, TimeZone timeZone) {
+        return toZonedDateTimeFromDate(date, timeZone.toZoneId());
+    }
+
+    /**
+     * Views a {@link Date} as a {@link ZonedDateTime} in the given zone.
+     *
+     * @param date the date to convert
+     * @param zoneId the zone to express the result in
+     * @return the same instant in {@code zoneId}
+     */
+    public static ZonedDateTime toZonedDateTimeFromDate(Date date, ZoneId zoneId) {
+        return date.toInstant().atZone(zoneId);
+    }
+
+    /**
+     * Converts microseconds since the epoch into a UTC {@link ZonedDateTime}.
+     *
+     * @param epochMicros microseconds since the epoch
+     * @return the corresponding date-time at {@link ZoneOffset#UTC}
+     */
+    public static ZonedDateTime toZonedDateTimeFromInstantEpochMicros(long epochMicros) {
+        return Conversions.toInstantFromMicros(epochMicros).atZone(ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts nanoseconds since the epoch into a UTC {@link ZonedDateTime}.
+     *
+     * @param epochNanos nanoseconds since the epoch
+     * @return the corresponding date-time at {@link ZoneOffset#UTC}
+     */
+    public static ZonedDateTime toZonedDateTimeFromInstantEpochNanos(long epochNanos) {
+        return ZonedDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts a day count since 1970-01-01 into a {@link LocalDate}.
+     *
+     * @param epochDays days since the epoch; may be negative
+     * @return the corresponding date
+     */
+    public static LocalDate toLocalDateOfEpochDays(long epochDays) {
+        return LocalDate.ofEpochDay(epochDays);
+    }
+
+    /**
+     * Returns the UTC calendar date that contains the instant held by {@code date}.
+     *
+     * @param date the date to convert
+     * @return the UTC date of {@code date.getTime()}
+     */
+    public static LocalDate toLocalDateFromDate(Date date) {
+        return toLocalDateFromInstantEpochMillis(date.getTime());
+    }
+
+    /**
+     * Returns the UTC calendar date that contains the given epoch-millisecond instant.
+     *
+     * @param epochMillis milliseconds since the epoch; may be negative
+     * @return the UTC date containing that instant
+     */
+    public static LocalDate toLocalDateFromInstantEpochMillis(long epochMillis) {
+        // Floor division is required here. Duration#toDays truncates toward zero, which
+        // maps every pre-1970 instant that is not exactly on a day boundary to the
+        // following day (for example -1 ms would become 1970-01-01 instead of 1969-12-31).
+        return LocalDate.ofEpochDay(Math.floorDiv(epochMillis, TimeUnit.DAYS.toMillis(1)));
+    }
+
+    /**
+     * Interprets a millisecond duration as a time of day.
+     *
+     * @param durationMillis milliseconds since midnight
+     * @return the corresponding time of day
+     * @throws java.time.DateTimeException if the duration is negative or 24 hours or longer
+     */
+    public static LocalTime toLocalTimeFromDurationMilliseconds(long durationMillis) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationMillis, ChronoUnit.MILLIS).toNanos());
+    }
+
+    /**
+     * Interprets a microsecond duration as a time of day.
+     *
+     * @param durationMicros microseconds since midnight
+     * @return the corresponding time of day
+     * @throws java.time.DateTimeException if the duration is negative or 24 hours or longer
+     */
+    public static LocalTime toLocalTimeFromDurationMicroseconds(long durationMicros) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationMicros, ChronoUnit.MICROS).toNanos());
+    }
+
+    /**
+     * Interprets a nanosecond duration as a time of day.
+     *
+     * @param durationNanos nanoseconds since midnight
+     * @return the corresponding time of day
+     * @throws java.time.DateTimeException if the duration is negative or 24 hours or longer
+     */
+    public static LocalTime toLocalTimeFromDurationNanoseconds(long durationNanos) {
+        return LocalTime.ofNanoOfDay(Duration.of(durationNanos, ChronoUnit.NANOS).toNanos());
+    }
+
+    /**
+     * Returns the UTC time of day of the instant held by {@code date}.
+     *
+     * @param date the date to convert
+     * @return the time-of-day portion at {@link ZoneOffset#UTC}
+     */
+    public static LocalTime toLocalTimeFromUtcDate(Date date) {
+        return date.toInstant().atOffset(ZoneOffset.UTC).toLocalTime();
+    }
+
+    /**
+     * Returns the UTC local date-time of the instant held by {@code date}.
+     *
+     * @param date the date to convert
+     * @return the local date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromDate(Date date) {
+        return toLocalDateTimeFromInstantEpochMillis(date.getTime());
+    }
+
+    /**
+     * Converts milliseconds since the epoch into a UTC {@link LocalDateTime}.
+     *
+     * @param epochMillis milliseconds since the epoch
+     * @return the local date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochMillis(long epochMillis) {
+        return LocalDateTime.ofInstant(
+                Conversions.toInstantFromMillis(epochMillis), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts microseconds since the epoch into a UTC {@link LocalDateTime}.
+     *
+     * @param epochMicros microseconds since the epoch
+     * @return the local date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochMicros(long epochMicros) {
+        return LocalDateTime.ofInstant(
+                Conversions.toInstantFromMicros(epochMicros), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts nanoseconds since the epoch into a UTC {@link LocalDateTime}.
+     *
+     * @param epochNanos nanoseconds since the epoch
+     * @return the local date-time at {@link ZoneOffset#UTC}
+     */
+    public static LocalDateTime toLocalDateTimeFromInstantEpochNanos(long epochNanos) {
+        return LocalDateTime.ofInstant(toInstantFromNanos(epochNanos), ZoneOffset.UTC);
+    }
+
+    /**
+     * Converts milliseconds since the epoch into a {@link Timestamp}.
+     *
+     * @param epochMilliseconds milliseconds since the epoch
+     * @return a timestamp built from the instant's epoch millis, with its nanos field then
+     *     set from the instant's nano-of-second
+     */
+    public static Timestamp toTimestampFromMillis(long epochMilliseconds) {
+        final Instant instant = Conversions.toInstantFromMillis(epochMilliseconds);
+        final Timestamp ts = new Timestamp(instant.toEpochMilli());
+        ts.setNanos(instant.getNano());
+        return ts;
+    }
+}
diff --git 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
index 0804980..498feb0 100644
--- 
a/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
+++ 
b/src/test/java/org/apache/doris/kafka/connector/converter/TestRecordService.java
@@ -59,6 +59,44 @@ public class TestRecordService {
         jsonConverter.configure(config, false);
     }
 
+    /**
+     * The MySQL table schema is as follows.
+     *
+     * <p>CREATE TABLE example_table ( id INT AUTO_INCREMENT PRIMARY KEY, name 
VARCHAR(50), age INT,
+     * email VARCHAR(100), birth_date DATE, integer_column INT, float_column 
FLOAT, decimal_column
+     * DECIMAL(10,2), datetime_column DATETIME, date_column DATE, time_column 
TIME, text_column
+     * TEXT, varchar_column VARCHAR(255), binary_column BINARY(10), 
blob_column BLOB, is_active
+     * TINYINT(1) );
+     */
+    @Test
+    public void processMysqlDebeziumStructRecord() throws IOException {
+        String topic = "normal.wdl_test.example_table";
+        // no delete value
+        String noDeleteValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
+        String expectedNoDeleteValue =
+                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D
 [...]
+        buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
+
+        // delete value
+        String deleteValue =
+                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"age\"},{\"type\":\"string\",\"optional\":true,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"field\":\"birth_date\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"i
 [...]
+        String expectedDeleteValue =
+                "{\"id\":8,\"name\":\"Jfohn 
Doe\",\"age\":430,\"email\":\"j...@example.com\",\"birth_date\":\"1994-05-20\",\"integer_column\":12323,\"float_column\":45.67,\"decimal_column\":123.45,\"datetime_column\":\"2024-04-12T10:30\",\"date_column\":\"2024-04-12\",\"time_column\":\"2024-04-12T10:30\",\"text_column\":\"Lorem
 ipsum dolor sit amet, consectetur adipiscing 
elit.\",\"varchar_column\":null,\"binary_column\":\"1234567890ABCDEF0000\",\"blob_column\":null,\"is_active\":2,\"__D
 [...]
+        buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
+    }
+
+    private void buildProcessStructRecord(String topic, String sourceValue, 
String target)
+            throws IOException {
+        SchemaAndValue noDeleteSchemaValue =
+                jsonConverter.toConnectData(topic, 
sourceValue.getBytes(StandardCharsets.UTF_8));
+        SinkRecord noDeleteSinkRecord =
+                TestRecordBuffer.newSinkRecord(
+                        noDeleteSchemaValue.value(), 8, 
noDeleteSchemaValue.schema());
+        String processResult = 
recordService.processStructRecord(noDeleteSinkRecord);
+        Assert.assertEquals(target, processResult);
+    }
+
     @Test
     public void processStructRecord() throws IOException {
         props.remove("converter.mode");
@@ -68,56 +106,27 @@ public class TestRecordService {
         // no delete value
         String noDeleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
 [...]
-        SchemaAndValue noDeleteSchemaValue =
-                jsonConverter.toConnectData(topic, 
noDeleteValue.getBytes(StandardCharsets.UTF_8));
-        SinkRecord noDeleteSinkRecord =
-                TestRecordBuffer.newSinkRecord(
-                        noDeleteSchemaValue.value(), 8, 
noDeleteSchemaValue.schema());
-        String noDeleteResult = 
recordService.processStructRecord(noDeleteSinkRecord);
-        Assert.assertEquals(
-                
"{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}",
-                noDeleteResult);
-
-        // delete value
-        String deleteValue =
-                
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
 [...]
-        SchemaAndValue deleteSchemaValue =
-                jsonConverter.toConnectData(topic, 
deleteValue.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record2 =
-                TestRecordBuffer.newSinkRecord(
-                        deleteSchemaValue.value(), 1, 
deleteSchemaValue.schema());
-        String s2 = recordService.processStructRecord(record2);
-        Assert.assertEquals(
-                
"{\"before\":{\"id\":24,\"name\":\"bb\"},\"after\":null,\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712545844000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5627,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"d\",\"ts_ms\":1712545844948,\"transaction\":null}",
-                s2);
+        String expectedNoDeleteValue =
+                
"{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"wdl_test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}";
+        buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
     }
 
     @Test
-    public void processStructRecordWithDebeziumSchema() {
+    public void processStructRecordWithDebeziumSchema() throws IOException {
         String topic = "normal.wdl_test.test_sink_normal";
 
         // no delete value
         String noDeleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
 [...]
-        SchemaAndValue noDeleteSchemaValue =
-                jsonConverter.toConnectData(topic, 
noDeleteValue.getBytes(StandardCharsets.UTF_8));
-        SinkRecord noDeleteSinkRecord =
-                TestRecordBuffer.newSinkRecord(
-                        noDeleteSchemaValue.value(), 8, 
noDeleteSchemaValue.schema());
-        String noDeleteResult = 
recordService.processStructRecord(noDeleteSinkRecord);
-        Assert.assertEquals(
-                
"{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}", noDeleteResult);
+        String expectedNoDeleteValue =
+                "{\"id\":19,\"name\":\"fff\",\"__DORIS_DELETE_SIGN__\":\"0\"}";
+        buildProcessStructRecord(topic, noDeleteValue, expectedNoDeleteValue);
 
         // delete value
         String deleteValue =
                 
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wdl_test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.wd
 [...]
-        SchemaAndValue deleteSchemaValue =
-                jsonConverter.toConnectData(topic, 
deleteValue.getBytes(StandardCharsets.UTF_8));
-        SinkRecord record2 =
-                TestRecordBuffer.newSinkRecord(
-                        deleteSchemaValue.value(), 1, 
deleteSchemaValue.schema());
-        String s2 = recordService.processStructRecord(record2);
-        
Assert.assertEquals("{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}",
 s2);
+        String expectedDeleteValue = 
"{\"id\":24,\"name\":\"bb\",\"__DORIS_DELETE_SIGN__\":\"1\"}";
+        buildProcessStructRecord(topic, deleteValue, expectedDeleteValue);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to