zoy0 commented on code in PR #10735: URL: https://github.com/apache/inlong/pull/10735#discussion_r1696459899
########## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLServerSource.java: ########## @@ -17,66 +17,180 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.SqlServerConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.reader.SQLServerReader; +import io.debezium.connector.sqlserver.SqlServerConnector; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.relational.history.FileDatabaseHistory; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.*; /** * SQLServer source */ public class SQLServerSource extends AbstractSource { - private static final Logger logger = LoggerFactory.getLogger(SQLServerSource.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerSource.class); + private static final Integer DEBEZIUM_QUEUE_SIZE = 100; + private ExecutorService executor; + public InstanceProfile profile; + private BlockingQueue<SourceData> debeziumQueue; + private final Properties props = new Properties(); 
+ private String dbName; + private String schemaName; + private String tableName; public SQLServerSource() { } - @Override - public List<Reader> split(TaskProfile conf) { - SQLServerReader sqlServerReader = new SQLServerReader(); - List<Reader> readerList = new ArrayList<>(); - readerList.add(sqlServerReader); - sourceMetric.sourceSuccessCount.incrementAndGet(); - return readerList; + protected void initSource(InstanceProfile profile) { + try { + LOGGER.info("SQLServerSource init: {}", profile.toJsonStr()); + debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE); + + dbName = profile.get(TaskConstants.TASK_SQLSERVER_DBNAME); + schemaName = profile.get(TaskConstants.TASK_SQLSERVER_SCHEMA_NAME); + tableName = profile.get(TaskConstants.TASK_SQLSERVER_TABLE_NAME); + + props.setProperty("name", "SQLServer-" + instanceId); + props.setProperty("connector.class", SqlServerConnector.class.getName()); + props.setProperty("offset.storage", FileOffsetBackingStore.class.getName()); + String agentPath = AgentConfiguration.getAgentConf() + .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + String offsetPath = agentPath + "/" + getThreadName() + "/offset.dat"; + props.setProperty("offset.storage.file.filename", offsetPath); + props.setProperty("offset.flush.interval.ms", "10000"); + props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); + props.setProperty("database.history.file.filename", agentPath + "/" + getThreadName() + "/history.dat"); + // ignore "schema" and extract data from "payload" + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + // ignore ddl + props.setProperty("include.schema.changes", "false"); + // convert time to formatted string + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.SQLServerTimeConverter"); + props.setProperty("datetime.format.date", 
"yyyy-MM-dd"); + props.setProperty("datetime.format.time", "HH:mm:ss"); + props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); + props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); Review Comment: The time data emitted by Debezium is a timestamp. Without these special adjustments, sort-flink cannot deserialize it successfully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org