This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c5dbfabf22 [INLONG-10194][Sort] Sqlserver connector support audit ID (#10212)
c5dbfabf22 is described below

commit c5dbfabf224bee26373562e3e2987dd029b7eba9
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 15 16:02:14 2024 +0800

    [INLONG-10194][Sort] Sqlserver connector support audit ID (#10212)
---
 .../RowDataDebeziumDeserializeSchema.java          | 674 +++++++++++++++++++++
 .../sort/sqlserver/SqlServerTableSource.java       | 254 ++++++++
 .../sort/sqlserver/SqlserverTableFactory.java      |  20 +-
 licenses/inlong-sort-connectors/LICENSE            |   5 +
 4 files changed, 951 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 0000000000..210a55f7c5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.sqlserver;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ * <p>
+ * Copy from com.ververica:flink-connector-debezium:2.3.0
+ */
+public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 2L;
+
+    /** Custom validator to validate the row value. */
+    public interface ValueValidator extends Serializable {
+
+        void validate(RowData rowData, RowKind rowKind) throws Exception;
+    }
+
+    /** TypeInformation of the produced {@link RowData}. */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting of
+     * physical column values.
+     */
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
+
+    /** Validator to validate the row value. */
+    private final ValueValidator validator;
+
+    /** Changelog Mode to use for encoding changes in Flink internal data structure. */
+    private final DebeziumChangelogMode changelogMode;
+    private final SourceMetricData sourceMetricData;
+
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+            ValueValidator validator,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+            DebeziumChangelogMode changelogMode,
+            SourceMetricData sourceMetricData) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter =
+                createConverter(
+                        checkNotNull(physicalDataType),
+                        serverTimeZone,
+                        userDefinedConverterFactory);
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+        this.changelogMode = checkNotNull(changelogMode);
+        this.sourceMetricData = sourceMetricData;
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+        Envelope.Operation op = Envelope.operationFor(record);
+        Struct value = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            GenericRowData insert = extractAfterRow(value, valueSchema);
+            validator.validate(insert, RowKind.INSERT);
+            insert.setRowKind(RowKind.INSERT);
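+            // wrap the output collector so SourceMetricData can count the
+            // emitted records and bytes for InLong audit/metric reporting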
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, insert, out);
+        } else if (op == Envelope.Operation.DELETE) {
+            GenericRowData delete = extractBeforeRow(value, valueSchema);
+            validator.validate(delete, RowKind.DELETE);
+            delete.setRowKind(RowKind.DELETE);
+            emit(record, delete, out);
+        } else {
+            if (changelogMode == DebeziumChangelogMode.ALL) {
+                GenericRowData before = extractBeforeRow(value, valueSchema);
+                validator.validate(before, RowKind.UPDATE_BEFORE);
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                emit(record, before, out);
+            }
+
+            GenericRowData after = extractAfterRow(value, valueSchema);
+            validator.validate(after, RowKind.UPDATE_AFTER);
+            after.setRowKind(RowKind.UPDATE_AFTER);
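+            // UPDATE_AFTER records are likewise counted via the metrics collector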
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, after, out);
+        }
+    }
+
+    private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+        Struct after = value.getStruct(Envelope.FieldName.AFTER);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
+    }
+
+    private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private final ValueValidator validator = (rowData, rowKind) -> {
+        };
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+                DeserializationRuntimeConverterFactory.DEFAULT;
+        private final DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
+        private SourceMetricData sourceMetricData;
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+                DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
+            this.sourceMetricData = sourceMetricData;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType,
+                    metadataConverters,
+                    resultTypeInfo,
+                    validator,
+                    serverTimeZone,
+                    userDefinedConverterFactory,
+                    changelogMode,
+                    sourceMetricData);
+        }
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    private static DeserializationRuntimeConverter createConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        return wrapIntoNullableConverter(
+                createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    public static DeserializationRuntimeConverter createNotNullConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        // user defined converter has a higher resolve order
+        Optional<DeserializationRuntimeConverter> converter =
+                userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (converter.isPresent()) {
+            return converter.get();
+        }
+
+        // if no user-defined converter matched, fall back to the default converter
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return convertToBoolean();
+            case TINYINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Byte.parseByte(dbzObj.toString());
+                    }
+                };
+            case SMALLINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Short.parseShort(dbzObj.toString());
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return convertToInt();
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return convertToLong();
+            case DATE:
+                return convertToDate();
+            case TIME_WITHOUT_TIME_ZONE:
+                return convertToTime();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return convertToTimestamp(serverTimeZone);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return convertToLocalTimeZoneTimestamp(serverTimeZone);
+            case FLOAT:
+                return convertToFloat();
+            case DOUBLE:
+                return convertToDouble();
+            case CHAR:
+            case VARCHAR:
+                return convertToString();
+            case BINARY:
+            case VARBINARY:
+                return convertToBinary();
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ROW:
+                return createRowConverter(
+                        (RowType) type, serverTimeZone, userDefinedConverterFactory);
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static DeserializationRuntimeConverter convertToBoolean() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    return (byte) dbzObj == 1;
+                } else if (dbzObj instanceof Short) {
+                    return (short) dbzObj == 1;
+                } else {
+                    return Boolean.parseBoolean(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToInt() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                } else {
+                    return Integer.parseInt(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLong() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDouble() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToFloat() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Double) {
+                    return ((Double) dbzObj).floatValue();
+                } else {
+                    return Float.parseFloat(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDate() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTime() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case MicroTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000);
+                        case NanoTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000_000);
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                // get number of milliseconds of the day
+                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case Timestamp.SCHEMA_NAME:
+                            return TimestampData.fromEpochMillis((Long) dbzObj);
+                        case MicroTimestamp.SCHEMA_NAME:
+                            long micro = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    micro / 1000, (int) (micro % 1000 * 1000));
+                        case NanoTimestamp.SCHEMA_NAME:
+                            long nano = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    nano / 1000_000, (int) (nano % 1000_000));
+                    }
+                }
+                LocalDateTime localDateTime =
+                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+                return TimestampData.fromLocalDateTime(localDateTime);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+            ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof String) {
+                    String str = (String) dbzObj;
+                    // TIMESTAMP_LTZ type is encoded in string type
+                    Instant instant = Instant.parse(str);
+                    return TimestampData.fromLocalDateTime(
+                            LocalDateTime.ofInstant(instant, serverTimeZone));
+                }
+                throw new IllegalArgumentException(
+                        "Unable to convert to TimestampData from unexpected 
value '"
+                                + dbzObj
+                                + "' of type "
+                                + dbzObj.getClass().getName());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToString() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return StringData.fromString(dbzObj.toString());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToBinary() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                } else if (dbzObj instanceof ByteBuffer) {
+                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+                    byte[] bytes = new byte[byteBuffer.remaining()];
+                    byteBuffer.get(bytes);
+                    return bytes;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported BYTES value type: " + 
dbzObj.getClass().getSimpleName());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                BigDecimal bigDecimal;
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+                } else if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    bigDecimal = new BigDecimal((String) dbzObj);
+                } else if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+                } else {
+                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                        SpecialValueDecimal decimal =
+                                VariableScaleDecimal.toLogical((Struct) dbzObj);
+                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+                    } else {
+                        // fallback to string
+                        bigDecimal = new BigDecimal(dbzObj.toString());
+                    }
+                }
+                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createRowConverter(
+            RowType rowType,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        final DeserializationRuntimeConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(
+                                logicType -> createConverter(
+                                        logicType,
+                                        serverTimeZone,
+                                        userDefinedConverterFactory))
+                        .toArray(DeserializationRuntimeConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        row.setField(i, null);
+                    } else {
+                        Object fieldValue = struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        Object convertedField =
+                                convertField(fieldConverters[i], fieldValue, fieldSchema);
+                        row.setField(i, convertedField);
+                    }
+                }
+                return row;
+            }
+        };
+    }
+
+    private static Object convertField(
+            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+            throws Exception {
+        if (fieldValue == null) {
+            return null;
+        } else {
+            return fieldConverter.convert(fieldValue, fieldSchema);
+        }
+    }
+
+    private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+            DeserializationRuntimeConverter converter) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj == null) {
+                    return null;
+                }
+                return converter.convert(dbzObj, schema);
+            }
+        };
+    }
+}
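
For context, a minimal sketch of how the builder above can wire audit metrics into the deserializer; the label, audit address, and audit key values, plus the surrounding variables physicalRowType and typeInfo, are illustrative assumptions, not values from this commit:

    // Sketch: constructing the deserializer with InLong source metrics.
    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1") // hypothetical labels
            .withAuditAddress("127.0.0.1:10081")                  // hypothetical audit proxy address
            .withAuditKeys("3")                                   // hypothetical audit key
            .build();

    RowDataDebeziumDeserializeSchema deserializer =
            RowDataDebeziumDeserializeSchema.newBuilder()
                    .setPhysicalRowType(physicalRowType) // RowType of the physical columns (assumed defined)
                    .setResultTypeInfo(typeInfo)         // TypeInformation<RowData> (assumed defined)
                    .setServerTimeZone(ZoneId.of("UTC"))
                    .setSourceMetricData(new SourceMetricData(metricOption))
                    .build();
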
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
new file mode 100644
index 0000000000..635ea46529
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.sqlserver;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerDeserializationConverterFactory;
+import com.ververica.cdc.connectors.sqlserver.table.SqlServerReadableMetadata;
+import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a SqlServer source from a logical
+ * description.
+ * <p>
+ * Copy from com.ververica:flink-connector-sqlserver-cdc:2.3.0
+ */
+public class SqlServerTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String schemaName;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final String username;
+    private final String password;
+    private final Properties dbzProperties;
+    private final StartupOptions startupOptions;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+    private final MetricOption metricOption;
+
+    public SqlServerTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String schemaName,
+            String tableName,
+            ZoneId serverTimeZone,
+            String username,
+            String password,
+            Properties dbzProperties,
+            StartupOptions startupOptions,
+            MetricOption metricOption) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.schemaName = checkNotNull(schemaName);
+        this.tableName = checkNotNull(tableName);
+        this.serverTimeZone = serverTimeZone;
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.dbzProperties = dbzProperties;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.startupOptions = startupOptions;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                SqlServerDeserializationConverterFactory.instance())
+                        .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
+                        .build();
+        DebeziumSourceFunction<RowData> sourceFunction =
+                SqlServerSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .database(database)
+                        .tableList(schemaName + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer)
+                        .build();
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(SqlServerReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(SqlServerReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        SqlServerTableSource source =
+                new SqlServerTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        schemaName,
+                        tableName,
+                        serverTimeZone,
+                        username,
+                        password,
+                        dbzProperties,
+                        startupOptions,
+                        metricOption);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SqlServerTableSource that = (SqlServerTableSource) o;
+        return port == that.port
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(schemaName, that.schemaName)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(metricOption, that.metricOption);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                schemaName,
+                tableName,
+                serverTimeZone,
+                username,
+                password,
+                dbzProperties,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                metricOption);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "SqlServer-CDC";
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(SqlServerReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                SqlServerReadableMetadata::getKey,
+                                SqlServerReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+}
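
As a usage note, the listReadableMetadata/applyReadableMetadata pair above exposes connector metadata to Flink SQL as METADATA columns. A minimal, hedged sketch follows; the key names 'database_name' and 'op_ts' assume flink-cdc 2.3's SqlServerReadableMetadata and should be verified against that enum, tableEnv is an assumed TableEnvironment, and the connector options are elided here (a fuller WITH clause appears in the sketch after the factory diff below):

    // Sketch: projecting SqlServer CDC metadata columns in a table definition.
    tableEnv.executeSql(
            "CREATE TABLE orders_with_meta ("
                    + "  id INT,"
                    + "  origin_db STRING METADATA FROM 'database_name' VIRTUAL,"
                    + "  origin_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL"
                    + ") WITH (/* sqlserver-cdc connector options as usual */)");
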
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
index bab8511709..dfa28d6376 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlserverTableFactory.java
@@ -17,7 +17,8 @@
 
 package org.apache.inlong.sort.sqlserver;
 
-import com.ververica.cdc.connectors.sqlserver.table.SqlServerTableSource;
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -35,6 +36,7 @@ import java.util.Set;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.inlong.sort.base.Constants.*;
 
 /** Factory for creating configured instance of {@link com.ververica.cdc.connectors.sqlserver.SqlServerSource}. */
 public class SqlserverTableFactory implements DynamicTableSourceFactory {
@@ -122,6 +124,9 @@ public class SqlserverTableFactory implements DynamicTableSourceFactory {
         options.add(PORT);
         options.add(SERVER_TIME_ZONE);
         options.add(SCAN_STARTUP_MODE);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
 
         return options;
     }
@@ -144,6 +149,16 @@ public class SqlserverTableFactory implements DynamicTableSourceFactory {
         ResolvedSchema physicalSchema =
                 getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
 
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        String auditKeys = config.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         return new SqlServerTableSource(
                 physicalSchema,
                 port,
@@ -155,7 +170,8 @@ public class SqlserverTableFactory implements DynamicTableSourceFactory {
                 username,
                 password,
                 getDebeziumProperties(context.getCatalogTable().getOptions()),
-                getStartupOptions(config));
+                getStartupOptions(config),
+                metricOption);
     }
 
     private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
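
To make the new options concrete, here is a hedged sketch of a table definition that enables audit reporting. The option keys assume the string values behind INLONG_METRIC, INLONG_AUDIT, and AUDIT_KEYS in org.apache.inlong.sort.base.Constants ('inlong.metric.labels', 'metrics.audit.proxy.hosts', 'metrics.audit.key'), 'sqlserver-cdc-inlong' is an assumed connector identifier, and all values are placeholders; check both against the release in use:

    // Sketch: registering the SqlServer CDC source with the audit options enabled.
    tableEnv.executeSql(
            "CREATE TABLE products (id INT, name STRING) WITH ("
                    + "  'connector' = 'sqlserver-cdc-inlong',"
                    + "  'hostname' = 'localhost',"
                    + "  'port' = '1433',"
                    + "  'username' = 'sa',"
                    + "  'password' = '***',"
                    + "  'database-name' = 'inventory',"
                    + "  'schema-name' = 'dbo',"
                    + "  'table-name' = 'products',"
                    + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',"
                    + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',"
                    + "  'metrics.audit.key' = '3'"
                    + ")");
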
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 4c8eca6491..970ef97ce6 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -850,6 +850,11 @@
    Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.)
    License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
+1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
+Source  : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that the software have been modified.)
+License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 

