zoy0 commented on code in PR #10735:
URL: https://github.com/apache/inlong/pull/10735#discussion_r1697792839
##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java:
##########
@@ -17,66 +17,180 @@ package org.apache.inlong.agent.plugin.sources;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.SqlServerConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader;
+import io.debezium.connector.sqlserver.SqlServerConnector;
+import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.*;
 /**
  * SQLServer source
  */
 public class SQLServerSource extends AbstractSource {
-    private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerSource.class);
+    private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+    private ExecutorService executor;
+    public InstanceProfile profile;
+    private BlockingQueue<SourceData> debeziumQueue;
+    private final Properties props = new Properties();
+    private String dbName;
+    private String schemaName;
+    private String tableName;
     public SQLServerSource() {
     }
-    @Override
-    public List<Reader> split(TaskProfile conf) {
-        SQLServerReader sqlServerReader = new SQLServerReader();
-        List<Reader> readerList = new ArrayList<>();
-        readerList.add(sqlServerReader);
-        sourceMetric.sourceSuccessCount.incrementAndGet();
-        return readerList;
+    protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("SQLServerSource init: {}", profile.toJsonStr());
+            debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+
+            dbName = profile.get(TaskConstants.TASK_SQLSERVER_DBNAME);
+            schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME);
+            tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME);
+
+            props.setProperty("name", "SQLServer-" + instanceId);
+            props.setProperty("connector.class", SqlServerConnector.class.getName());
+            props.setProperty("offset.storage", FileOffsetBackingStore.class.getName());
+            String agentPath = AgentConfiguration.getAgentConf()
+                    .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME);
+            String offsetPath = agentPath + "/" + getThreadName() + "/offset.dat";
+            props.setProperty("offset.storage.file.filename", offsetPath);
+            props.setProperty("offset.flush.interval.ms", "10000");
+            props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
+            props.setProperty("database.history.file.filename", agentPath + "/" + getThreadName() + "/history.dat");
+            // ignore "schema" and extract data from "payload"
+            props.setProperty("key.converter.schemas.enable", "false");
+            props.setProperty("value.converter.schemas.enable", "false");
+            // ignore ddl
+            props.setProperty("include.schema.changes", "false");
+            // convert time to formatted string
+            props.setProperty("converters", "datetime");
+            props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter");
+            props.setProperty("datetime.format.date", "yyyy-MM-dd");
+            props.setProperty("datetime.format.time", "HH:mm:ss");
+            props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss");
+            props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss");

Review Comment:
   DATE and DATETIME values emitted by the Debezium SQL Server connector arrive as long (epoch) values. The Flink sort module cannot parse Unix timestamps; it only accepts SQL and ISO-8601 formats. So we convert the time values to "yyyy-MM-dd HH:mm:ss" strings so that the Flink sort module can parse them.