This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7b4b9ec add sqlserver cdc (#181) 7b4b9ec is described below commit 7b4b9ec7e3869bc2800ce83a6ee9d4a51b536ec8 Author: wudi <676366...@qq.com> AuthorDate: Thu Aug 24 14:46:24 2023 +0800 add sqlserver cdc (#181) --- flink-doris-connector/pom.xml | 6 + .../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++ .../flink/tools/cdc/postgres/PostgresType.java | 2 +- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 178 +++++++++++++++++++++ .../cdc/sqlserver/SqlServerDateConverter.java | 97 +++++++++++ .../flink/tools/cdc/sqlserver/SqlServerSchema.java | 33 ++++ .../flink/tools/cdc/sqlserver/SqlServerType.java | 94 +++++++++++ .../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 82 ++++++++++ 8 files changed, 504 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 73a748c..333a40b 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -260,6 +260,12 @@ under the License. <version>2.4.1</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.ververica</groupId> + <artifactId>flink-sql-connector-sqlserver-cdc</artifactId> + <version>2.4.1</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 754dbce..6a390ea 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -37,6 +38,7 @@ public class CdcTools { private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database"; private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database"; private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database"; + private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database"; private static final List<String> EMPTY_KEYS = Arrays.asList("password"); public static void main(String[] args) throws Exception { @@ -53,6 +55,9 @@ public class CdcTools { case POSTGRES_SYNC_DATABASE: createPostgresSyncDatabase(opArgs); break; + case SQLSERVER_SYNC_DATABASE: + createSqlServerSyncDatabase(opArgs); + break; default: System.out.println("Unknown operation " + operation); System.exit(1); @@ -83,6 +88,14 @@ public class CdcTools { syncDatabase(params, databaseSync, postgresConfig, "Postgres"); } + private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf"); + Configuration postgresConfig = Configuration.fromMap(postgresMap); + DatabaseSync databaseSync = new SqlServerDatabaseSync(); + syncDatabase(params, databaseSync, postgresConfig, "SqlServer"); + } + private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception { String jobName = params.get("job-name"); String database = params.get("database"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index 87cbde2..8886c09 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -78,7 +78,7 @@ public class PostgresType { case BIGSERIAL: return DorisType.BIGINT; case NUMERIC: - return precision != null && precision <= 38 + return precision != null && precision > 0 && precision <= 38 ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0) : DorisType.STRING; case FLOAT4: diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java new file mode 100644 index 0000000..6cf9c9d --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.sqlserver; + + +import com.ververica.cdc.connectors.base.options.JdbcSourceOptions; +import com.ververica.cdc.connectors.base.options.SourceOptions; +import com.ververica.cdc.connectors.base.options.StartupOptions; +import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import com.ververica.cdc.connectors.sqlserver.SqlServerSource; +import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.DebeziumOptions; +import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.tools.cdc.DatabaseSync; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; + +public class SqlServerDatabaseSync extends DatabaseSync { + private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class); + private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s"; + private static String PORT = "port"; + + public SqlServerDatabaseSync() { + } + + @Override + public Connection getConnection() throws SQLException { + String jdbcUrl = String.format(JDBC_URL, config.get(JdbcSourceOptions.HOSTNAME), config.getInteger(PORT, 1433),config.get(JdbcSourceOptions.DATABASE_NAME)); + Properties pro = new Properties(); + pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME)); + pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD)); + return DriverManager.getConnection(jdbcUrl, pro); + } + + @Override + public List<SourceSchema> getSchemaList() throws Exception { + String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); + String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); + List<SourceSchema> schemaList = new ArrayList<>(); + LOG.info("database-name {}, schema-name {}", databaseName, schemaName); + try (Connection conn = getConnection()) { + DatabaseMetaData metaData = conn.getMetaData(); + try (ResultSet tables = + metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String tableComment = tables.getString("REMARKS"); + if (!isSyncNeeded(tableName)) { + continue; + } + SourceSchema sourceSchema = + new SqlServerSchema(metaData, databaseName, null, tableName, tableComment); + sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); + schemaList.add(sourceSchema); + } + } + } + return schemaList; + } + + @Override + public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) { + String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); + String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); + Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required"); + Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required"); + + String tableName = config.get(JdbcSourceOptions.TABLE_NAME); + String hostname = config.get(JdbcSourceOptions.HOSTNAME); + Integer port = config.getInteger(PORT, 1433); + String username = config.get(JdbcSourceOptions.USERNAME); + String password = config.get(JdbcSourceOptions.PASSWORD); + + StartupOptions startupOptions = StartupOptions.initial(); + String startupMode = config.get(JdbcSourceOptions.SCAN_STARTUP_MODE); + if ("initial".equalsIgnoreCase(startupMode)) { + startupOptions = StartupOptions.initial(); + } else if ("latest-offset".equalsIgnoreCase(startupMode)) { + startupOptions = StartupOptions.latest(); + } + + //debezium properties set + Properties debeziumProperties = new Properties(); + debeziumProperties.putAll(SqlServerDateConverter.DEFAULT_PROPS); + debeziumProperties.put("decimal.handling.mode", "string"); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) { + debeziumProperties.put( + key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value); + } + } + + Map<String, Object> customConverterConfigs = new HashMap<>(); + JsonDebeziumDeserializationSchema schema = + new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + + if(config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){ + JdbcIncrementalSource<String> incrSource = SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder() + .hostname(hostname) + .port(port) + .databaseList(databaseName) + .tableList(schemaName + "." + tableName) + .username(username) + .password(password) + .startupOptions(startupOptions) + .deserializer(schema) + .includeSchemaChanges(true) + .debeziumProperties(debeziumProperties) + .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)) + .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE)) + .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE)) + .connectTimeout(config.get(CONNECT_TIMEOUT)) + .connectionPoolSize(config.get(CONNECTION_POOL_SIZE)) + .connectMaxRetries(config.get(CONNECT_MAX_RETRIES)) + .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND)) + .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND)) + .build(); + return env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "SqlServer IncrSource"); + }else{ + DebeziumSourceFunction<String> sqlServerSource = SqlServerSource.<String>builder() + .hostname(hostname) + .port(port) + .database(databaseName) + .tableList(schemaName + "." + tableName) + .username(username) + .password(password) + .debeziumProperties(debeziumProperties) + .startupOptions(startupOptions) + .deserializer(schema) + .build(); + return env.addSource(sqlServerSource, "SqlServer Source"); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java new file mode 100644 index 0000000..9f8a4a0 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.sqlserver; + +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.DateTimeException; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.function.Consumer; + +public class SqlServerDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { + private static final Logger log = LoggerFactory.getLogger(SqlServerDateConverter.class); + private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; + private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; + + public static Properties DEFAULT_PROPS = new Properties(); + + static { + DEFAULT_PROPS.setProperty("converters", "date"); + DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDateConverter"); + DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd"); + DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS"); + } + + @Override + public void configure(Properties props) { + readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); + readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); + } + + private void readProps(Properties properties, String settingKey, Consumer<String> callback) { + String settingValue = (String) properties.get(settingKey); + if (settingValue == null || settingValue.length() == 0) { + return; + } + try { + callback.accept(settingValue.trim()); + } catch (IllegalArgumentException | DateTimeException e) { + log.error("setting {} is illegal:{}", settingKey, settingValue); + throw e; + } + } + + @Override + public void converterFor(RelationalColumn column, CustomConverter.ConverterRegistration<SchemaBuilder> registration) { + String sqlType = column.typeName().toUpperCase(); + SchemaBuilder schemaBuilder = null; + CustomConverter.Converter converter = null; + if ("DATE".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertDate; + } + if ("SMALLDATETIME".equals(sqlType) || "DATETIME".equals(sqlType) || "DATETIME2".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertDateTime; + } + if (schemaBuilder != null) { + registration.register(schemaBuilder, converter); + } + } + + private Object convertDateTime(Object input) { + if (input instanceof Timestamp) { + return timestampFormatter.format(((Timestamp) input).toLocalDateTime()); + } + return null; + } + + private String convertDate(Object input) { + if (input instanceof Date){ + return dateFormatter.format(((Date) input).toLocalDate()); + } + return null; + } + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java new file mode 100644 index 0000000..36ecb42 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.sqlserver; + +import org.apache.doris.flink.tools.cdc.SourceSchema; + +import java.sql.DatabaseMetaData; + +public class SqlServerSchema extends SourceSchema { + + public SqlServerSchema(DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment) throws Exception { + super(metaData, databaseName, schemaName, tableName, tableComment); + } + + @Override + public String convertToDorisType(String fieldType, Integer precision, Integer scale) { + return SqlServerType.toDorisType(fieldType, precision, scale); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java new file mode 100644 index 0000000..aedb16f --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc.sqlserver; + +import org.apache.doris.flink.catalog.doris.DorisType; + +public class SqlServerType { + private static final String BIT = "bit"; + private static final String TINYINT = "tinyint"; + private static final String SMALLINT = "smallint"; + private static final String INT = "int"; + private static final String BIGINT = "bigint"; + private static final String REAL = "real"; + private static final String FLOAT = "float"; + private static final String MONEY = "money"; + private static final String SMALLMONEY = "smallmoney"; + private static final String DECIMAL = "decimal"; + private static final String NUMERIC = "numeric"; + private static final String DATE = "date"; + private static final String DATETIME = "datetime"; + private static final String DATETIME2 = "datetime2"; + private static final String SMALLDATETIME = "smalldatetime"; + private static final String CHAR = "char"; + private static final String VARCHAR = "varchar"; + private static final String NCHAR = "nchar"; + private static final String NVARCHAR = "nvarchar"; + private static final String TEXT = "text"; + private static final String NTEXT = "ntext"; + private static final String TIME = "time"; + private static final String DATETIMEOFFSET = "datetimeoffset"; + private static final String IMAGE = "image"; + private static final String BINARY = "binary"; + private static final String VARBINARY = "varbinary"; + + public static String toDorisType(String sqlServerType, Integer precision, Integer scale) { + sqlServerType = sqlServerType.toLowerCase(); + switch (sqlServerType){ + case BIT: + return DorisType.BOOLEAN; + case TINYINT: + return DorisType.TINYINT; + case SMALLINT: + return DorisType.SMALLINT; + case INT: + return DorisType.INT; + case BIGINT: + return DorisType.BIGINT; + case REAL: + return DorisType.FLOAT; + case FLOAT: + return DorisType.DOUBLE; + case MONEY: + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 19, 4); + case SMALLMONEY: + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 10, 4); + case NUMERIC: + return precision != null && precision > 0 && precision <= 38 + ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0) + : DorisType.STRING; + case DATE: + return DorisType.DATE_V2; + case DATETIME: + case DATETIME2: + case SMALLDATETIME: + return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6)); + case CHAR: + case VARCHAR: + case NCHAR: + case NVARCHAR: + return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + case TEXT: + case NTEXT: + case TIME: + case DATETIMEOFFSET: + return DorisType.STRING; + default: + throw new UnsupportedOperationException("Unsupported SqlServer Type: " + sqlServerType); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java new file mode 100644 index 0000000..96780aa --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.tools.cdc; + +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class CdcSqlServerSyncDatabaseCase { + + public static void main(String[] args) throws Exception{ + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + env.enableCheckpointing(10000); + +// Map<String,String> flinkMap = new HashMap<>(); +// flinkMap.put("execution.checkpointing.interval","10s"); +// flinkMap.put("pipeline.operator-chaining","false"); +// flinkMap.put("parallelism.default","1"); +// +// +// Configuration configuration = Configuration.fromMap(flinkMap); +// env.configure(configuration); + + String database = "db2"; + String tablePrefix = ""; + String tableSuffix = ""; + Map<String,String> sourceConfig = new HashMap<>(); + sourceConfig.put("database-name","CDC_DB"); + sourceConfig.put("schema-name","dbo"); + sourceConfig.put("hostname","127.0.0.1"); + sourceConfig.put("port","1433"); + sourceConfig.put("username","sa"); + sourceConfig.put("password","123456"); +// sourceConfig.put("debezium.database.tablename.case.insensitive","false"); +// sourceConfig.put("scan.incremental.snapshot.enabled","true"); +// sourceConfig.put("debezium.include.schema.changes","false"); + + Configuration config = Configuration.fromMap(sourceConfig); + + Map<String,String> sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes","127.0.0.1:8030"); + sinkConfig.put("username","root"); + sinkConfig.put("password",""); + sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + Configuration sinkConf = Configuration.fromMap(sinkConfig); + + Map<String,String> tableConfig = new HashMap<>(); + tableConfig.put("replication_num", "1"); + + String includingTables = "products_test"; + String excludingTables = ""; + boolean ignoreDefaultValue = false; + boolean useNewSchemaChange = false; + DatabaseSync databaseSync = new SqlServerDatabaseSync(); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.build(); + env.execute(String.format("Postgres-Doris Database Sync: %s", database)); + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org