This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2acf82d174 [INLONG-10238][Sort] MySQL connector support audit ID (#10239)
2acf82d174 is described below

commit 2acf82d174c6358447422006f271785d9c59e2ab
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 22 09:46:55 2024 +0800

    [INLONG-10238][Sort] MySQL connector support audit ID (#10239)
---
 .../apache/inlong/sort/mysql/MySqlTableSource.java | 372 +++++++++++
 .../inlong/sort/mysql/MysqlTableFactory.java       |  18 +-
 .../mysql/RowDataDebeziumDeserializeSchema.java    | 687 +++++++++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |  14 +-
 4 files changed, 1082 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
new file mode 100644
index 0000000000..c244378aca
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
+import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
+ * description.
+ * <p>
+ * Copy from com.ververica:flink-connector-mysql-cdc-2.3.0
+ */
+public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String username;
+    private final String password;
+    private final String serverId;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final Properties dbzProperties;
+    private final boolean enableParallelRead;
+    private final int splitSize;
+    private final int splitMetaGroupSize;
+    private final int fetchSize;
+    private final Duration connectTimeout;
+    private final int connectionPoolSize;
+    private final int connectMaxRetries;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
+    private final StartupOptions startupOptions;
+    private final boolean scanNewlyAddedTableEnabled;
+    private final Properties jdbcProperties;
+    private final Duration heartbeatInterval;
+    private final String chunkKeyColumn;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private final MetricOption metricOption;
+
+    public MySqlTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String username,
+            String password,
+            ZoneId serverTimeZone,
+            Properties dbzProperties,
+            @Nullable String serverId,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            StartupOptions startupOptions,
+            boolean scanNewlyAddedTableEnabled,
+            Properties jdbcProperties,
+            Duration heartbeatInterval,
+            @Nullable String chunkKeyColumn,
+            MetricOption metricOption) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.tableName = checkNotNull(tableName);
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.serverId = serverId;
+        this.serverTimeZone = serverTimeZone;
+        this.dbzProperties = dbzProperties;
+        this.enableParallelRead = enableParallelRead;
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.startupOptions = startupOptions;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+        this.jdbcProperties = jdbcProperties;
+        // Mutable attributes
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.heartbeatInterval = heartbeatInterval;
+        this.chunkKeyColumn = chunkKeyColumn;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        final TypeInformation<RowData> typeInfo =
+                scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                MySqlDeserializationConverterFactory.instance())
+                        .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
+                        .build();
+        if (enableParallelRead) {
+            MySqlSource<RowData> parallelSource =
+                    MySqlSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .tableList(database + "." + tableName)
+                            .username(username)
+                            .password(password)
+                            .serverTimeZone(serverTimeZone.toString())
+                            .serverId(serverId)
+                            .splitSize(splitSize)
+                            .splitMetaGroupSize(splitMetaGroupSize)
+                            .distributionFactorUpper(distributionFactorUpper)
+                            .distributionFactorLower(distributionFactorLower)
+                            .fetchSize(fetchSize)
+                            .connectTimeout(connectTimeout)
+                            .connectMaxRetries(connectMaxRetries)
+                            .connectionPoolSize(connectionPoolSize)
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer)
+                            .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                            .jdbcProperties(jdbcProperties)
+                            .heartbeatInterval(heartbeatInterval)
+                            .chunkKeyColumn(chunkKeyColumn)
+                            .build();
+            return SourceProvider.of(parallelSource);
+        } else {
+            com.ververica.cdc.connectors.mysql.MySqlSource.Builder<RowData> builder =
+                    com.ververica.cdc.connectors.mysql.MySqlSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .tableList(database + "." + tableName)
+                            .username(username)
+                            .password(password)
+                            .serverTimeZone(serverTimeZone.toString())
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer);
+            Optional.ofNullable(serverId)
+                    .ifPresent(serverId -> builder.serverId(Integer.parseInt(serverId)));
+            DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+            return SourceFunctionProvider.of(sourceFunction, false);
+        }
+    }
+
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(MySqlReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(MySqlReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(MySqlReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                MySqlReadableMetadata::getKey, MySqlReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        MySqlTableSource source =
+                new MySqlTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        tableName,
+                        username,
+                        password,
+                        serverTimeZone,
+                        dbzProperties,
+                        serverId,
+                        enableParallelRead,
+                        splitSize,
+                        splitMetaGroupSize,
+                        fetchSize,
+                        connectTimeout,
+                        connectMaxRetries,
+                        connectionPoolSize,
+                        distributionFactorUpper,
+                        distributionFactorLower,
+                        startupOptions,
+                        scanNewlyAddedTableEnabled,
+                        jdbcProperties,
+                        heartbeatInterval,
+                        chunkKeyColumn,
+                        metricOption);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MySqlTableSource)) {
+            return false;
+        }
+        MySqlTableSource that = (MySqlTableSource) o;
+        return port == that.port
+                && enableParallelRead == that.enableParallelRead
+                && splitSize == that.splitSize
+                && splitMetaGroupSize == that.splitMetaGroupSize
+                && fetchSize == that.fetchSize
+                && distributionFactorUpper == that.distributionFactorUpper
+                && distributionFactorLower == that.distributionFactorLower
+                && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(serverId, that.serverId)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(connectTimeout, that.connectTimeout)
+                && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+                && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(jdbcProperties, that.jdbcProperties)
+                && Objects.equals(heartbeatInterval, that.heartbeatInterval)
+                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
+                && Objects.equals(metricOption, that.metricOption);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                username,
+                password,
+                serverId,
+                tableName,
+                serverTimeZone,
+                dbzProperties,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                scanNewlyAddedTableEnabled,
+                jdbcProperties,
+                heartbeatInterval,
+                chunkKeyColumn,
+                metricOption);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "MySQL-CDC";
+    }
+}
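
The audit support in this file reduces to one optional hook: a MetricOption is turned into a SourceMetricData and handed to the deserialization schema, which counts and audits the rows it emits. Below is a minimal sketch of that wiring, using only calls that appear in this commit; the label string, audit address, and audit key values are hypothetical placeholders, and physicalDataType, typeInfo, and serverTimeZone are as defined in getScanRuntimeProvider().

    // Sketch only: mirrors the wiring in getScanRuntimeProvider() above.
    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels("groupId=g&streamId=s&nodeId=n") // hypothetical labels
            .withAuditAddress("127.0.0.1:10081")               // hypothetical audit proxy address
            .withAuditKeys("3")                                // hypothetical audit key
            .build();
    DebeziumDeserializationSchema<RowData> deserializer =
            RowDataDebeziumDeserializeSchema.newBuilder()
                    .setPhysicalRowType(physicalDataType)
                    .setResultTypeInfo(typeInfo)
                    .setServerTimeZone(serverTimeZone)
                    .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
                    .build();
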
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index c684cc3ae4..f903780a36 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -17,11 +17,12 @@
 
 package org.apache.inlong.sort.mysql;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.MySqlTableSource;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.ConfigOption;
@@ -48,7 +49,8 @@ import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProper
 import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
-import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.base.Constants.GH_OST_DDL_CHANGE;
+import static org.apache.inlong.sort.base.Constants.GH_OST_TABLE_REGEX;
 
 public class MysqlTableFactory implements DynamicTableSourceFactory {
 
@@ -97,6 +99,15 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
             validateDistributionFactorLower(distributionFactorLower);
         }
 
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        String auditKeys = config.get(AUDIT_KEYS);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         return new MySqlTableSource(physicalSchema,
                 port,
                 hostname,
@@ -120,7 +131,8 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
                 scanNewlyAddedTableEnabled,
                 getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
-                chunkKeyColumn);
+                chunkKeyColumn,
+                metricOption);
     }
 
     @Override
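
End to end, a user switches audit reporting on through table options read by this factory. The Table API sketch below is an assumption-heavy illustration: the connector identifier 'mysql-cdc-inlong' and the option keys 'inlong.metric.labels', 'metrics.audit.proxy.hosts', and 'metrics.audit.key' are inferred from the INLONG_METRIC / INLONG_AUDIT / AUDIT_KEYS constants and from sibling InLong sort connectors; they are not spelled out in this diff.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MySqlCdcAuditExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE orders_src (\n"
                            + "  id INT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'mysql-cdc-inlong',\n" // assumed identifier
                            + "  'hostname' = 'localhost',\n"
                            + "  'port' = '3306',\n"
                            + "  'username' = 'root',\n"
                            + "  'password' = 'secret',\n"
                            + "  'database-name' = 'test',\n"
                            + "  'table-name' = 'orders',\n"
                            + "  'inlong.metric.labels' = 'groupId=g&streamId=s&nodeId=n',\n" // assumed key
                            + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',\n" // assumed key
                            + "  'metrics.audit.key' = '3'\n" // assumed key
                            + ")");
        }
    }
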
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 0000000000..e98a700b62
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ * <p>
+ * Copy from com.ververica:flink-connector-mysql-cdc-2.3.0
+ */
+public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
+
+    private final static Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+
+    private static final long serialVersionUID = 2L;
+
+    /** Custom validator to validate the row value. */
+    public interface ValueValidator extends Serializable {
+
+        void validate(RowData rowData, RowKind rowKind) throws Exception;
+    }
+
+    /** TypeInformation of the produced {@link RowData}. */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting of
+     * physical column values.
+     */
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
+
+    /** Validator to validate the row value. */
+    private final ValueValidator validator;
+
+    /** Changelog Mode to use for encoding changes in Flink internal data structure. */
+    private final DebeziumChangelogMode changelogMode;
+    private final SourceMetricData sourceMetricData;
+
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+            ValueValidator validator,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+            DebeziumChangelogMode changelogMode,
+            SourceMetricData sourceMetricData) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter =
+                createConverter(
+                        checkNotNull(physicalDataType),
+                        serverTimeZone,
+                        userDefinedConverterFactory);
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+        this.changelogMode = checkNotNull(changelogMode);
+        this.sourceMetricData = sourceMetricData;
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+        Envelope.Operation op = Envelope.operationFor(record);
+        Struct value = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            GenericRowData insert = extractAfterRow(value, valueSchema);
+            validator.validate(insert, RowKind.INSERT);
+            insert.setRowKind(RowKind.INSERT);
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, insert, out);
+        } else if (op == Envelope.Operation.DELETE) {
+            GenericRowData delete = extractBeforeRow(value, valueSchema);
+            validator.validate(delete, RowKind.DELETE);
+            delete.setRowKind(RowKind.DELETE);
+            emit(record, delete, out);
+        } else {
+            if (changelogMode == DebeziumChangelogMode.ALL) {
+                GenericRowData before = extractBeforeRow(value, valueSchema);
+                validator.validate(before, RowKind.UPDATE_BEFORE);
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                emit(record, before, out);
+            }
+
+            GenericRowData after = extractAfterRow(value, valueSchema);
+            validator.validate(after, RowKind.UPDATE_AFTER);
+            after.setRowKind(RowKind.UPDATE_AFTER);
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, after, out);
+        }
+    }
+
+    private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+        Struct after = value.getStruct(Envelope.FieldName.AFTER);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
+    }
+
+    private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {
+        };
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+                DeserializationRuntimeConverterFactory.DEFAULT;
+        private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
+        private SourceMetricData sourceMetricData;
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+                DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
+            this.changelogMode = changelogMode;
+            return this;
+        }
+        public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
+            this.sourceMetricData = sourceMetricData;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType,
+                    metadataConverters,
+                    resultTypeInfo,
+                    validator,
+                    serverTimeZone,
+                    userDefinedConverterFactory,
+                    changelogMode,
+                    sourceMetricData);
+        }
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    private static DeserializationRuntimeConverter createConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        return wrapIntoNullableConverter(
+                createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    public static DeserializationRuntimeConverter createNotNullConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        // user defined converter has a higher resolve order
+        Optional<DeserializationRuntimeConverter> converter =
+                userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (converter.isPresent()) {
+            return converter.get();
+        }
+
+        // if no matched user defined converter, fallback to the default converter
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return convertToBoolean();
+            case TINYINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Byte.parseByte(dbzObj.toString());
+                    }
+                };
+            case SMALLINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Short.parseShort(dbzObj.toString());
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return convertToInt();
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return convertToLong();
+            case DATE:
+                return convertToDate();
+            case TIME_WITHOUT_TIME_ZONE:
+                return convertToTime();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return convertToTimestamp(serverTimeZone);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return convertToLocalTimeZoneTimestamp(serverTimeZone);
+            case FLOAT:
+                return convertToFloat();
+            case DOUBLE:
+                return convertToDouble();
+            case CHAR:
+            case VARCHAR:
+                return convertToString();
+            case BINARY:
+            case VARBINARY:
+                return convertToBinary();
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ROW:
+                return createRowConverter(
+                        (RowType) type, serverTimeZone, userDefinedConverterFactory);
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static DeserializationRuntimeConverter convertToBoolean() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    return (byte) dbzObj == 1;
+                } else if (dbzObj instanceof Short) {
+                    return (short) dbzObj == 1;
+                } else {
+                    return Boolean.parseBoolean(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToInt() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                } else {
+                    return Integer.parseInt(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLong() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDouble() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToFloat() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Double) {
+                    return ((Double) dbzObj).floatValue();
+                } else {
+                    return Float.parseFloat(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDate() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTime() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case MicroTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000);
+                        case NanoTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000_000);
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                // get number of milliseconds of the day
+                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case Timestamp.SCHEMA_NAME:
+                            return TimestampData.fromEpochMillis((Long) dbzObj);
+                        case MicroTimestamp.SCHEMA_NAME:
+                            long micro = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    micro / 1000, (int) (micro % 1000 * 1000));
+                        case NanoTimestamp.SCHEMA_NAME:
+                            long nano = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    nano / 1000_000, (int) (nano % 1000_000));
+                    }
+                }
+                LocalDateTime localDateTime =
+                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+                return TimestampData.fromLocalDateTime(localDateTime);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+            ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof String) {
+                    String str = (String) dbzObj;
+                    // TIMESTAMP_LTZ type is encoded in string type
+                    Instant instant = Instant.parse(str);
+                    return TimestampData.fromLocalDateTime(
+                            LocalDateTime.ofInstant(instant, serverTimeZone));
+                }
+                throw new IllegalArgumentException(
+                        "Unable to convert to TimestampData from unexpected value '"
+                                + dbzObj
+                                + "' of type "
+                                + dbzObj.getClass().getName());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToString() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return StringData.fromString(dbzObj.toString());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToBinary() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                } else if (dbzObj instanceof ByteBuffer) {
+                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+                    byte[] bytes = new byte[byteBuffer.remaining()];
+                    byteBuffer.get(bytes);
+                    return bytes;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                BigDecimal bigDecimal;
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+                } else if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    bigDecimal = new BigDecimal((String) dbzObj);
+                } else if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+                } else {
+                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                        SpecialValueDecimal decimal =
+                                VariableScaleDecimal.toLogical((Struct) dbzObj);
+                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+                    } else {
+                        // fallback to string
+                        bigDecimal = new BigDecimal(dbzObj.toString());
+                    }
+                }
+                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createRowConverter(
+            RowType rowType,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        final DeserializationRuntimeConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(
+                                logicType -> createConverter(
+                                        logicType,
+                                        serverTimeZone,
+                                        userDefinedConverterFactory))
+                        .toArray(DeserializationRuntimeConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        row.setField(i, null);
+                    } else {
+                        Object fieldValue = struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        Object convertedField =
+                                convertField(fieldConverters[i], fieldValue, fieldSchema);
+                        row.setField(i, convertedField);
+                    }
+                }
+                return row;
+            }
+        };
+    }
+
+    private static Object convertField(
+            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+            throws Exception {
+        if (fieldValue == null) {
+            return null;
+        } else {
+            return fieldConverter.convert(fieldValue, fieldSchema);
+        }
+    }
+
+    private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+            DeserializationRuntimeConverter converter) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj == null) {
+                    return null;
+                }
+                return converter.convert(dbzObj, schema);
+            }
+        };
+    }
+}
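
One behavioral detail of deserialize() above worth noting when reading audit numbers: the collector is wrapped in a MetricsCollector only on the INSERT/READ and UPDATE_AFTER branches, so the counts reflect "after" images, while DELETE and UPDATE_BEFORE rows pass through the raw collector. A minimal fragment of that counting path, assuming MetricsCollector records the row against SourceMetricData and then delegates to the wrapped collector:

    // Fragment mirroring deserialize() above; out, sourceMetricData and row
    // are as defined there.
    Collector<RowData> collector = out;
    if (sourceMetricData != null) {
        // assumption: MetricsCollector updates row/byte counts and reports to
        // the audit SDK before forwarding each record downstream
        collector = new MetricsCollector<>(collector, sourceMetricData);
    }
    collector.collect(row);
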
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 3546604c06..e3a995cc65 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -857,25 +857,27 @@
    Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.)
    License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
-
 1.3.23 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodbMongoDBTableSource.java
    Source  : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
    License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
-
 1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
-Source  : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
+Source  : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
-
-
 1.3.25 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
-Source  : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
+Source  : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that the software have been modified.)
+License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
+1.3.27 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+Source  : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
 
