This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4b9ec  add sqlserver cdc (#181)
7b4b9ec is described below

commit 7b4b9ec7e3869bc2800ce83a6ee9d4a51b536ec8
Author: wudi <676366...@qq.com>
AuthorDate: Thu Aug 24 14:46:24 2023 +0800

    add sqlserver cdc (#181)
---
 flink-doris-connector/pom.xml                      |   6 +
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  13 ++
 .../flink/tools/cdc/postgres/PostgresType.java     |   2 +-
 .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 178 +++++++++++++++++++++
 .../cdc/sqlserver/SqlServerDateConverter.java      |  97 +++++++++++
 .../flink/tools/cdc/sqlserver/SqlServerSchema.java |  33 ++++
 .../flink/tools/cdc/sqlserver/SqlServerType.java   |  94 +++++++++++
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  82 ++++++++++
 8 files changed, 504 insertions(+), 1 deletion(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 73a748c..333a40b 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -260,6 +260,12 @@ under the License.
             <version>2.4.1</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
+            <version>2.4.1</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 754dbce..6a390ea 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +38,7 @@ public class CdcTools {
     private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
     private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
     private static final String POSTGRES_SYNC_DATABASE = 
"postgres-sync-database";
+    private static final String SQLSERVER_SYNC_DATABASE = 
"sqlserver-sync-database";
     private static final List<String> EMPTY_KEYS = Arrays.asList("password");
 
     public static void main(String[] args) throws Exception {
@@ -53,6 +55,9 @@ public class CdcTools {
             case POSTGRES_SYNC_DATABASE:
                 createPostgresSyncDatabase(opArgs);
                 break;
+            case SQLSERVER_SYNC_DATABASE:
+                createSqlServerSyncDatabase(opArgs);
+                break;
             default:
                 System.out.println("Unknown operation " + operation);
                 System.exit(1);
@@ -83,6 +88,14 @@ public class CdcTools {
         syncDatabase(params, databaseSync, postgresConfig, "Postgres");
     }
 
+    /**
+     * Handles the {@code sqlserver-sync-database} operation: parses the arguments,
+     * gathers the {@code sqlserver-conf} entries into a Flink {@link Configuration}
+     * and delegates to {@code syncDatabase} with a {@link SqlServerDatabaseSync}.
+     *
+     * @param opArgs arguments handed over by {@code main} for this operation
+     * @throws Exception propagated from the helpers invoked here
+     */
+    private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Map<String, String> sqlServerMap = getConfigMap(params, "sqlserver-conf");
+        Configuration sqlServerConfig = Configuration.fromMap(sqlServerMap);
+        syncDatabase(params, new SqlServerDatabaseSync(), sqlServerConfig, "SqlServer");
+    }
+
     private static void syncDatabase(MultipleParameterTool params, 
DatabaseSync databaseSync, Configuration config, String type) throws Exception {
         String jobName = params.get("job-name");
         String database = params.get("database");
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
index 87cbde2..8886c09 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
@@ -78,7 +78,7 @@ public class PostgresType {
             case BIGSERIAL:
                 return DorisType.BIGINT;
             case NUMERIC:
-                return precision != null && precision <= 38
+                return precision != null && precision > 0 && precision <= 38
                         ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 
precision, scale != null && scale >= 0 ? scale : 0)
                         : DorisType.STRING;
             case FLOAT4:
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
new file mode 100644
index 0000000..6cf9c9d
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+
+import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
+import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static 
com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static 
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+/**
+ * {@link DatabaseSync} implementation for SQL Server: reads table metadata over JDBC
+ * and builds the Flink CDC source that feeds the synchronization job.
+ */
+public class SqlServerDatabaseSync extends DatabaseSync {
+    private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class);
+    /** JDBC URL template filled with host, port and database name. */
+    private static final String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s";
+    /** Configuration key holding the SQL Server port. */
+    private static final String PORT = "port";
+    /** Port used when the {@code port} key is not configured. */
+    private static final int DEFAULT_PORT = 1433;
+
+    public SqlServerDatabaseSync() {
+    }
+
+    /**
+     * Opens a JDBC connection to the configured SQL Server database.
+     *
+     * @return a new connection; the caller is responsible for closing it
+     * @throws SQLException if the driver cannot connect
+     */
+    @Override
+    public Connection getConnection() throws SQLException {
+        String jdbcUrl = String.format(JDBC_URL,
+                config.get(JdbcSourceOptions.HOSTNAME),
+                config.getInteger(PORT, DEFAULT_PORT),
+                config.get(JdbcSourceOptions.DATABASE_NAME));
+        Properties pro = new Properties();
+        pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME));
+        pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD));
+        return DriverManager.getConnection(jdbcUrl, pro);
+    }
+
+    /**
+     * Lists the tables of the configured database/schema that pass {@code isSyncNeeded}
+     * and wraps each one in a {@link SqlServerSchema}.
+     *
+     * <p>The configured {@code schema-name} is used for the metadata lookup so that
+     * discovery covers the same schema the CDC source subscribes to; when it is not
+     * configured ({@code null}) JDBC applies no schema filter.
+     *
+     * @return one schema per table to synchronize; tables with a primary key use the
+     *     UNIQUE model, all others the DUPLICATE model
+     * @throws Exception if the metadata cannot be read
+     */
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+        try (Connection conn = getConnection()) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                    metaData.getTables(databaseName, schemaName, "%", new String[] {"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    String tableComment = tables.getString("REMARKS");
+                    if (!isSyncNeeded(tableName)) {
+                        continue;
+                    }
+                    SourceSchema sourceSchema = new SqlServerSchema(
+                            metaData, databaseName, schemaName, tableName, tableComment);
+                    sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0
+                            ? DataModel.UNIQUE
+                            : DataModel.DUPLICATE);
+                    schemaList.add(sourceSchema);
+                }
+            }
+        }
+        return schemaList;
+    }
+
+    /**
+     * Builds the SQL Server CDC source and registers it with the environment.
+     *
+     * <p>When {@code scan.incremental.snapshot.enabled} is set, the incremental
+     * snapshot source is used; otherwise the Debezium source function is used.
+     *
+     * @param env execution environment the source is attached to
+     * @return the stream of change events serialized as JSON strings
+     * @throws NullPointerException if {@code database-name} or {@code schema-name}
+     *     is not configured
+     */
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+        String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
+        Preconditions.checkNotNull(schemaName, "schema-name in sqlserver is required");
+
+        String tableList = schemaName + "." + config.get(JdbcSourceOptions.TABLE_NAME);
+        String hostname = config.get(JdbcSourceOptions.HOSTNAME);
+        int port = config.getInteger(PORT, DEFAULT_PORT);
+        String username = config.get(JdbcSourceOptions.USERNAME);
+        String password = config.get(JdbcSourceOptions.PASSWORD);
+
+        StartupOptions startupOptions = resolveStartupOptions();
+        Properties debeziumProperties = buildDebeziumProperties();
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)) {
+            JdbcIncrementalSource<String> incrSource =
+                    SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(databaseName)
+                            .tableList(tableList)
+                            .username(username)
+                            .password(password)
+                            .startupOptions(startupOptions)
+                            .deserializer(schema)
+                            .includeSchemaChanges(true)
+                            .debeziumProperties(debeziumProperties)
+                            .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+                            .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+                            .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+                            .connectTimeout(config.get(CONNECT_TIMEOUT))
+                            .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+                            .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+                            .distributionFactorUpper(
+                                    config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+                            .distributionFactorLower(
+                                    config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+                            .build();
+            return env.fromSource(
+                    incrSource, WatermarkStrategy.noWatermarks(), "SqlServer IncrSource");
+        }
+
+        DebeziumSourceFunction<String> sqlServerSource = SqlServerSource.<String>builder()
+                .hostname(hostname)
+                .port(port)
+                .database(databaseName)
+                .tableList(tableList)
+                .username(username)
+                .password(password)
+                .debeziumProperties(debeziumProperties)
+                .startupOptions(startupOptions)
+                .deserializer(schema)
+                .build();
+        return env.addSource(sqlServerSource, "SqlServer Source");
+    }
+
+    /** Maps the configured startup mode to startup options; defaults to the initial snapshot. */
+    private StartupOptions resolveStartupOptions() {
+        String startupMode = config.get(JdbcSourceOptions.SCAN_STARTUP_MODE);
+        if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            return StartupOptions.latest();
+        }
+        // "initial" and any unrecognised value both use the initial snapshot.
+        return StartupOptions.initial();
+    }
+
+    /** Collects the converter defaults plus every user option carrying the Debezium prefix. */
+    private Properties buildDebeziumProperties() {
+        Properties debeziumProperties = new Properties();
+        debeziumProperties.putAll(SqlServerDateConverter.DEFAULT_PROPS);
+        debeziumProperties.put("decimal.handling.mode", "string");
+
+        String prefix = DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(prefix)) {
+                // User-supplied options are applied last so they override the defaults.
+                debeziumProperties.put(key.substring(prefix.length()), entry.getValue());
+            }
+        }
+        return debeziumProperties;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
new file mode 100644
index 0000000..9f8a4a0
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+/**
+ * Debezium custom converter that emits SQL Server {@code DATE}, {@code SMALLDATETIME},
+ * {@code DATETIME} and {@code DATETIME2} columns as formatted strings.
+ *
+ * <p>{@link #DEFAULT_PROPS} registers this class under the converter name {@code date};
+ * the {@code format.date} and {@code format.timestamp} settings read in
+ * {@link #configure(Properties)} override the ISO formatters used by default.
+ */
+public class SqlServerDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(SqlServerDateConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+
+    /** Debezium properties that enable this converter with its default patterns. */
+    public static final Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "date");
+        DEFAULT_PROPS.setProperty("date.type",
+                "org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDateConverter");
+        DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS");
+    }
+
+    /**
+     * Applies the optional {@code format.date} / {@code format.timestamp} patterns.
+     *
+     * @param props converter settings
+     * @throws IllegalArgumentException if a configured pattern is invalid
+     */
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp",
+                p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+    }
+
+    /**
+     * Passes the trimmed value of {@code settingKey} to {@code callback}; missing or
+     * blank values are ignored so the current formatter stays in place.
+     */
+    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+        String settingValue = properties.getProperty(settingKey);
+        if (settingValue == null) {
+            return;
+        }
+        // Trim before the emptiness check so a whitespace-only value is not turned
+        // into an empty pattern.
+        String trimmed = settingValue.trim();
+        if (trimmed.isEmpty()) {
+            return;
+        }
+        try {
+            callback.accept(trimmed);
+        } catch (IllegalArgumentException | DateTimeException e) {
+            log.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    /**
+     * Registers a string converter for date and date-time columns; other column types
+     * are left untouched.
+     */
+    @Override
+    public void converterFor(RelationalColumn column,
+            CustomConverter.ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName();
+        if (sqlType == null) {
+            return;
+        }
+        // equalsIgnoreCase is locale-independent, unlike toUpperCase() with the
+        // default locale (e.g. Turkish maps 'i' to a dotted capital I).
+        CustomConverter.Converter converter = null;
+        if ("DATE".equalsIgnoreCase(sqlType)) {
+            converter = this::convertDate;
+        } else if ("SMALLDATETIME".equalsIgnoreCase(sqlType)
+                || "DATETIME".equalsIgnoreCase(sqlType)
+                || "DATETIME2".equalsIgnoreCase(sqlType)) {
+            converter = this::convertDateTime;
+        }
+        if (converter != null) {
+            registration.register(SchemaBuilder.string().optional(), converter);
+        }
+    }
+
+    /** Formats a {@link Timestamp}; any other input (including null) yields null. */
+    private Object convertDateTime(Object input) {
+        if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+    /** Formats a {@link Date}; any other input (including null) yields null. */
+    private String convertDate(Object input) {
+        if (input instanceof Date) {
+            return dateFormatter.format(((Date) input).toLocalDate());
+        }
+        return null;
+    }
+
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
new file mode 100644
index 0000000..36ecb42
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+/**
+ * {@link SourceSchema} for a SQL Server table; column types are translated to Doris
+ * types through {@link SqlServerType}.
+ */
+public class SqlServerSchema extends SourceSchema {
+
+    /**
+     * Forwards all arguments unchanged to the {@link SourceSchema} constructor.
+     *
+     * @throws Exception propagated from the superclass constructor
+     */
+    public SqlServerSchema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String schemaName,
+            String tableName,
+            String tableComment) throws Exception {
+        super(metaData, databaseName, schemaName, tableName, tableComment);
+    }
+
+    /** Delegates the type mapping to {@link SqlServerType#toDorisType}. */
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+        String dorisType = SqlServerType.toDorisType(fieldType, precision, scale);
+        return dorisType;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
new file mode 100644
index 0000000..aedb16f
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+
+/** Maps SQL Server column type names to Doris column type definitions. */
+public class SqlServerType {
+    private static final String BIT = "bit";
+    private static final String TINYINT = "tinyint";
+    private static final String SMALLINT = "smallint";
+    private static final String INT = "int";
+    private static final String BIGINT = "bigint";
+    private static final String REAL = "real";
+    private static final String FLOAT = "float";
+    private static final String MONEY = "money";
+    private static final String SMALLMONEY = "smallmoney";
+    private static final String DECIMAL = "decimal";
+    private static final String NUMERIC = "numeric";
+    private static final String DATE = "date";
+    private static final String DATETIME = "datetime";
+    private static final String DATETIME2 = "datetime2";
+    private static final String SMALLDATETIME = "smalldatetime";
+    private static final String CHAR = "char";
+    private static final String VARCHAR = "varchar";
+    private static final String NCHAR = "nchar";
+    private static final String NVARCHAR = "nvarchar";
+    private static final String TEXT = "text";
+    private static final String NTEXT = "ntext";
+    private static final String TIME = "time";
+    private static final String DATETIMEOFFSET = "datetimeoffset";
+    // NOTE(review): the binary types below are declared but have no mapping yet, so
+    // they still end in UnsupportedOperationException.
+    private static final String IMAGE = "image";
+    private static final String BINARY = "binary";
+    private static final String VARBINARY = "varbinary";
+
+    /** Byte length above which a character column is mapped to STRING instead of VARCHAR. */
+    private static final int MAX_VARCHAR_LENGTH = 65533;
+
+    /**
+     * Returns the Doris type definition for a SQL Server column.
+     *
+     * @param sqlServerType SQL Server type name, matched case-insensitively
+     * @param precision column size reported for the column; may be null
+     * @param scale decimal digits reported for the column; may be null
+     * @return the Doris type, including length/precision arguments where applicable
+     * @throws UnsupportedOperationException if the type has no mapping
+     */
+    public static String toDorisType(String sqlServerType, Integer precision, Integer scale) {
+        // Locale.ROOT keeps the lookup stable under locales with special casing rules.
+        sqlServerType = sqlServerType.toLowerCase(java.util.Locale.ROOT);
+        switch (sqlServerType) {
+            case BIT:
+                return DorisType.BOOLEAN;
+            case TINYINT:
+                return DorisType.TINYINT;
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case INT:
+                return DorisType.INT;
+            case BIGINT:
+                return DorisType.BIGINT;
+            case REAL:
+                return DorisType.FLOAT;
+            case FLOAT:
+                return DorisType.DOUBLE;
+            case MONEY:
+                return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 19, 4);
+            case SMALLMONEY:
+                return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 10, 4);
+            // decimal and numeric are synonyms in SQL Server and share one mapping.
+            case DECIMAL:
+            case NUMERIC:
+                return precision != null && precision > 0 && precision <= 38
+                        ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3,
+                                precision, scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DATETIME:
+            case DATETIME2:
+            case SMALLDATETIME:
+                return String.format("%s(%s)", DorisType.DATETIME_V2,
+                        Math.min(scale == null ? 0 : scale, 6));
+            case CHAR:
+            case VARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+                return toVarcharOrString(precision);
+            case TEXT:
+            case NTEXT:
+            case TIME:
+            case DATETIMEOFFSET:
+                return DorisType.STRING;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported SqlServer Type: " + sqlServerType);
+        }
+    }
+
+    /**
+     * Maps a character column to VARCHAR with three bytes reserved per character, or to
+     * STRING when the length is unknown, non-positive or beyond the VARCHAR threshold.
+     */
+    private static String toVarcharOrString(Integer precision) {
+        if (precision == null || precision <= 0) {
+            return DorisType.STRING;
+        }
+        // Widen to long: (max) columns report lengths whose triple overflows int.
+        long byteLength = precision * 3L;
+        return byteLength > MAX_VARCHAR_LENGTH
+                ? DorisType.STRING
+                : String.format("%s(%s)", DorisType.VARCHAR, byteLength);
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
new file mode 100644
index 0000000..96780aa
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Manual example that runs a SQL Server to Doris whole-database sync against locally
+ * running services; adjust the connection settings below before running it.
+ */
+public class CdcSqlServerSyncDatabaseCase {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(10000);
+
+//        Map<String,String> flinkMap = new HashMap<>();
+//        flinkMap.put("execution.checkpointing.interval","10s");
+//        flinkMap.put("pipeline.operator-chaining","false");
+//        flinkMap.put("parallelism.default","1");
+//
+//
+//        Configuration configuration = Configuration.fromMap(flinkMap);
+//        env.configure(configuration);
+
+        // Target Doris database and optional table name decorations.
+        String database = "db2";
+        String tablePrefix = "";
+        String tableSuffix = "";
+
+        // SQL Server source settings.
+        Map<String, String> sourceConfig = new HashMap<>();
+        sourceConfig.put("database-name", "CDC_DB");
+        sourceConfig.put("schema-name", "dbo");
+        sourceConfig.put("hostname", "127.0.0.1");
+        sourceConfig.put("port", "1433");
+        sourceConfig.put("username", "sa");
+        sourceConfig.put("password", "123456");
+//        sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+//        sourceConfig.put("scan.incremental.snapshot.enabled","true");
+//        sourceConfig.put("debezium.include.schema.changes","false");
+
+        Configuration config = Configuration.fromMap(sourceConfig);
+
+        // Doris sink settings.
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", "127.0.0.1:8030");
+        sinkConfig.put("username", "root");
+        sinkConfig.put("password", "");
+        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        // Properties applied to the Doris tables created by the sync.
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+
+        String includingTables = "products_test";
+        String excludingTables = "";
+        boolean ignoreDefaultValue = false;
+        boolean useNewSchemaChange = false;
+        DatabaseSync databaseSync = new SqlServerDatabaseSync();
+        // NOTE(review): the meaning of the literal `false` argument is defined by
+        // DatabaseSync.create, which is not shown here.
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix,
+                includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig,
+                false, useNewSchemaChange);
+        databaseSync.build();
+        // The job name identifies the actual source system (it previously said "Postgres").
+        env.execute(String.format("SqlServer-Doris Database Sync: %s", database));
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to