This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new cc3da3797 [INLONG-6176][Agent] Support collect data from Oracle (#6203) cc3da3797 is described below commit cc3da379709a767e277a756500371aa2333946bd Author: haibo.duan <dhaibo1...@live.cn> AuthorDate: Wed Nov 9 18:35:16 2022 +0800 [INLONG-6176][Agent] Support collect data from Oracle (#6203) --- .../inlong/agent/constant/OracleConstants.java | 50 ++ .../apache/inlong/agent/pojo/JobProfileDto.java | 53 +- .../org/apache/inlong/agent/pojo/OracleJob.java | 75 +++ inlong-agent/agent-plugins/pom.xml | 5 + .../inlong/agent/plugin/sources/OracleSource.java | 48 ++ .../plugin/sources/reader/AbstractReader.java | 35 ++ .../agent/plugin/sources/reader/BinlogReader.java | 32 +- .../agent/plugin/sources/reader/MongoDBReader.java | 32 +- .../{SQLServerReader.java => OracleReader.java} | 634 ++++++++++----------- .../plugin/sources/reader/PostgreSQLReader.java | 32 +- .../plugin/sources/reader/SQLServerReader.java | 34 +- .../sources/snapshot/OracleSnapshotBase.java | 52 ++ .../agent/plugin/sources/TestOracleConnect.java | 62 ++ .../agent/plugin/sources/TestOracleReader.java | 234 ++++++++ .../agent/plugin/sources/TestOracleSource.java | 90 +++ pom.xml | 7 + 16 files changed, 1037 insertions(+), 438 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java new file mode 100644 index 000000000..6ba84a7c3 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/OracleConstants.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.constant; + +public class OracleConstants { + + /** + * The snapshot includes the structure and data of the captured tables. + * Specify this value to populate topics with a complete representation of the data from the captured tables. + */ + public static final String INITIAL = "initial"; + + /** + * The snapshot includes the structure and data of the captured tables. + * The connector performs an initial snapshot and then stops, without processing any subsequent changes. + */ + public static final String INITIAL_ONLY = "initial_only"; + + /** + * The snapshot includes only the structure of captured tables. + * Specify this value if you want the connector to capture data only for changes that occur after the snapshot. + */ + public static final String SCHEMA_ONLY = "schema_only"; + + /** + * This is a recovery setting for a connector that has already been capturing changes. + * When you restart the connector, this setting enables recovery of a corrupted or lost database history topic. 
+ * You might set it periodically to "clean up" a database history topic that has been growing unexpectedly. + * Database history topics require infinite retention. Note this mode is only safe to use when it is guaranteed + * that no schema changes happened between the point in time the connector was shut down and the point in time + * the snapshot is taken. + */ + public static final String SCHEMA_ONLY_RECOVERY = "schema_only_recovery"; + +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index 810a42d2b..f967dd1a8 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -54,6 +54,10 @@ public class JobProfileDto { * mongo source */ public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource"; + /** + * oracle source + */ + public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource"; /** * mqtt source */ @@ -230,10 +234,10 @@ public class JobProfileDto { return mongoJob; } - private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) { - SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), - SqlServerJob.SqlserverJobConfig.class); - SqlServerJob oracleJob = new SqlServerJob(); + private static OracleJob getOracleJob(DataConfig dataConfigs) { + OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + OracleJob.OracleJobConfig.class); + OracleJob oracleJob = new OracleJob(); oracleJob.setUser(config.getUser()); oracleJob.setHostname(config.getHostname()); oracleJob.setPassword(config.getPassword()); @@ -241,23 +245,51 @@ oracleJob.setServerName(config.getServerName()); oracleJob.setDbname(config.getDbname()); - SqlServerJob.Offset offset = new SqlServerJob.Offset(); + OracleJob.Offset offset = new OracleJob.Offset(); offset.setFilename(config.getOffsetFilename()); offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); oracleJob.setOffset(offset); - SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot(); + OracleJob.Snapshot snapshot = new OracleJob.Snapshot(); snapshot.setMode(config.getSnapshotMode()); oracleJob.setSnapshot(snapshot); - SqlServerJob.History history = new SqlServerJob.History(); + OracleJob.History history = new OracleJob.History(); history.setFilename(config.getHistoryFilename()); oracleJob.setHistory(history); return oracleJob; } + private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) { + SqlServerJob.SqlserverJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), + SqlServerJob.SqlserverJobConfig.class); + SqlServerJob sqlServerJob = new SqlServerJob(); + sqlServerJob.setUser(config.getUser()); + sqlServerJob.setHostname(config.getHostname()); + sqlServerJob.setPassword(config.getPassword()); + sqlServerJob.setPort(config.getPort()); + sqlServerJob.setServerName(config.getServerName()); + sqlServerJob.setDbname(config.getDbname()); + + SqlServerJob.Offset offset = new SqlServerJob.Offset(); + offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + sqlServerJob.setOffset(offset); + + SqlServerJob.Snapshot snapshot = 
new SqlServerJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + sqlServerJob.setSnapshot(snapshot); + + SqlServerJob.History history = new SqlServerJob.History(); + history.setFilename(config.getHistoryFilename()); + sqlServerJob.setHistory(history); + + return sqlServerJob; + } + public static MqttJob getMqttJob(DataConfig dataConfigs) { MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), MqttJob.MqttJobConfig.class); @@ -341,6 +373,12 @@ public class JobProfileDto { job.setSource(KAFKA_SOURCE); profileDto.setJob(job); break; + case ORACLE: + OracleJob oracleJob = getOracleJob(dataConfig); + job.setOracleJob(oracleJob); + job.setSource(ORACLE_SOURCE); + profileDto.setJob(job); + break; case SQLSERVER: SqlServerJob sqlserverJob = getSqlServerJob(dataConfig); job.setSqlserverJob(sqlserverJob); @@ -385,6 +423,7 @@ public class JobProfileDto { private FileJob fileJob; private BinlogJob binlogJob; private KafkaJob kafkaJob; + private OracleJob oracleJob; private MongoJob mongoJob; private MqttJob mqttJob; private SqlServerJob sqlserverJob; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java new file mode 100644 index 000000000..ef2420b3a --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleJob.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +@Data +public class OracleJob { + + private String hostname; + private String user; + private String password; + private String port; + private String serverName; + private String dbname; + + private OracleJob.Snapshot snapshot; + private OracleJob.Offset offset; + private OracleJob.History history; + + @Data + public static class Offset { + + private String intervalMs; + private String filename; + private String specificOffsetFile; + private String specificOffsetPos; + } + + @Data + public static class Snapshot { + + private String mode; + } + + @Data + public static class History { + + private String filename; + } + + @Data + public static class OracleJobConfig { + + private String hostname; + private String user; + private String password; + private String port; + private String dbname; + private String serverName; + + private String snapshotMode; + private String intervalMs; + private String offsetFilename; + private String historyFilename; + + private String specificOffsetFile; + private String specificOffsetPos; + } +} diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index fa8e03a41..85520f508 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -85,6 +85,11 @@ </exclusions> </dependency> + <dependency> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-oracle</artifactId> + </dependency> + <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-sqlserver</artifactId> diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java new file mode 100644 index 000000000..b78f18dac --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.plugin.Reader; +import org.apache.inlong.agent.plugin.sources.reader.OracleReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Oracle source + */ +public class OracleSource extends AbstractSource { + + private static final Logger logger = LoggerFactory.getLogger(OracleSource.class); + + public OracleSource() { + } + + @Override + public List<Reader> split(JobProfile conf) { + super.init(conf); + Reader oracleReader = new OracleReader(); + List<Reader> readerList = new ArrayList<>(); + readerList.add(oracleReader); + sourceMetric.sourceSuccessCount.incrementAndGet(); + return readerList; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java index 5736f21b9..115070dc4 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java @@ -21,8 +21,14 @@ import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; import org.apache.inlong.agent.plugin.Reader; +import org.apache.inlong.agent.pojo.DebeziumOffset; +import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; import org.apache.inlong.common.metric.MetricRegister; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -49,6 +55,8 @@ public abstract class AbstractReader implements Reader { protected String metricName; protected Map<String, String> dimensions; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractReader.class); + @Override public void init(JobProfile jobConf) { inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); @@ -68,4 +76,31 @@ public abstract class AbstractReader implements Reader { public String getInlongGroupId() { return inlongGroupId; } + + /** + * Serialize a specific offset for the given server into a JSON string. + * + * @param server specific server + * @param file specific offset file + * @param pos specific offset pos + * @return the serialized offset as a JSON string + */ + public String serializeOffset(final String server, final String file, + final String pos) { + Map<String, Object> sourceOffset = new HashMap<>(); + sourceOffset.put("file", file); + sourceOffset.put("pos", pos); + DebeziumOffset specificOffset = new DebeziumOffset(); + specificOffset.setSourceOffset(sourceOffset); + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("server", server); + specificOffset.setSourcePartition(sourcePartition); + byte[] serializedOffset = new byte[0]; + try { + serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + } catch (IOException e) { + LOGGER.error("serialize offset message error", e); + } + return new String(serializedOffset, StandardCharsets.UTF_8); + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java index 38877951b..fa61c0a10 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java @@ -35,14 +35,11 @@ import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase; import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory; import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; import org.apache.inlong.agent.pojo.DebeziumFormat; -import org.apache.inlong.agent.pojo.DebeziumOffset; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -215,8 +212,13 @@ public class BinlogReader extends AbstractReader { props.setProperty("offset.storage.file.filename", offsetStoreFileName); props.setProperty("database.history.file.filename", databaseStoreHistoryName); if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) { + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " shouldn't be null"); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null"); props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); - props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, + serializeOffset(instanceId, specificOffsetFile, specificOffsetPos)); props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName()); } else { props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); @@ -235,28 +237,6 @@ public class BinlogReader extends AbstractReader { return props; } - private String serializeOffset() { - Map<String, Object> sourceOffset = new HashMap<>(); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null"); - sourceOffset.put("file", specificOffsetFile); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null"); - sourceOffset.put("pos", specificOffsetPos); - DebeziumOffset specificOffset = new DebeziumOffset(); - specificOffset.setSourceOffset(sourceOffset); - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("server", instanceId); - specificOffset.setSourcePartition(sourcePartition); - byte[] serializedOffset = new byte[0]; - try { - serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); - } catch (IOException e) { - LOGGER.error("serialize offset message error", e); - } - return new String(serializedOffset, StandardCharsets.UTF_8); - } - @Override public void destroy() { synchronized (this) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java index 3abe037bb..eee9e4bea 100644 --- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java @@ -35,15 +35,12 @@ import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase; import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; import org.apache.inlong.agent.pojo.DebeziumFormat; -import org.apache.inlong.agent.pojo.DebeziumOffset; -import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; import org.apache.inlong.agent.utils.GsonUtil; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; @@ -305,8 +302,13 @@ public class MongoDBReader extends AbstractReader { String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString(); if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) { + Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS, + JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); - props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, + serializeOffset(instanceId, specificOffsetFile, specificOffsetPos)); } else { props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); } @@ -325,28 +327,6 @@ public class MongoDBReader extends AbstractReader { builder.with(field, value); } - private String serializeOffset() { - Map<String, Object> sourceOffset = new HashMap<>(); - Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE, - JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); - sourceOffset.put("file", specificOffsetFile); - Preconditions.checkNotNull(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS, - JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); - sourceOffset.put("pos", specificOffsetPos); - DebeziumOffset specificOffset = new DebeziumOffset(); - specificOffset.setSourceOffset(sourceOffset); - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("server", instanceId); - specificOffset.setSourcePartition(sourcePartition); - byte[] serializedOffset = new byte[0]; - try { - serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); - } catch (IOException e) { - LOGGER.error("serialize offset message error", e); - } - return new String(serializedOffset, StandardCharsets.UTF_8); - } - /** * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)} * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished. 
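The hunks above (and the PostgreSQL/SQLServer hunks below) replace the per-reader serializeOffset() implementations with the shared helper moved into AbstractReader. As a minimal sketch of how a reader uses it when a specific-offsets snapshot mode is active — the JSON shape shown is an assumption inferred from the DebeziumOffset setters, since the exact field names are decided by DebeziumOffsetSerializer:

    // Illustrative sketch, not part of this commit; "server1",
    // "mysql-bin.000003" and "4" are placeholder values.
    String offsetJson = serializeOffset("server1", "mysql-bin.000003", "4");
    // Assumed shape of the serialized state:
    // {"sourcePartition":{"server":"server1"},
    //  "sourceOffset":{"file":"mysql-bin.000003","pos":"4"}}
    props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson);
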
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java similarity index 72% copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java index 079d9af07..bfc80542a 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/OracleReader.java @@ -1,326 +1,308 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.sources.reader; - -import com.google.common.base.Preconditions; -import com.google.gson.Gson; -import io.debezium.connector.sqlserver.SqlServerConnector; -import io.debezium.engine.ChangeEvent; -import io.debezium.engine.DebeziumEngine; -import io.debezium.relational.history.FileDatabaseHistory; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.constant.AgentConstants; -import org.apache.inlong.agent.constant.SnapshotModeConstants; -import org.apache.inlong.agent.constant.SqlServerConstants; -import org.apache.inlong.agent.message.DefaultMessage; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase; -import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory; -import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; -import org.apache.inlong.agent.pojo.DebeziumFormat; -import org.apache.inlong.agent.pojo.DebeziumOffset; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; -import org.apache.inlong.agent.utils.GsonUtil; -import org.apache.kafka.connect.storage.FileOffsetBackingStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; - 
-/** - * Read data from SQLServer database by Debezium - */ -public class SQLServerReader extends AbstractReader { - - public static final String SQLSERVER_READER_TAG_NAME = "AgentSQLServerMetric"; - public static final String JOB_DATABASE_HOSTNAME = "job.sqlserverJob.hostname"; - public static final String JOB_DATABASE_PORT = "job.sqlserverJob.port"; - public static final String JOB_DATABASE_USER = "job.sqlserverJob.user"; - public static final String JOB_DATABASE_PASSWORD = "job.sqlserverJob.password"; - public static final String JOB_DATABASE_DBNAME = "job.sqlserverJob.dbname"; - public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.sqlserverJob.snapshot.mode"; - public static final String JOB_DATABASE_QUEUE_SIZE = "job.sqlserverJob.queueSize"; - public static final String JOB_DATABASE_OFFSETS = "job.sqlserverJob.offsets"; - public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.sqlserverJob.offset.specificOffsetFile"; - public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.sqlserverJob.offset.specificOffsetPos"; - - public static final String JOB_DATABASE_SERVER_NAME = "job.sqlserverJob.serverName"; - - public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs"; - public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename"; - - private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class); - private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - - private static final Gson GSON = new Gson(); - - private String databaseStoreHistoryName; - private String instanceId; - private String dbName; - private String serverName; - private String userName; - private String password; - private String hostName; - private String port; - private String offsetFlushIntervalMs; - private String offsetStoreFileName; - private String snapshotMode; - private String offset; - private String specificOffsetFile; - private String specificOffsetPos; - - private ExecutorService executor; - private SqlServerSnapshotBase sqlServerSnapshot; - private boolean finished = false; - - private LinkedBlockingQueue<Pair<String, String>> sqlServerMessageQueue; - private JobProfile jobProfile; - private boolean destroyed = false; - - public SQLServerReader() { - - } - - @Override - public Message read() { - if (!sqlServerMessageQueue.isEmpty()) { - return getSqlServerMessage(); - } else { - return null; - } - } - - /** - * poll message from buffer pool - * - * @return org.apache.inlong.agent.plugin.Message - */ - private DefaultMessage getSqlServerMessage() { - // Retrieves and removes the head of this queue, - // or returns null if this queue is empty. 
- Pair<String, String> message = sqlServerMessageQueue.poll(); - if (Objects.isNull(message)) { - return null; - } - Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY); - header.put(PROXY_KEY_DATA, message.getKey()); - return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header); - } - - public boolean isDestroyed() { - return destroyed; - } - - @Override - public boolean isFinished() { - return finished; - } - - @Override - public String getReadSource() { - return instanceId; - } - - @Override - public void setReadTimeout(long mill) { - - } - - @Override - public void setWaitMillisecond(long millis) { - - } - - @Override - public String getSnapshot() { - if (sqlServerSnapshot != null) { - return sqlServerSnapshot.getSnapshot(); - } else { - return StringUtils.EMPTY; - } - } - - @Override - public void finishRead() { - this.finished = true; - } - - @Override - public boolean isSourceExist() { - return true; - } - - private String tryToInitAndGetHistoryPath() { - String historyPath = agentConf.get( - AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH); - String parentPath = agentConf.get( - AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); - return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath(); - } - - @Override - public void init(JobProfile jobConf) { - super.init(jobConf); - jobProfile = jobConf; - LOGGER.info("init SqlServer reader with jobConf {}", jobConf.toJsonStr()); - userName = jobConf.get(JOB_DATABASE_USER); - password = jobConf.get(JOB_DATABASE_PASSWORD); - hostName = jobConf.get(JOB_DATABASE_HOSTNAME); - port = jobConf.get(JOB_DATABASE_PORT); - dbName = jobConf.get(JOB_DATABASE_DBNAME); - serverName = jobConf.get(JOB_DATABASE_SERVER_NAME); - instanceId = jobConf.getInstanceId(); - offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000"); - offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, - tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId; - snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, SqlServerConstants.INITIAL); - sqlServerMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000)); - finished = false; - - databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, - tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId(); - offset = jobConf.get(JOB_DATABASE_OFFSETS, ""); - specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, ""); - specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1"); - - sqlServerSnapshot = new SqlServerSnapshotBase(offsetStoreFileName); - sqlServerSnapshot.save(offset, sqlServerSnapshot.getFile()); - - Properties props = getEngineProps(); - - DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create( - io.debezium.engine.format.Json.class) - .using(props) - .notifying((records, committer) -> { - try { - for (ChangeEvent<String, String> record : records) { - DebeziumFormat debeziumFormat = GSON - .fromJson(record.value(), DebeziumFormat.class); - sqlServerMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value())); - committer.markProcessed(record); - } - committer.markBatchFinished(); - long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - System.currentTimeMillis(), records.size(), dataSize); - 
readerMetric.pluginReadSuccessCount.addAndGet(records.size()); - readerMetric.pluginReadCount.addAndGet(records.size()); - } catch (Exception e) { - readerMetric.pluginReadFailCount.addAndGet(records.size()); - readerMetric.pluginReadCount.addAndGet(records.size()); - LOGGER.error("parse SqlServer message error", e); - } - }) - .using((success, message, error) -> { - if (!success) { - LOGGER.error("SqlServer job with jobConf {} has error {}", instanceId, message, error); - } - }).build(); - - executor = Executors.newSingleThreadExecutor(); - executor.execute(engine); - - LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot()); - } - - private String serializeOffset() { - Map<String, Object> sourceOffset = new HashMap<>(); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null"); - sourceOffset.put("file", specificOffsetFile); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null"); - sourceOffset.put("pos", specificOffsetPos); - DebeziumOffset specificOffset = new DebeziumOffset(); - specificOffset.setSourceOffset(sourceOffset); - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("server", instanceId); - specificOffset.setSourcePartition(sourcePartition); - byte[] serializedOffset = new byte[0]; - try { - serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); - } catch (IOException e) { - LOGGER.error("serialize offset message error", e); - } - return new String(serializedOffset, StandardCharsets.UTF_8); - } - - private Properties getEngineProps() { - Properties props = new Properties(); - props.setProperty("name", "engine" + instanceId); - props.setProperty("connector.class", SqlServerConnector.class.getCanonicalName()); - props.setProperty("database.hostname", hostName); - props.setProperty("database.port", port); - props.setProperty("database.user", userName); - props.setProperty("database.password", password); - props.setProperty("database.dbname", dbName); - props.setProperty("database.server.name", serverName); - props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs); - props.setProperty("database.snapshot.mode", snapshotMode); - props.setProperty("key.converter.schemas.enable", "false"); - props.setProperty("value.converter.schemas.enable", "false"); - props.setProperty("snapshot.mode", snapshotMode); - props.setProperty("offset.storage.file.filename", offsetStoreFileName); - props.setProperty("database.history.file.filename", databaseStoreHistoryName); - if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) { - props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); - props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); - props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName()); - } else { - props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); - props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); - } - props.setProperty("tombstones.on.delete", "false"); - props.setProperty("converters", "datetime"); - props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter"); - props.setProperty("datetime.format.date", "yyyy-MM-dd"); - props.setProperty("datetime.format.time", "HH:mm:ss"); - props.setProperty("datetime.format.datetime", 
"yyyy-MM-dd HH:mm:ss"); - props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); - - LOGGER.info("SqlServer job {} start with props {}", jobProfile.getInstanceId(), props); - return props; - } - - @Override - public void destroy() { - synchronized (this) { - if (!destroyed) { - this.executor.shutdownNow(); - this.sqlServerSnapshot.close(); - this.destroyed = true; - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.reader; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; +import io.debezium.connector.oracle.OracleConnector; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.relational.history.FileDatabaseHistory; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.OracleConstants; +import org.apache.inlong.agent.constant.SnapshotModeConstants; +import org.apache.inlong.agent.message.DefaultMessage; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase; +import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory; +import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.GsonUtil; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; + +/** + * Read data from Oracle database by Debezium + */ +public class OracleReader extends AbstractReader { + + public static final String ORACLE_READER_TAG_NAME = "AgentOracleMetric"; + public static final String JOB_DATABASE_USER = "job.oracleJob.user"; + public static final String JOB_DATABASE_PASSWORD = "job.oracleJob.password"; + public static final String JOB_DATABASE_HOSTNAME = "job.oracleJob.hostname"; + public static final String JOB_DATABASE_PORT = "job.oracleJob.port"; + public static final String 
JOB_DATABASE_SNAPSHOT_MODE = "job.oracleJob.snapshot.mode"; + public static final String JOB_DATABASE_SERVER_NAME = "job.oracleJob.serverName"; + public static final String JOB_DATABASE_QUEUE_SIZE = "job.oracleJob.queueSize"; + public static final String JOB_DATABASE_OFFSETS = "job.oracleJob.offsets"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE = "job.oracleJob.offset.specificOffsetFile"; + public static final String JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS = "job.oracleJob.offset.specificOffsetPos"; + public static final String JOB_DATABASE_DBNAME = "job.oracleJob.dbname"; + public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.oracleJob.offset.intervalMs"; + public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.oracleJob.history.filename"; + + private static final Gson GSON = new Gson(); + public static final String ORACLE = "oracle"; + + private static final Logger LOGGER = LoggerFactory.getLogger(OracleReader.class); + + private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); + + private String databaseStoreHistoryName; + private String instanceId; + private String dbName; + private String serverName; + private String userName; + private String password; + private String hostName; + private String port; + private String offsetFlushIntervalMs; + private String offsetStoreFileName; + private String snapshotMode; + private String offset; + private String specificOffsetFile; + private String specificOffsetPos; + private OracleSnapshotBase oracleSnapshot; + private boolean finished = false; + private ExecutorService executor; + + /** + * pair.left : table name + * pair.right : actual data + */ + private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue; + private JobProfile jobProfile; + private boolean destroyed = false; + + public OracleReader() { + } + + @Override + public Message read() { + if (!oracleMessageQueue.isEmpty()) { + return getOracleMessage(); + } else { + return null; + } + } + + /** + * poll message from buffer pool + * + * @return org.apache.inlong.agent.plugin.Message + */ + private DefaultMessage getOracleMessage() { + // Retrieves and removes the head of this queue, + // or returns null if this queue is empty. 
+ Pair<String, String> message = oracleMessageQueue.poll(); + if (Objects.isNull(message)) { + return null; + } + Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY); + header.put(PROXY_KEY_DATA, message.getKey()); + return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header); + } + + public boolean isDestroyed() { + return destroyed; + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public String getReadSource() { + return instanceId; + } + + @Override + public void setReadTimeout(long mill) { + + } + + @Override + public void setWaitMillisecond(long millis) { + + } + + @Override + public String getSnapshot() { + if (oracleSnapshot != null) { + return oracleSnapshot.getSnapshot(); + } else { + return StringUtils.EMPTY; + } + } + + @Override + public void finishRead() { + this.finished = true; + } + + @Override + public boolean isSourceExist() { + return true; + } + + private String tryToInitAndGetHistoryPath() { + String historyPath = agentConf.get( + AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH); + String parentPath = agentConf.get( + AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + return AgentUtils.makeDirsIfNotExist(historyPath, parentPath).getAbsolutePath(); + } + + @Override + public void init(JobProfile jobConf) { + super.init(jobConf); + jobProfile = jobConf; + LOGGER.info("init oracle reader with jobConf {}", jobConf.toJsonStr()); + userName = jobConf.get(JOB_DATABASE_USER); + password = jobConf.get(JOB_DATABASE_PASSWORD); + hostName = jobConf.get(JOB_DATABASE_HOSTNAME); + port = jobConf.get(JOB_DATABASE_PORT); + dbName = jobConf.get(JOB_DATABASE_DBNAME); + serverName = jobConf.get(JOB_DATABASE_SERVER_NAME); + instanceId = jobConf.getInstanceId(); + offsetFlushIntervalMs = jobConf.get(JOB_DATABASE_STORE_OFFSET_INTERVAL_MS, "100000"); + offsetStoreFileName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, + tryToInitAndGetHistoryPath()) + "/offset.dat" + instanceId; + snapshotMode = jobConf.get(JOB_DATABASE_SNAPSHOT_MODE, OracleConstants.INITIAL); + oracleMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_DATABASE_QUEUE_SIZE, 1000)); + finished = false; + + databaseStoreHistoryName = jobConf.get(JOB_DATABASE_STORE_HISTORY_FILENAME, + tryToInitAndGetHistoryPath()) + "/history.dat" + jobConf.getInstanceId(); + offset = jobConf.get(JOB_DATABASE_OFFSETS, ""); + specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, ""); + specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1"); + + oracleSnapshot = new OracleSnapshotBase(offsetStoreFileName); + oracleSnapshot.save(offset, oracleSnapshot.getFile()); + + Properties props = getEngineProps(); + + DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create( + io.debezium.engine.format.Json.class) + .using(props) + .notifying((records, committer) -> { + try { + for (ChangeEvent<String, String> record : records) { + DebeziumFormat debeziumFormat = GSON + .fromJson(record.value(), DebeziumFormat.class); + oracleMessageQueue.put(Pair.of(debeziumFormat.getSource().getTable(), record.value())); + committer.markProcessed(record); + } + committer.markBatchFinished(); + long dataSize = records.stream().mapToLong(c -> c.value().length()).sum(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), records.size(), dataSize); + 
readerMetric.pluginReadSuccessCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + } catch (Exception e) { + readerMetric.pluginReadFailCount.addAndGet(records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + LOGGER.error("parse oracle message error", e); + } + }) + .using((success, message, error) -> { + if (!success) { + LOGGER.error("oracle job with jobConf {} has error {}", instanceId, message, error); + } + }).build(); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(engine); + + LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot()); + } + + private Properties getEngineProps() { + Properties props = new Properties(); + props.setProperty("name", "engine" + instanceId); + props.setProperty("connector.class", OracleConnector.class.getCanonicalName()); + props.setProperty("database.hostname", hostName); + props.setProperty("database.port", port); + props.setProperty("database.user", userName); + props.setProperty("database.password", password); + props.setProperty("database.dbname", dbName); + props.setProperty("database.server.name", serverName); + props.setProperty("offset.flush.interval.ms", offsetFlushIntervalMs); + props.setProperty("database.snapshot.mode", snapshotMode); + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + props.setProperty("snapshot.mode", snapshotMode); + props.setProperty("offset.storage.file.filename", offsetStoreFileName); + props.setProperty("database.history.file.filename", databaseStoreHistoryName); + if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) { + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); + props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, + serializeOffset(instanceId, specificOffsetFile, specificOffsetPos)); + props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName()); + } else { + props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); + props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); + } + props.setProperty("tombstones.on.delete", "false"); + props.setProperty("converters", "datetime"); + props.setProperty("datetime.type", "org.apache.inlong.agent.plugin.utils.BinlogTimeConverter"); + props.setProperty("datetime.format.date", "yyyy-MM-dd"); + props.setProperty("datetime.format.time", "HH:mm:ss"); + props.setProperty("datetime.format.datetime", "yyyy-MM-dd HH:mm:ss"); + props.setProperty("datetime.format.timestamp", "yyyy-MM-dd HH:mm:ss"); + + LOGGER.info("oracle job {} start with props {}", jobProfile.getInstanceId(), props); + return props; + } + + @Override + public void destroy() { + synchronized (this) { + if (!destroyed) { + this.executor.shutdownNow(); + this.oracleSnapshot.close(); + this.destroyed = true; + } + } + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java index 23bfa1fbe..38f02e277 100644 --- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java @@ -34,14 +34,11 @@ import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.sources.snapshot.PostgreSQLSnapshotBase; import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; import org.apache.inlong.agent.pojo.DebeziumFormat; -import org.apache.inlong.agent.pojo.DebeziumOffset; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -212,8 +209,13 @@ public class PostgreSQLReader extends AbstractReader { props.setProperty("snapshot.mode", snapshotMode); props.setProperty("offset.storage.file.filename", offsetStoreFileName); if (PostgreSQLConstants.CUSTOM.equals(snapshotMode)) { + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); - props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, + serializeOffset(instanceId, specificOffsetFile, specificOffsetPos)); } else { props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); } @@ -229,28 +231,6 @@ public class PostgreSQLReader extends AbstractReader { return props; } - private String serializeOffset() { - Map<String, Object> sourceOffset = new HashMap<>(); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); - sourceOffset.put("file", specificOffsetFile); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); - sourceOffset.put("pos", specificOffsetPos); - DebeziumOffset specificOffset = new DebeziumOffset(); - specificOffset.setSourceOffset(sourceOffset); - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("server", instanceId); - specificOffset.setSourcePartition(sourcePartition); - byte[] serializedOffset = new byte[0]; - try { - serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); - } catch (IOException e) { - LOGGER.error("serialize offset message error", e); - } - return new String(serializedOffset, StandardCharsets.UTF_8); - } - @Override public void destroy() { synchronized (this) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java index 079d9af07..3a6eeafcc 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/SQLServerReader.java @@ -37,15 +37,12 @@ import 
org.apache.inlong.agent.plugin.sources.snapshot.SqlServerSnapshotBase; import org.apache.inlong.agent.plugin.utils.InLongDatabaseHistory; import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; import org.apache.inlong.agent.pojo.DebeziumFormat; -import org.apache.inlong.agent.pojo.DebeziumOffset; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; import org.apache.inlong.agent.utils.GsonUtil; import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -80,7 +77,7 @@ public class SQLServerReader extends AbstractReader { public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.sqlserverJob.offset.intervalMs"; public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.sqlserverJob.history.filename"; - private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SQLServerReader.class); private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); private static final Gson GSON = new Gson(); @@ -254,28 +251,6 @@ public class SQLServerReader extends AbstractReader { LOGGER.info("get initial snapshot of job {}, snapshot {}", instanceId, getSnapshot()); } - private String serializeOffset() { - Map<String, Object> sourceOffset = new HashMap<>(); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + "shouldn't be null"); - sourceOffset.put("file", specificOffsetFile); - Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, - JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " shouldn't be null"); - sourceOffset.put("pos", specificOffsetPos); - DebeziumOffset specificOffset = new DebeziumOffset(); - specificOffset.setSourceOffset(sourceOffset); - Map<String, String> sourcePartition = new HashMap<>(); - sourcePartition.put("server", instanceId); - specificOffset.setSourcePartition(sourcePartition); - byte[] serializedOffset = new byte[0]; - try { - serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); - } catch (IOException e) { - LOGGER.error("serialize offset message error", e); - } - return new String(serializedOffset, StandardCharsets.UTF_8); - } - private Properties getEngineProps() { Properties props = new Properties(); props.setProperty("name", "engine" + instanceId); @@ -294,8 +269,13 @@ public class SQLServerReader extends AbstractReader { props.setProperty("offset.storage.file.filename", offsetStoreFileName); props.setProperty("database.history.file.filename", databaseStoreHistoryName); if (SnapshotModeConstants.SPECIFIC_OFFSETS.equals(snapshotMode)) { + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + Preconditions.checkNotNull(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, + JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); - props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, + serializeOffset(instanceId, specificOffsetFile, specificOffsetPos)); props.setProperty("database.history", InLongDatabaseHistory.class.getCanonicalName()); 
} else { props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java new file mode 100644 index 000000000..4c19c3eea --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/OracleSnapshotBase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.snapshot; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * Oracle Snapshot + */ +public class OracleSnapshotBase extends AbstractSnapshot { + + private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotBase.class); + private final File file; + + public OracleSnapshotBase(String filePath) { + file = new File(filePath); + } + + @Override + public String getSnapshot() { + byte[] offset = this.load(this.file); + return ENCODER.encodeToString(offset); + } + + @Override + public void close() { + + } + + public File getFile() { + return file; + } + +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java new file mode 100644 index 000000000..f577ed5db --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
new file mode 100644
index 000000000..f577ed5db
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.JobConstants;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+
+/**
+ * Test cases for {@link OracleReader}.
+ */
+public class TestOracleConnect {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestOracleConnect.class);
+
+    @Ignore
+    public void testOracle() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set("job.oracleJob.hostname", "localhost");
+        jobProfile.set("job.oracleJob.port", "1521");
+        jobProfile.set("job.oracleJob.user", "c##dbzuser");
+        jobProfile.set("job.oracleJob.password", "dbz");
+        jobProfile.set("job.oracleJob.sid", "ORCLCDB");
+        jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+        jobProfile.set("job.oracleJob.serverName", "server1");
+        jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
+        jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());
+        OracleReader oracleReader = new OracleReader();
+        oracleReader.init(jobProfile);
+        while (true) {
+            Message message = oracleReader.read();
+            if (message != null) {
+                LOGGER.info("event content: {}", message);
+            }
+        }
+    }
+}
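The job.oracleJob.* keys set in this connect test mirror the standard Debezium Oracle connector properties, and OracleReader presumably maps them onto engine properties the same way SQLServerReader's getEngineProps() does above. A hedged sketch of that mapping; the authoritative property set lives in OracleReader.java, which is not shown in this excerpt:

    // Assumed mapping for illustration only; the Debezium property names themselves
    // (database.hostname, database.server.name, etc.) are standard connector config.
    Properties props = new Properties();
    props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
    props.setProperty("database.hostname", jobProfile.get("job.oracleJob.hostname"));
    props.setProperty("database.port", jobProfile.get("job.oracleJob.port"));
    props.setProperty("database.user", jobProfile.get("job.oracleJob.user"));
    props.setProperty("database.password", jobProfile.get("job.oracleJob.password"));
    props.setProperty("database.dbname", jobProfile.get("job.oracleJob.dbname"));
    props.setProperty("database.server.name", jobProfile.get("job.oracleJob.serverName"));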
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
new file mode 100644
index 000000000..db24e3acc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.OracleConstants;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
+import org.apache.inlong.agent.plugin.sources.snapshot.OracleSnapshotBase;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.field;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Test cases for {@link OracleReader}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DebeziumEngine.class, Executors.class, MetricRegister.class, OracleReader.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleReader {
+
+    private OracleReader reader;
+
+    @Mock
+    private JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    @Mock
+    private OracleSnapshotBase oracleSnapshot;
+
+    @Mock
+    private DebeziumEngine.Builder builder;
+
+    @Mock
+    private ExecutorService executorService;
+
+    @Mock
+    private LinkedBlockingQueue<Pair<String, String>> oracleMessageQueue;
+
+    @Mock
+    private DebeziumEngine<ChangeEvent<String, String>> engine;
+
+    private AtomicLong atomicLong;
+
+    private AtomicLong atomicCountLong;
+
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+    @Before
+    public void setUp() throws Exception {
+        final String username = "sa";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "1434";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String dbName = "testdb";
+        final String serverName = "serverName";
+        final String offsetFlushIntervalMs = "1000";
+        final String offsetStoreFileName = "/opt/offset.dat";
+        final String snapshotMode = OracleConstants.INITIAL;
+        final int queueSize = 1000;
+        final String databaseStoreHistoryName = "/opt/history.dat";
+        final String offset = "111";
+        final String specificOffsetFile = "";
+        final String specificOffsetPos = "-1";
+
+        atomicLong = new AtomicLong(0L);
+        atomicCountLong = new AtomicLong(0L);
+
+        when(jobProfile.getInstanceId()).thenReturn(instanceId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId);
+        when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_USER))).thenReturn(username);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PASSWORD))).thenReturn(password);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_HOSTNAME))).thenReturn(hostname);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_PORT))).thenReturn(port);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_DBNAME))).thenReturn(dbName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SERVER_NAME))).thenReturn(serverName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_OFFSET_INTERVAL_MS), anyString())).thenReturn(
+                offsetFlushIntervalMs);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME), anyString())).thenReturn(
+                offsetStoreFileName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_SNAPSHOT_MODE), anyString())).thenReturn(snapshotMode);
+        when(jobProfile.getInt(eq(OracleReader.JOB_DATABASE_QUEUE_SIZE), anyInt())).thenReturn(queueSize);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_STORE_HISTORY_FILENAME))).thenReturn(databaseStoreHistoryName);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSETS), anyString())).thenReturn(offset);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE), anyString())).thenReturn(
+                specificOffsetFile);
+        when(jobProfile.get(eq(OracleReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS), anyString())).thenReturn(
+                specificOffsetPos);
+        whenNew(OracleSnapshotBase.class).withAnyArguments().thenReturn(oracleSnapshot);
+
+        //mock oracleMessageQueue
+        whenNew(LinkedBlockingQueue.class).withAnyArguments().thenReturn(oracleMessageQueue);
+
+        //mock DebeziumEngine
+        mockStatic(DebeziumEngine.class);
+        when(DebeziumEngine.create(io.debezium.engine.format.Json.class)).thenReturn(builder);
+        when(builder.using(any(Properties.class))).thenReturn(builder);
+        when(builder.notifying(any(DebeziumEngine.ChangeConsumer.class))).thenReturn(builder);
+        when(builder.using(any(DebeziumEngine.CompletionCallback.class))).thenReturn(builder);
+        when(builder.build()).thenReturn(engine);
+
+        //mock executorService
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executorService);
+
+        //mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "pluginReadCount").set(agentMetricItem, atomicLong);
+        field(AgentMetricItem.class, "pluginReadSuccessCount").set(agentMetricItem, atomicCountLong);
+
+        //init method
+        mockStatic(MetricRegister.class);
+        (reader = new OracleReader()).init(jobProfile);
+    }
+
+    /**
+     * Test cases for {@link OracleReader#read()}.
+     */
+    @Test
+    public void testRead() throws Exception {
+        final String right = "value";
+        final String left = "key";
+        final String dataKey = "dataKey";
+        when(oracleMessageQueue.isEmpty()).thenReturn(true);
+        assertEquals(null, reader.read());
+        when(oracleMessageQueue.isEmpty()).thenReturn(false);
+        when(oracleMessageQueue.poll()).thenReturn(Pair.of(left, right));
+        Message result = reader.read();
+        assertEquals(String.join(right, "\"", "\""), result.toString());
+        assertEquals(left, result.getHeader().get(dataKey));
+    }
+
+    /**
+     * Test cases for {@link OracleReader#destroy()}.
+     */
+    @Test
+    public void testDestroy() throws Exception {
+        assertFalse(reader.isDestroyed());
+        reader.destroy();
+        verify(executorService).shutdownNow();
+        verify(oracleSnapshot).close();
+        assertTrue(reader.isDestroyed());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#finishRead()}.
+     */
+    @Test
+    public void testFinishRead() throws Exception {
+        assertFalse(reader.isFinished());
+        reader.finishRead();
+        assertTrue(reader.isFinished());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#isSourceExist()}.
+     */
+    @Test
+    public void testIsSourceExist() {
+        assertTrue(reader.isSourceExist());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getSnapshot()}.
+     */
+    @Test
+    public void testGetSnapshot() {
+        final String snapShort = "snapShort";
+        when(oracleSnapshot.getSnapshot()).thenReturn(snapShort);
+        assertEquals(snapShort, reader.getSnapshot());
+    }
+
+    /**
+     * Test cases for {@link OracleReader#getReadSource()}.
+     */
+    @Test
+    public void testGetReadSource() {
+        assertEquals(instanceId, reader.getReadSource());
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
new file mode 100644
index 000000000..160357e77
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.common.metric.MetricItem;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.powermock.api.support.membermodification.MemberMatcher.field;
+
+/**
+ * Test cases for {@link OracleSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OracleSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestOracleSource {
+
+    @Mock
+    JobProfile jobProfile;
+
+    @Mock
+    private AgentMetricItemSet agentMetricItemSet;
+
+    @Mock
+    private AgentMetricItem agentMetricItem;
+
+    private AtomicLong sourceSuccessCount;
+
+    private AtomicLong sourceFailCount;
+
+    @Before
+    public void setup() throws Exception {
+        sourceSuccessCount = new AtomicLong(0);
+        sourceFailCount = new AtomicLong(0);
+
+        // mock metrics
+        whenNew(AgentMetricItemSet.class).withArguments(anyString()).thenReturn(agentMetricItemSet);
+        when(agentMetricItemSet.findMetricItem(any())).thenReturn(agentMetricItem);
+        field(AgentMetricItem.class, "sourceSuccessCount").set(agentMetricItem, sourceSuccessCount);
+        field(AgentMetricItem.class, "sourceFailCount").set(agentMetricItem, sourceFailCount);
+        PowerMockito.mockStatic(MetricRegister.class);
+        PowerMockito.doNothing().when(
+                MetricRegister.class, "register", any(MetricItem.class));
+    }
+
+    /**
+     * Test cases for {@link OracleSource#split(JobProfile)}.
+     */
+    @Test
+    public void testSplit() {
+
+        // build mock
+        final OracleSource source = new OracleSource();
+        // assert
+        assertEquals(1, source.split(jobProfile).size());
+    }
+}
diff --git a/pom.xml b/pom.xml
index 08785dc82..7f0f54c35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,13 @@
             <artifactId>debezium-connector-postgres</artifactId>
             <version>${debezium.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-oracle</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>