This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 590e46ebe3 [INLONG-10289][Agent] Update the SQLServer Source (#10735)
590e46ebe3 is described below

commit 590e46ebe3471a8f72acc199cb8cad120b49fe42
Author: zoy0 <105140381+z...@users.noreply.github.com>
AuthorDate: Fri Aug 2 09:48:25 2024 +0800

    [INLONG-10289][Agent] Update the SQLServer Source (#10735)
---
 .../inlong/agent/constant/TaskConstants.java       |  11 ++
 .../apache/inlong/agent/pojo/SqlServerTask.java    |   5 +
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |   5 +
 .../agent/plugin/instance/SQLServerInstance.java   |  31 ++++
 .../agent/plugin/sources/SQLServerSource.java      | 172 +++++++++++++++----
 .../inlong/agent/plugin/task/SQLServerTask.java    | 111 ++++++++++++
 .../agent/plugin/utils/SQLServerTimeConverter.java | 124 ++++++++++++++
 .../agent/plugin/sources/TestSQLServerSource.java  | 188 +++++++++++++++++----
 8 files changed, 585 insertions(+), 62 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 4cd6ac56ed..1f142f839e 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -153,6 +153,17 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_MQTT_AUTOMATIC_RECONNECT = 
"task.mqttTask.automaticReconnect";
     public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";
 
+    // SQLServer task
+    public static final String TASK_SQLSERVER_HOSTNAME = 
"task.sqlserverTask.hostname";
+    public static final String TASK_SQLSERVER_PORT = "task.sqlserverTask.port";
+    public static final String TASK_SQLSERVER_USER = "task.sqlserverTask.user";
+    public static final String TASK_SQLSERVER_PASSWORD = 
"task.sqlserverTask.password";
+    public static final String TASK_SQLSERVER_DB_NAME = 
"task.sqlserverTask.dbname";
+    public static final String TASK_SQLSERVER_SNAPSHOT_MODE = 
"task.sqlserverTask.snapshot.mode";
+    public static final String TASK_SQLSERVER_SERVER_NAME = 
"task.sqlserverTask.serverName";
+    public static final String TASK_SQLSERVER_SCHEMA_NAME = 
"task.sqlserverTask.schemaName";
+    public static final String TASK_SQLSERVER_TABLE_NAME = 
"task.sqlserverTask.tableName";
+
     public static final String TASK_STATE = "task.state";
 
     public static final String INSTANCE_STATE = "instance.state";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
index 56e4a9b920..9240e8366a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/SqlServerTask.java
@@ -28,6 +28,9 @@ public class SqlServerTask {
     private String port;
     private String serverName;
     private String dbname;
+    private String schemaName;
+    private String tableName;
+    private String serverTimezone;
 
     private SqlServerTask.Snapshot snapshot;
     private SqlServerTask.Offset offset;
@@ -63,6 +66,8 @@ public class SqlServerTask {
         private String port;
         private String database;
         private String schemaName;
+        private String tableName;
+        private String serverTimezone;
 
         private String snapshotMode;
         private String intervalMs;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index cc6cfe8244..039acea32d 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -54,6 +54,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
     public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
     public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
+    public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -343,6 +344,9 @@ public class TaskProfileDto {
         sqlServerTask.setPort(config.getPort());
         sqlServerTask.setServerName(config.getSchemaName());
         sqlServerTask.setDbname(config.getDatabase());
+        sqlServerTask.setSchemaName(config.getSchemaName());
+        sqlServerTask.setTableName(config.getSchemaName() + "." + 
config.getTableName());
+        sqlServerTask.setServerTimezone(config.getServerTimezone());
 
         SqlServerTask.Offset offset = new SqlServerTask.Offset();
         offset.setFilename(config.getOffsetFilename());
@@ -495,6 +499,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case SQLSERVER:
+                task.setTaskClass(DEFAULT_SQLSERVER_TASK);
                 SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
                 task.setSqlserverTask(sqlserverTask);
                 task.setSource(SQLSERVER_SOURCE);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java
new file mode 100644
index 0000000000..735f416ff9
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLServerInstance.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+public class SQLServerInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
index e067833c6b..01e61a99bd 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java
@@ -17,66 +17,184 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
 
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * SQLServer source
  */
 public class SQLServerSource extends AbstractSource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SQLServerSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLServerSource.class);
+    private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+    private ExecutorService executor;
+    public InstanceProfile profile;
+    private BlockingQueue<SourceData> debeziumQueue;
+    private final Properties props = new Properties();
+    private String dbName;
+    private String schemaName;
+    private String tableName;
 
     public SQLServerSource() {
     }
 
-    @Override
-    public List<Reader> split(TaskProfile conf) {
-        SQLServerReader sqlServerReader = new SQLServerReader();
-        List<Reader> readerList = new ArrayList<>();
-        readerList.add(sqlServerReader);
-        sourceMetric.sourceSuccessCount.incrementAndGet();
-        return readerList;
+    protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("SQLServerSource init: {}", profile.toJsonStr());
+            debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+
+            dbName = profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME);
+            schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
+            tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
+
+            props.setProperty("name", "SQLServer-" + instanceId);
+            props.setProperty("connector.class", 
SqlServerConnector.class.getName());
+            props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getName());
+            String agentPath = AgentConfiguration.getAgentConf()
+                    .get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME);
+            String offsetPath = agentPath + "/" + getThreadName() + 
"/offset.dat";
+            props.setProperty("offset.storage.file.filename", offsetPath);
+            props.setProperty("offset.flush.interval.ms", "10000");
+            props.setProperty("database.history", 
FileDatabaseHistory.class.getCanonicalName());
+            props.setProperty("database.history.file.filename", agentPath + 
"/" + getThreadName() + "/history.dat");
+            // ignore "schema" and extract data from "payload"
+            props.setProperty("key.converter.schemas.enable", "false");
+            props.setProperty("value.converter.schemas.enable", "false");
+            // ignore ddl
+            props.setProperty("include.schema.changes", "false");
+            // convert time to formatted string
+            props.setProperty("converters", "datetime");
+            props.setProperty("datetime.type", 
"org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter");
+            props.setProperty("datetime.format.date", "yyyy-MM-dd");
+            props.setProperty("datetime.format.time", "HH:mm:ss");
+            props.setProperty("datetime.format.datetime", "yyyy-MM-dd 
HH:mm:ss");
+            props.setProperty("datetime.format.timestamp", "yyyy-MM-dd 
HH:mm:ss");
+
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.HOSTNAME),
+                    profile.get(TaskConstants.TASK_SQLSERVER_HOSTNAME));
+            props.setProperty(String.valueOf(SqlServerConnectorConfig.PORT),
+                    profile.get(TaskConstants.TASK_SQLSERVER_PORT));
+            props.setProperty(String.valueOf(SqlServerConnectorConfig.USER),
+                    profile.get(TaskConstants.TASK_SQLSERVER_USER));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.PASSWORD),
+                    profile.get(TaskConstants.TASK_SQLSERVER_PASSWORD));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.DATABASE_NAME),
+                    profile.get(TaskConstants.TASK_SQLSERVER_DB_NAME));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.SNAPSHOT_MODE),
+                    profile.get(TaskConstants.TASK_SQLSERVER_SNAPSHOT_MODE, 
SqlServerConstants.INITIAL));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.SERVER_NAME),
+                    profile.get(TaskConstants.TASK_SQLSERVER_SERVER_NAME));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.SCHEMA_INCLUDE_LIST),
+                    profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME));
+            
props.setProperty(String.valueOf(SqlServerConnectorConfig.TABLE_INCLUDE_LIST),
+                    profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME));
+
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startDebeziumEngine());
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + instanceId, ex);
+        }
+    }
+
+    private Runnable startDebeziumEngine() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "debezium");
+            try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = 
DebeziumEngine.create(Json.class)
+                    .using(props)
+                    .using(OffsetCommitPolicy.always())
+                    .notifying(this::handleConsumerEvent)
+                    .build()) {
+
+                debeziumEngine.run();
+            } catch (Throwable e) {
+                LOGGER.error("do run error in SQLServer debezium: ", e);
+            }
+        };
+    }
+
+    private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> 
committer) throws InterruptedException {
+        for (ChangeEvent<String, String> record : records) {
+            boolean offerSuc = false;
+            SourceData sourceData = new 
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0");
+            while (isRunnable() && !offerSuc) {
+                offerSuc = debeziumQueue.offer(sourceData, 1, 
TimeUnit.SECONDS);
+            }
+            committer.markProcessed(record);
+        }
+        committer.markBatchFinished();
     }
 
     @Override
-    protected String getThreadName() {
+    public List<Reader> split(TaskProfile conf) {
         return null;
     }
 
     @Override
-    protected void initSource(InstanceProfile profile) {
-
+    protected String getThreadName() {
+        return "SQLServer-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void printCurrentState() {
-
+        LOGGER.info("sqlserver databases is {} and schema is {} and table is 
{}", dbName, schemaName, tableName);
     }
 
     @Override
     protected boolean doPrepareToRead() {
-        return false;
+        return true;
     }
 
     @Override
     protected List<SourceData> readFromSource() {
-        return null;
-    }
-
-    @Override
-    public Message read() {
-        return null;
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData sourceData = debeziumQueue.poll(1, 
TimeUnit.SECONDS);
+                if (sourceData != null) {
+                    LOGGER.info("readFromSource: {}", sourceData.getData());
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from debezium queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
@@ -86,16 +204,12 @@ public class SQLServerSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
-    }
-
-    @Override
-    public boolean sourceFinish() {
-        return false;
+        LOGGER.info("release sqlserver source");
+        executor.shutdownNow();
     }
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
     }
-}
+}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
new file mode 100644
index 0000000000..dd1446b301
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/SQLServerTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SQLServerTask extends AbstractTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLServerTask.class);
+    public static final String DEFAULT_SQLSERVER_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.SQLServerInstance";
+    private boolean isAdded = false;
+
+    private String dbName;
+    private String schemaName;
+    private String tableName;
+    private String instanceId;
+
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
+
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
+    @Override
+    protected void initTask() {
+        LOGGER.info("SQLServer commonInit: {}", taskProfile.toJsonStr());
+        dbName = taskProfile.get(TaskConstants.TASK_SQLSERVER_DB_NAME);
+        tableName = taskProfile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
+        schemaName = taskProfile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
+        instanceId = dbName + "-" + tableName;
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_HOSTNAME)) {
+            LOGGER.error("task profile needs hostname");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PORT)) {
+            LOGGER.error("task profile needs port");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_USER)) {
+            LOGGER.error("task profile needs username");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_PASSWORD)) {
+            LOGGER.error("task profile needs password");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_DB_NAME)) {
+            LOGGER.error("task profile needs DB name");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME)) {
+            LOGGER.error("task profile needs schema name");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_SQLSERVER_TABLE_NAME)) {
+            LOGGER.error("task profile needs table name");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
+        if (isAdded) {
+            return list;
+        }
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_SQLSERVER_INSTANCE, instanceId,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        list.add(instanceProfile);
+        this.isAdded = true;
+        return list;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java
new file mode 100644
index 0000000000..008dbda4ea
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/SQLServerTimeConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.utils;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class SQLServerTimeConverter implements CustomConverter<SchemaBuilder, 
RelationalColumn> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SQLServerTimeConverter.class);
+
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+    private DateTimeFormatter datetimeFormatter = 
DateTimeFormatter.ISO_DATE_TIME;
+
+    private ZoneOffset defalutZoneOffset = 
ZoneOffset.systemDefault().getRules().getOffset(Instant.now());
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.time", p -> timeFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.datetime", p -> datetimeFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp.zone", z -> defalutZoneOffset = 
ZoneOffset.of(z));
+    }
+
+    private void readProps(Properties properties, String settingKey, 
Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.length() == 0) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            LOGGER.error("The {} setting is illegal:{}", settingKey, 
settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column, 
ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        Converter converter = null;
+        if ("DATE".equals(sqlType)) {
+            schemaBuilder = 
SchemaBuilder.string().optional().name("org.apache.inlong.agent.date.string");
+            converter = this::convertDate;
+        }
+        if ("TIME".equals(sqlType)) {
+            schemaBuilder = 
SchemaBuilder.string().optional().name("org.apache.inlong.agent.time.string");
+            converter = this::convertTime;
+        }
+        if ("DATETIME".equals(sqlType) ||
+                "DATETIME2".equals(sqlType) ||
+                "SMALLDATETIME".equals(sqlType)) {
+            schemaBuilder = 
SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetime.string");
+            converter = this::convertDateTime;
+        }
+        if ("DATETIMEOFFSET".equals(sqlType)) {
+            schemaBuilder = 
SchemaBuilder.string().optional().name("org.apache.inlong.agent.datetimeoffset.string");
+            converter = this::convertDateTimeOffset;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+            LOGGER.info("register converter for sqlType {} to schema {}", 
sqlType, schemaBuilder.name());
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof java.sql.Date) {
+            return dateFormatter.format(((java.sql.Date) input).toLocalDate());
+        }
+        return input == null ? null : input.toString();
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof java.sql.Time) {
+            return timeFormatter.format(((java.sql.Time) input).toLocalTime());
+        } else if (input instanceof java.sql.Timestamp) {
+            return timeFormatter.format(((java.sql.Timestamp) 
input).toLocalDateTime().toLocalTime());
+        }
+        return input == null ? null : input.toString();
+    }
+
+    private String convertDateTime(Object input) {
+        if (input instanceof java.sql.Timestamp) {
+            return datetimeFormatter.format(((java.sql.Timestamp) 
input).toLocalDateTime());
+        }
+        return input == null ? null : input.toString();
+    }
+
+    private String convertDateTimeOffset(Object input) {
+        if (input instanceof microsoft.sql.DateTimeOffset) {
+            microsoft.sql.DateTimeOffset dateTimeOffset = 
(microsoft.sql.DateTimeOffset) input;
+            return datetimeFormatter.format(
+                    
dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(defalutZoneOffset).toLocalDateTime());
+        }
+        return input == null ? null : input.toString();
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
index 778c2710c4..0dc8b71be4 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
@@ -17,74 +17,196 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.metrics.AgentMetricItem;
-import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
 import org.apache.inlong.common.metric.MetricRegister;
 
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-import static org.powermock.api.support.membermodification.MemberMatcher.field;
 
 /**
  * Test cases for {@link SQLServerSource}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({SQLServerSource.class, MetricRegister.class})
+@PrepareForTest({DebeziumEngine.class, Executors.class, SQLServerSource.class, 
MetricRegister.class})
 @PowerMockIgnore({"javax.management.*"})
 public class TestSQLServerSource {
 
+    private SQLServerSource source;
+
+    private static AgentBaseTestsHelper helper;
+    // task basic store
+    private static Store taskBasicStore;
+    // instance basic store
+    private static Store instanceBasicStore;
+    // offset basic store
+    private static Store offsetBasicStore;
+
+    InstanceProfile instanceProfile;
+
     @Mock
-    TaskProfile jobProfile;
+    private DebeziumEngine.Builder builder;
 
     @Mock
-    private AgentMetricItemSet agentMetricItemSet;
+    private ExecutorService executorService;
 
     @Mock
-    private AgentMetricItem agentMetricItem;
+    DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer;
 
-    private AtomicLong sourceSuccessCount;
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+    private BlockingQueue queue;
 
-    private AtomicLong sourceFailCount;
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
 
     @Before
     public void setup() throws Exception {
-        sourceSuccessCount = new AtomicLong(0);
-        sourceFailCount = new AtomicLong(0);
-
-        // mock metrics
-        
whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
-        
when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
-        field(AgentMetricItem.class, 
"sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
-        field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, 
sourceFailCount);
-        PowerMockito.mockStatic(MetricRegister.class);
-        PowerMockito.doNothing().when(
-                MetricRegister.class, "register", any(MetricItem.class));
+
+        helper = new 
AgentBaseTestsHelper(TestSQLServerSource.class.getName()).setupAgentHome();
+        taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+        instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+        offsetBasicStore =
+                TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+        OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
+        // mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        
when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        
when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        
when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        doNothing().when(committer).markProcessed(any(ChangeEvent.class));
+        doNothing().when(committer).markBatchFinished();
+
+        // mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+        getSource();
+        // fetch the source's debeziumQueue via reflection
+        Field field = SQLServerSource.class.getDeclaredField("debeziumQueue");
+        field.setAccessible(true);
+        queue = (BlockingQueue) field.get(source);
+    }
+
+    private SQLServerSource getSource() {
+        final String username = "SA";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "1434";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String dbName = "inlong";
+        final String schemaName = "dbo";
+        final String tableName = "test_source";
+        final String serverName = "server-01";
+
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, 
TaskStateEnum.RUNNING, "D",
+                "GMT+8:00");
+        instanceProfile = taskProfile.createInstanceProfile("",
+                "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
+        instanceProfile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
+        instanceProfile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_USER, username);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_PASSWORD, password);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_HOSTNAME, hostname);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_PORT, port);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_DB_NAME, dbName);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME, 
schemaName);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_TABLE_NAME, 
tableName);
+        instanceProfile.set(TaskConstants.TASK_SQLSERVER_SERVER_NAME, 
serverName);
+        instanceProfile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
+        instanceProfile.setInstanceId(instanceId);
+
+        (source = new SQLServerSource()).init(instanceProfile);
+        return source;
     }
 
-    /**
-     * Test cases for .
-     */
     @Test
-    public void testSplit() {
+    public void testSQLServerSource() throws Exception {
+        testHandleConsumerEvent();
+        TestReadDataFromSource();
+        TestReadEmptyFromSource();
+    }
+
+    // test that one record delivered by DebeziumEngine from SQLServer is put into the queue
+    private void testHandleConsumerEvent() throws Exception {
+        List<ChangeEvent<String, String>> records = new ArrayList<>();
+        records.add(new ChangeEvent<String, String>() {
+
+            @Override
+            public String key() {
+                return "KEY";
+            }
+
+            @Override
+            public String value() {
+                return "VALUE";
+            }
+
+            @Override
+            public String destination() {
+                return null;
+            }
+        });
+        Method handleConsumerEvent = 
SQLServerSource.class.getDeclaredMethod("handleConsumerEvent", List.class,
+                DebeziumEngine.RecordCommitter.class);
+        handleConsumerEvent.setAccessible(true);
+        handleConsumerEvent.invoke(source, records, committer);
+        assertEquals(1, queue.size());
+    }
+
+    // test reading one source record from the queue
+    private void TestReadDataFromSource() throws Exception {
+        Method handleConsumerEvent = 
SQLServerSource.class.getDeclaredMethod("readFromSource");
+        handleConsumerEvent.setAccessible(true);
+
+        List result = (List) handleConsumerEvent.invoke(source);
+        assertFalse(result.isEmpty());
+        assertTrue(queue.isEmpty());
+    }
+
+    // test reading from the source when the queue is empty
+    private void TestReadEmptyFromSource() throws Exception {
+        Method handleConsumerEvent = 
SQLServerSource.class.getDeclaredMethod("readFromSource");
+        handleConsumerEvent.setAccessible(true);
 
-        // build mock
-        final SQLServerSource source = new SQLServerSource();
-        // assert
-        // assertEquals(1, source.split(jobProfile).size());
+        queue.clear();
+        List result = (List) handleConsumerEvent.invoke(source);
+        assertTrue(result.isEmpty());
+        assertTrue(queue.isEmpty());
     }
 }


Reply via email to