This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new aabb0aa99 [INLONG-5045][Agent] Support collect data from MongoDB (#5674) aabb0aa99 is described below commit aabb0aa99bebabcda9d07934d8440062e06894a8 Author: seedscoder <seedsco...@gmail.com> AuthorDate: Wed Sep 14 10:11:46 2022 +0800 [INLONG-5045][Agent] Support collect data from MongoDB (#5674) --- .../apache/inlong/agent/constant/JobConstants.java | 30 ++ .../inlong/agent/pojo/DebeziumSourceFormat.java | 4 + .../apache/inlong/agent/pojo/JobProfileDto.java | 57 ++++ .../org/apache/inlong/agent/pojo/MongoJob.java | 115 +++++++ .../org/apache/inlong/agent/utils/GsonUtil.java | 57 ++++ inlong-agent/agent-plugins/pom.xml | 6 + .../inlong/agent/plugin/sources/MongoDBSource.java | 43 +++ .../agent/plugin/sources/reader/BinlogReader.java | 2 +- .../agent/plugin/sources/reader/MongoDBReader.java | 377 +++++++++++++++++++++ .../plugin/sources/reader/PostgreSQLReader.java | 2 +- ...eSQLSnapshotBase.java => AbstractSnapshot.java} | 67 ++-- .../sources/snapshot/BinlogSnapshotBase.java | 75 +--- .../sources/snapshot/MongoDBSnapshotBase.java | 64 ++++ .../sources/snapshot/PostgreSQLSnapshotBase.java | 70 +--- .../sources/PostgreSQLOffsetManagerTest.java | 2 +- .../plugin/sources/TestBinlogOffsetManager.java | 2 +- .../agent/plugin/sources/TestMongoDBReader.java | 119 +++++++ licenses/inlong-agent/LICENSE | 1 + pom.xml | 7 + 19 files changed, 930 insertions(+), 170 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java index 1894d1507..f6a5d989e 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java @@ -89,6 +89,36 @@ public class JobConstants extends CommonConstants { public static final String 
JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout"; public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset"; + + public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts"; + public static final String JOB_MONGO_USER = "job.mongoJob.user"; + public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password"; + public static final String JOB_MONGO_DATABASE_INCLUDE_LIST = "job.mongoJob.databaseIncludeList"; + public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST = "job.mongoJob.databaseExcludeList"; + public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST = "job.mongoJob.collectionIncludeList"; + public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST = "job.mongoJob.collectionExcludeList"; + public static final String JOB_MONGO_FIELD_EXCLUDE_LIST = "job.mongoJob.fieldExcludeList"; + public static final String JOB_MONGO_SNAPSHOT_MODE = "job.mongoJob.snapshotMode"; + public static final String JOB_MONGO_CAPTURE_MODE = "job.mongoJob.captureMode"; + public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize"; + public static final String JOB_MONGO_STORE_HISTORY_FILENAME = "job.mongoJob.history.filename"; + public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE = "job.mongoJob.offset.specificOffsetFile"; + public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS = "job.mongoJob.offset.specificOffsetPos"; + public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets"; + public static final String JOB_MONGO_CONNECT_TIMEOUT_MS = "job.mongoJob.connectTimeoutInMs"; + public static final String JOB_MONGO_CURSOR_MAX_AWAIT = "job.mongoJob.cursorMaxAwaitTimeInMs"; + public static final String JOB_MONGO_SOCKET_TIMEOUT = "job.mongoJob.socketTimeoutInMs"; + public static final String JOB_MONGO_SELECTION_TIMEOUT = "job.mongoJob.selectionTimeoutInMs"; + public static final String JOB_MONGO_FIELD_RENAMES = "job.mongoJob.fieldRenames"; + public static final String 
JOB_MONGO_MEMBERS_DISCOVER = "job.mongoJob.membersAutoDiscover"; + public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS = "job.mongoJob.connectMaxAttempts"; + public static final String JOB_MONGO_BACKOFF_MAX_DELAY = "job.mongoJob.connectBackoffMaxDelayInMs"; + public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY = "job.mongoJob.connectBackoffInitialDelayInMs"; + public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS = "job.mongoJob.initialSyncMaxThreads"; + public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED = "job.mongoJob.sslInvalidHostnameAllowed"; + public static final String JOB_MONGO_SSL_ENABLE = "job.mongoJob.sslEnabled"; + public static final String JOB_MONGO_POLL_INTERVAL = "job.mongoJob.pollIntervalInMs"; + public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L; // job type, delete/add diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java index 0e0eb9489..97c7bab17 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/DebeziumSourceFormat.java @@ -29,5 +29,9 @@ public class DebeziumSourceFormat { private String db; private String table; + /** + * mongo source metadata + */ + private String collection; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index e1a1482f0..80ed18d95 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -50,6 +50,10 @@ public class JobProfileDto { * kafka source */ public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource"; + /** + * mongo source + */ + public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource"; private static final Gson GSON = new Gson(); @@ -171,6 +175,52 @@ public class JobProfileDto { return kafkaJob; } + private static MongoJob getMongoJob(DataConfig dataConfigs) { + + MongoJob.MongoJobTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(), + MongoJob.MongoJobTaskConfig.class); + MongoJob mongoJob = new MongoJob(); + + mongoJob.setHosts(config.getHosts()); + mongoJob.setUser(config.getUser()); + mongoJob.setPassword(config.getPassword()); + mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList()); + mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList()); + mongoJob.setCollectionIncludeList(config.getCollectionIncludeList()); + mongoJob.setCollectionExcludeList(config.getCollectionExcludeList()); + mongoJob.setFieldExcludeList(config.getFieldExcludeList()); + mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs()); + mongoJob.setQueueSize(config.getQueueSize()); + mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs()); + mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs()); + mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs()); + mongoJob.setFieldRenames(config.getFieldRenames()); + mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover()); + mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts()); + mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs()); + mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs()); + mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads()); + mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed()); + mongoJob.setSslEnabled(config.getSslEnabled()); + mongoJob.setPollIntervalInMs(config.getPollIntervalInMs()); + + MongoJob.Offset offset = new MongoJob.Offset(); + 
offset.setFilename(config.getOffsetFilename()); + offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); + offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); + mongoJob.setOffset(offset); + + MongoJob.Snapshot snapshot = new MongoJob.Snapshot(); + snapshot.setMode(config.getSnapshotMode()); + mongoJob.setSnapshot(snapshot); + + MongoJob.History history = new MongoJob.History(); + history.setFilename(config.getHistoryFilename()); + mongoJob.setHistory(history); + + return mongoJob; + } + private static Proxy getProxy(DataConfig dataConfigs) { Proxy proxy = new Proxy(); Manager manager = new Manager(); @@ -231,6 +281,12 @@ public class JobProfileDto { job.setSource(KAFKA_SOURCE); profileDto.setJob(job); break; + case MONGODB: + MongoJob mongoJob = getMongoJob(dataConfig); + job.setMongoJob(mongoJob); + job.setSource(MONGO_SOURCE); + profileDto.setJob(job); + break; default: } return TriggerProfile.parseJsonStr(GSON.toJson(profileDto)); @@ -257,6 +313,7 @@ public class JobProfileDto { private FileJob fileJob; private BinlogJob binlogJob; private KafkaJob kafkaJob; + private MongoJob mongoJob; } @Data diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java new file mode 100644 index 000000000..087162d05 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/MongoJob.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +/** + * MongoJob : mongo job + */ +@Data +public class MongoJob { + + private String hosts; + private String user; + private String password; + private String databaseIncludeList; + private String databaseExcludeList; + private String collectionIncludeList; + private String collectionExcludeList; + private String fieldExcludeList; + private String connectTimeoutInMs; + private String queueSize; + private String cursorMaxAwaitTimeInMs; + private String socketTimeoutInMs; + private String selectionTimeoutInMs; + private String fieldRenames; + private String membersAutoDiscover; + private String connectMaxAttempts; + private String connectBackoffMaxDelayInMs; + private String connectBackoffInitialDelayInMs; + private String initialSyncMaxThreads; + private String sslInvalidHostnameAllowed; + private String sslEnabled; + private String pollIntervalInMs; + private Snapshot snapshot; + private Capture capture; + private Offset offset; + private History history; + + @Data + public static class Offset { + + private String intervalMs; + private String filename; + private String specificOffsetFile; + private String specificOffsetPos; + } + + @Data + public static class Snapshot { + private String mode; + } + + @Data + public static class Capture { + private String mode; + } + + @Data + public static class History { + private String filename; + + } + + @Data + public static class MongoJobTaskConfig { + + private String hosts; + private String user; + private String password; + + private 
String databaseIncludeList; + private String databaseExcludeList; + private String collectionIncludeList; + private String collectionExcludeList; + private String fieldExcludeList; + private String connectTimeoutInMs; + private String queueSize; + private String cursorMaxAwaitTimeInMs; + private String socketTimeoutInMs; + private String selectionTimeoutInMs; + private String fieldRenames; + private String membersAutoDiscover; + private String connectMaxAttempts; + private String connectBackoffMaxDelayInMs; + private String connectBackoffInitialDelayInMs; + private String initialSyncMaxThreads; + private String sslInvalidHostnameAllowed; + private String sslEnabled; + private String pollIntervalInMs; + + private String snapshotMode; + private String captureMode; + + private String offsetFilename; + private String historyFilename; + private String specificOffsetFile; + private String specificOffsetPos; + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java new file mode 100644 index 000000000..5e810199f --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/GsonUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.utils; + +import com.google.gson.Gson; + +/** + * GsonUtil : Gson instances are Thread-safe, so you can reuse them freely across multiple threads. + */ +public class GsonUtil { + + private static final Gson gson = new Gson(); + + /** + * instantiation is not allowed + */ + private GsonUtil() { + throw new UnsupportedOperationException("This is a utility class, so instantiation is not allowed"); + } + + /** + * This method deserializes the specified Json into an object of the specified class. + * + * @param json json + * @param classOfT class of T + * @param <T> T + * @return T + */ + public static <T> T fromJson(String json, Class<T> classOfT) { + return gson.fromJson(json, classOfT); + } + + /** + * This method serializes the specified object into its equivalent Json representation. + * + * @param obj obj + * @return json content + */ + public static String toJson(Object obj) { + return gson.toJson(obj); + } +} diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index 0df97e832..c00e5351f 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -84,6 +84,12 @@ </exclusion> </exclusions> </dependency> + + <dependency> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-mongodb</artifactId> + </dependency> + <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java new file mode 100644 index 000000000..21dd01f84 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MongoDBSource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.plugin.Reader; +import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +/** + * MongoDBSource : mongo source, split mongo source job into multi readers + */ +public class MongoDBSource extends AbstractSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSource.class); + + @Override + public List<Reader> split(JobProfile conf) { + super.init(conf); + List<Reader> readerList = Collections.singletonList(new MongoDBReader()); + sourceMetric.sourceSuccessCount.incrementAndGet(); + return readerList; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java index 165e85626..21188de90 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java +++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java @@ -155,7 +155,7 @@ public class BinlogReader extends AbstractReader { tryToInitAndGetHistoryPath()) + "/offset.dat" + jobConf.getInstanceId(); binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName); String offset = jobConf.get(JOB_DATABASE_OFFSETS, ""); - binlogSnapshot.save(offset); + binlogSnapshot.save(offset, binlogSnapshot.getFile()); Properties props = getEngineProps(); DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(io.debezium.engine.format.Json.class) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java new file mode 100644 index 000000000..b085e0b0f --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MongoDBReader.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources.reader; + +import com.alibaba.fastjson.JSONPath; +import com.google.common.base.Preconditions; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.connector.mongodb.MongoDbConnector; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.SnapshotModeConstants; +import org.apache.inlong.agent.message.DefaultMessage; +import org.apache.inlong.agent.metrics.audit.AuditUtils; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.snapshot.MongoDBSnapshotBase; +import org.apache.inlong.agent.plugin.utils.InLongFileOffsetBackingStore; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.apache.inlong.agent.pojo.DebeziumOffset; +import org.apache.inlong.agent.utils.DebeziumOffsetSerializer; +import org.apache.inlong.agent.utils.GsonUtil; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.HOSTS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.USER; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.PASSWORD; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS; +import static 
io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CONNECT_TIMEOUT_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SOCKET_TIMEOUT_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ENABLED; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_INCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_EXCLUDE_LIST; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.FIELD_RENAMES; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.MAX_COPY_THREADS; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.SNAPSHOT_MODE; +import static io.debezium.connector.mongodb.MongoDbConnectorConfig.CAPTURE_MODE; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_MAP_CAPACITY; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_HOSTS; +import static 
org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_USER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_PASSWORD; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_DATABASE_EXCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_COLLECTION_EXCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_EXCLUDE_LIST; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SNAPSHOT_MODE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CAPTURE_MODE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_QUEUE_SIZE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_STORE_HISTORY_FILENAME; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_OFFSETS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_TIMEOUT_MS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CURSOR_MAX_AWAIT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SOCKET_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SELECTION_TIMEOUT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_FIELD_RENAMES; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_MEMBERS_DISCOVER; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_CONNECT_MAX_ATTEMPTS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_MAX_DELAY; +import static 
org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_BACKOFF_INITIAL_DELAY; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_INITIAL_SYNC_MAX_THREADS; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_SSL_ENABLE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_MONGO_POLL_INTERVAL; + +/** + * MongoDBReader : mongo source, split mongo source job into multi readers + */ +public class MongoDBReader extends AbstractReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBReader.class); + + private String instanceId; + private String offsetStoreFileName; + private String specificOffsetFile; + private String specificOffsetPos; + private boolean finished = false; + private boolean destroyed = false; + + private ExecutorService executor; + /** + * mongo snapshot info <br/> + * Currently, there is no usage scenario + */ + private MongoDBSnapshotBase snapshot; + /** + * message buffer queue + */ + private LinkedBlockingQueue<Pair<String, DebeziumFormat>> bufferPool; + + @Override + public Message read() { + if (!bufferPool.isEmpty()) { + super.readerMetric.pluginReadCount.incrementAndGet(); + return this.pollMessage(); + } else { + return null; + } + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public String getReadSource() { + return instanceId; + } + + @Override + public void setReadTimeout(long mills) { + } + + @Override + public void setWaitMillisecond(long millis) { + } + + @Override + public String getSnapshot() { + if (snapshot != null) { + return snapshot.getSnapshot(); + } else { + return ""; + } + } + + @Override + public void finishRead() { + this.finished = true; + } + + @Override + public boolean isSourceExist() { + return true; + } + + @Override + public void destroy() { + synchronized (this) { + if (!destroyed) { + 
this.executor.shutdownNow(); + this.snapshot.close(); + this.destroyed = true; + } + } + } + + @Override + public void init(JobProfile jobConf) { + super.init(jobConf); + this.setGlobalParamsValue(jobConf); + this.startEmbeddedDebeziumEngine(jobConf); + } + + /** + * poll message from buffer pool + * + * @return org.apache.inlong.agent.plugin.Message + */ + private Message pollMessage() { + // Retrieves and removes the head of this queue, + // or returns null if this queue is empty. + Pair<String, DebeziumFormat> message = bufferPool.poll(); + if (message == null) { + return null; + } + Map<String, String> header = new HashMap<>(DEFAULT_MAP_CAPACITY); + header.put(PROXY_KEY_DATA, message.getKey()); + return new DefaultMessage(GsonUtil.toJson(message.getValue()).getBytes(StandardCharsets.UTF_8), header); + } + + /** + * set global parameters value + * + * @param jobConf job conf + */ + private void setGlobalParamsValue(JobProfile jobConf) { + bufferPool = new LinkedBlockingQueue<>(jobConf.getInt(JOB_MONGO_QUEUE_SIZE, 1000)); + instanceId = jobConf.getInstanceId(); + // offset file absolute path + offsetStoreFileName = jobConf.get(JOB_MONGO_STORE_HISTORY_FILENAME, + MongoDBSnapshotBase.getSnapshotFilePath()) + "/mongo-" + instanceId + "-offset.dat"; + // snapshot info + snapshot = new MongoDBSnapshotBase(offsetStoreFileName); + String offset = jobConf.get(JOB_MONGO_OFFSETS, ""); + snapshot.save(offset, new File(offsetStoreFileName)); + // offset info + specificOffsetFile = jobConf.get(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE, ""); + specificOffsetPos = jobConf.get(JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS, "-1"); + } + + /** + * start the embedded debezium engine + * + * @param jobConf job conf + */ + private void startEmbeddedDebeziumEngine(JobProfile jobConf) { + DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class) + .using(this.buildMongoConnectorConfig(jobConf)) + .notifying(this::handleChangeEvent) + .using(this::handle) + 
.build(); + this.executor = Executors.newSingleThreadExecutor(); + this.executor.execute(debeziumEngine); + } + + /** + * Handle the completion of the embedded connector engine. + * + * @param success {@code true} if the connector completed normally, + * or {@code false} if the connector produced an error + * that prevented startup or premature termination. + * @param message the completion message; never null + * @param error the error, or null if there was no exception + */ + private void handle(boolean success, String message, Throwable error) { + //jobConf.getInstanceId() + if (!success) { + LOGGER.error("{}, {}", message, error); + } + } + + /** + * A Configuration object is basically a decorator around a {@link Properties} object. + * + * @return Configuration + */ + private Properties buildMongoConnectorConfig(JobProfile jobConf) { + Configuration.Builder builder = Configuration.create(); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_HOSTS, HOSTS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_USER, USER); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_PASSWORD, PASSWORD); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_DATABASE_INCLUDE_LIST, DATABASE_INCLUDE_LIST); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_DATABASE_EXCLUDE_LIST, DATABASE_EXCLUDE_LIST); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_COLLECTION_INCLUDE_LIST, COLLECTION_INCLUDE_LIST); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_COLLECTION_EXCLUDE_LIST, COLLECTION_EXCLUDE_LIST); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_FIELD_EXCLUDE_LIST, FIELD_EXCLUDE_LIST); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SNAPSHOT_MODE, SNAPSHOT_MODE); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CAPTURE_MODE, CAPTURE_MODE); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CONNECT_TIMEOUT_MS, CONNECT_TIMEOUT_MS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CURSOR_MAX_AWAIT, 
CURSOR_MAX_AWAIT_TIME_MS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SOCKET_TIMEOUT, SOCKET_TIMEOUT_MS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SELECTION_TIMEOUT, SERVER_SELECTION_TIMEOUT_MS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_FIELD_RENAMES, FIELD_RENAMES); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_MEMBERS_DISCOVER, AUTO_DISCOVER_MEMBERS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_CONNECT_MAX_ATTEMPTS, MAX_FAILED_CONNECTIONS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_BACKOFF_MAX_DELAY, CONNECT_BACKOFF_MAX_DELAY_MS); + setEngineConfigIfNecessary(jobConf, builder, + JOB_MONGO_BACKOFF_INITIAL_DELAY, CONNECT_BACKOFF_INITIAL_DELAY_MS); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_INITIAL_SYNC_MAX_THREADS, MAX_COPY_THREADS); + setEngineConfigIfNecessary(jobConf, builder, + JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED, SSL_ALLOW_INVALID_HOSTNAMES); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_SSL_ENABLE, SSL_ENABLED); + setEngineConfigIfNecessary(jobConf, builder, JOB_MONGO_POLL_INTERVAL, MONGODB_POLL_INTERVAL_MS); + + Properties props = builder.build().asProperties(); + props.setProperty("offset.storage.file.filename", offsetStoreFileName); + props.setProperty("connector.class", MongoDbConnector.class.getCanonicalName()); + props.setProperty("name", instanceId); + + String snapshotMode = props.getOrDefault(JOB_MONGO_SNAPSHOT_MODE, "").toString(); + if (Objects.equals(SnapshotModeConstants.INITIAL, snapshotMode)) { + props.setProperty("offset.storage", InLongFileOffsetBackingStore.class.getCanonicalName()); + props.setProperty(InLongFileOffsetBackingStore.OFFSET_STATE_VALUE, serializeOffset()); + } else { + props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName()); + } + LOGGER.info("mongo job {} start with props {}", + jobConf.getInstanceId(), + GsonUtil.toJson(props)); + return props; + } + + private void 
setEngineConfigIfNecessary(JobProfile jobConf, + Configuration.Builder builder, String key, Field field) { + String value = jobConf.get(key, field.defaultValueAsString()); + if (StringUtils.isBlank(value)) { + return; + } + builder.with(field, value); + } + + private String serializeOffset() { + Map<String, Object> sourceOffset = new HashMap<>(); + Preconditions.checkNotNull(specificOffsetFile, + JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE + " cannot be null"); + sourceOffset.put("file", specificOffsetFile); + Preconditions.checkNotNull(specificOffsetPos, + JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS + " cannot be null"); + sourceOffset.put("pos", specificOffsetPos); + DebeziumOffset specificOffset = new DebeziumOffset(); + specificOffset.setSourceOffset(sourceOffset); + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("server", instanceId); + specificOffset.setSourcePartition(sourcePartition); + byte[] serializedOffset = new byte[0]; + try { + serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + } catch (IOException e) { + LOGGER.error("serialize offset message error", e); + } + return new String(serializedOffset, StandardCharsets.UTF_8); + } + + /** + * Handles a batch of records, calling the {@link DebeziumEngine.RecordCommitter#markProcessed(Object)} + * for each record and {@link DebeziumEngine.RecordCommitter#markBatchFinished()} when this batch is finished. 
+ * + * @param records the records to be processed + * @param committer the committer that indicates to the system that we are finished + */ + private void handleChangeEvent(List<ChangeEvent<String, String>> records, + DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) { + try { + for (ChangeEvent<String, String> record : records) { + DebeziumFormat debeziumFormat = JSONPath.read(record.value(), "$.payload", DebeziumFormat.class); + bufferPool.put(Pair.of(debeziumFormat.getSource().getCollection(), debeziumFormat)); + committer.markProcessed(record); + } + committer.markBatchFinished(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, super.inlongGroupId, super.inlongStreamId, + System.currentTimeMillis(), records.size()); + readerMetric.pluginReadCount.addAndGet(records.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it + LOGGER.error("parse mongo message error", e); + + readerMetric.pluginReadFailCount.addAndGet(records.size()); + } + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java index 5d3e9207c..5d78c2787 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/PostgreSQLReader.java @@ -146,7 +146,7 @@ public class PostgreSQLReader extends AbstractReader { specificOffsetFile = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, ""); specificOffsetPos = jobConf.get(JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_POS, "-1"); postgreSQLSnapshot = new PostgreSQLSnapshotBase(offsetStoreFileName); - postgreSQLSnapshot.save(offset); + postgreSQLSnapshot.save(offset, postgreSQLSnapshot.getFile()); Properties props = getEngineProps(); diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java similarity index 69% copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java index d27213389..8b100149d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/AbstractSnapshot.java @@ -22,55 +22,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.OutputStream; +import java.io.ByteArrayOutputStream; import java.nio.file.Files; import java.util.Base64; /** - * PostgreSQL Snapshot + * AbstractSnapshot : AbstractSnapshot */ -public class PostgreSQLSnapshotBase implements SnapshotBase { - - public static final int BUFFER_SIZE = 1024; - public static final int START_OFFSET = 0; - private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class); - private final Base64.Decoder decoder = Base64.getDecoder(); - private final Base64.Encoder encoder = Base64.getEncoder(); - private File file; - private byte[] offset; +public abstract class AbstractSnapshot implements SnapshotBase { - public PostgreSQLSnapshotBase(String filePath) { - file = new File(filePath); - } - - @Override - public String getSnapshot() { - load(); - return encoder.encodeToString(offset); - } + private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotBase.class); - @Override - public void close() { + protected static final Base64.Decoder 
DECODER = Base64.getDecoder(); + protected static final Base64.Encoder ENCODER = Base64.getEncoder(); - } + /** + * buffer size used for reading and writing + */ + private static final int BUFFER_SIZE = 8192; + /** + * start offset + */ + private static final int START_OFFSET = 0; /** - * Load postgres offset from local file + * Load the file contents from the specified path + * + * @param file the file to read; it is created (with missing parent dirs) if absent + * @return the file content, or an empty array if the file cannot be read */ - private void load() { + public byte[] load(File file) { try { if (!file.exists()) { // if parentDir not exist, create first File parentDir = file.getParentFile(); if (parentDir == null) { LOGGER.info("no parent dir, file:{}", file.getAbsolutePath()); - return; + return new byte[0]; } if (!parentDir.exists()) { - boolean success = parentDir.mkdir(); + boolean success = parentDir.mkdirs(); LOGGER.info("create dir {} result {}", parentDir, success); } file.createNewFile(); @@ -83,23 +77,27 @@ public class PostgreSQLSnapshotBase implements SnapshotBase { while ((len = inputStream.read(buf)) != -1) { outputStream.write(buf, START_OFFSET, len); } - offset = outputStream.toByteArray(); inputStream.close(); outputStream.close(); + return outputStream.toByteArray(); } catch (Throwable ex) { - LOGGER.error("load PostgreSQL WAL log error", ex); + LOGGER.error("load offset from file {} error", file.getAbsolutePath(), ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + return new byte[0]; } } /** - * Save PostgreSQL offset to local file + * offset persist + * + * @param snapshot Contents of the file to be written back + * @param destFile Target file */ - public void save(String snapshot) { - byte[] bytes = decoder.decode(snapshot); + public void save(String snapshot, File destFile) { + byte[] bytes = DECODER.decode(snapshot); if (bytes.length != 0) { - offset = bytes; - try (OutputStream output = 
Files.newOutputStream(file.toPath())) { + // only non-empty snapshots overwrite the destination file + try (OutputStream output = Files.newOutputStream(destFile.toPath())) { output.write(bytes); } catch (Throwable e) { 
LOGGER.error("save offset to file error", e); @@ -107,4 +105,5 @@ public class PostgreSQLSnapshotBase implements SnapshotBase { } } } + } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java index d0b9ca230..d19b4f61c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/BinlogSnapshotBase.java @@ -17,32 +17,19 @@ package org.apache.inlong.agent.plugin.sources.snapshot; -import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.OutputStream; -import java.util.Base64; -import java.util.Base64.Decoder; -import java.util.Base64.Encoder; /** * binlog snapshot */ -public class BinlogSnapshotBase implements SnapshotBase { +public class BinlogSnapshotBase extends AbstractSnapshot { - public static final int BUFFER_SIZE = 1024; - public static final int START_OFFSET = 0; private static final Logger log = LoggerFactory.getLogger(BinlogSnapshotBase.class); - private final Decoder decoder = Base64.getDecoder(); - private final Encoder encoder = Base64.getEncoder(); - private File file; - private byte[] offset; + + private final File file; public BinlogSnapshotBase(String filePath) { file = new File(filePath); @@ -50,64 +37,16 @@ public class BinlogSnapshotBase implements SnapshotBase { @Override public String getSnapshot() { - load(); - return encoder.encodeToString(offset); + byte[] offset = this.load(this.file); + return ENCODER.encodeToString(offset); } @Override public void close() { } - /** - * 
load binlog offset from local file - */ - public void load() { - try { - if (!file.exists()) { - // if parentDir not exist, create first - File parentDir = file.getParentFile(); - if (parentDir == null) { - log.info("no parent dir, file:{}", file.getAbsolutePath()); - return; - } - if (!parentDir.exists()) { - boolean success = parentDir.mkdirs(); - log.info("create dir {} result {}", parentDir, success); - } - file.createNewFile(); - } - FileInputStream fis = new FileInputStream(file); - BufferedInputStream inputStream = new BufferedInputStream(fis); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - int len; - byte[] buf = new byte[BUFFER_SIZE]; - while ((len = inputStream.read(buf)) != -1) { - outputStream.write(buf, START_OFFSET, len); - } - offset = outputStream.toByteArray(); - inputStream.close(); - outputStream.close(); - } catch (Throwable ex) { - log.error("load binlog offset error", ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - } - } - - /** - * save binlog offset to local file - */ - public void save(String snapshot) { - byte[] bytes = decoder.decode(snapshot); - if (bytes.length != 0) { - offset = bytes; - try (OutputStream output = new FileOutputStream(file)) { - output.write(bytes); - } catch (Throwable e) { - log.error("save offset to file error", e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - - } - } + public File getFile() { + return file; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java new file mode 100644 index 000000000..82108b208 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/MongoDBSnapshotBase.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources.snapshot; + +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.utils.AgentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +/** + * MongoDBSnapshotBase : mongo snapshot + */ +public class MongoDBSnapshotBase extends AbstractSnapshot { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBSnapshotBase.class); + /** + * agent configuration + */ + private static final AgentConfiguration AGENT_CONFIGURATION = AgentConfiguration.getAgentConf(); + /** + * snapshot file + */ + private final File file; + + public MongoDBSnapshotBase(String filePath) { + file = new File(filePath); + } + + @Override + public String getSnapshot() { + return ENCODER.encodeToString(this.load(file)); + } + + @Override + public void close() { + + } + + public static String getSnapshotFilePath() { + String historyPath = AGENT_CONFIGURATION.get( + AgentConstants.AGENT_HISTORY_PATH, AgentConstants.DEFAULT_AGENT_HISTORY_PATH); + String parentPath = AGENT_CONFIGURATION.get( + AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + return AgentUtils.makeDirsIfNotExist(historyPath, 
parentPath).getAbsolutePath(); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java index d27213389..8aee4c16f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/snapshot/PostgreSQLSnapshotBase.java @@ -17,30 +17,18 @@ package org.apache.inlong.agent.plugin.sources.snapshot; -import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.util.Base64; /** * PostgreSQL Snapshot */ -public class PostgreSQLSnapshotBase implements SnapshotBase { +public class PostgreSQLSnapshotBase extends AbstractSnapshot { - public static final int BUFFER_SIZE = 1024; - public static final int START_OFFSET = 0; private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSnapshotBase.class); - private final Base64.Decoder decoder = Base64.getDecoder(); - private final Base64.Encoder encoder = Base64.getEncoder(); - private File file; - private byte[] offset; + private final File file; public PostgreSQLSnapshotBase(String filePath) { file = new File(filePath); @@ -48,8 +36,8 @@ public class PostgreSQLSnapshotBase implements SnapshotBase { @Override public String getSnapshot() { - load(); - return encoder.encodeToString(offset); + byte[] offset = this.load(this.file); + return ENCODER.encodeToString(offset); } @Override @@ -57,54 +45,8 @@ public class PostgreSQLSnapshotBase implements SnapshotBase { } - /** - * Load postgres offset from local file - */ - private void load() 
{ - try { - if (!file.exists()) { - // if parentDir not exist, create first - File parentDir = file.getParentFile(); - if (parentDir == null) { - LOGGER.info("no parent dir, file:{}", file.getAbsolutePath()); - return; - } - if (!parentDir.exists()) { - boolean success = parentDir.mkdir(); - LOGGER.info("create dir {} result {}", parentDir, success); - } - file.createNewFile(); - } - FileInputStream fis = new FileInputStream(file); - BufferedInputStream inputStream = new BufferedInputStream(fis); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - int len; - byte[] buf = new byte[BUFFER_SIZE]; - while ((len = inputStream.read(buf)) != -1) { - outputStream.write(buf, START_OFFSET, len); - } - offset = outputStream.toByteArray(); - inputStream.close(); - outputStream.close(); - } catch (Throwable ex) { - LOGGER.error("load PostgreSQL WAL log error", ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); - } + public File getFile() { + return file; } - /** - * Save PostgreSQL offset to local file - */ - public void save(String snapshot) { - byte[] bytes = decoder.decode(snapshot); - if (bytes.length != 0) { - offset = bytes; - try (OutputStream output = Files.newOutputStream(file.toPath())) { - output.write(bytes); - } catch (Throwable e) { - LOGGER.error("save offset to file error", e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - } - } - } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java index 63cb8ae93..658a27f0c 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLOffsetManagerTest.java @@ -58,7 +58,7 @@ public class PostgreSQLOffsetManagerTest { byte[] 
snapshotBytes = new byte[]{-65,-14,23}; final Base64 base64 = new Base64(); String encodeSnapshot = base64.encodeAsString(snapshotBytes); - snapshotManager.save(encodeSnapshot); + snapshotManager.save(encodeSnapshot, snapshotManager.getFile()); Assert.assertEquals(snapshotManager.getSnapshot(),encodeSnapshot); File file = new File(filePath.toString()); Assert.assertEquals(file.exists(),true); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java index 45d94632e..4d6f262d4 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestBinlogOffsetManager.java @@ -52,7 +52,7 @@ public class TestBinlogOffsetManager { byte[] snapshotBytes = new byte[]{-65, -14, -23}; final Base64 base64 = new Base64(); String encodeSnapshot = base64.encodeAsString(snapshotBytes); - snapshotManager.save(encodeSnapshot); + snapshotManager.save(encodeSnapshot, snapshotManager.getFile()); Assert.assertEquals(snapshotManager.getSnapshot(), encodeSnapshot); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java new file mode 100644 index 000000000..9f5a06f45 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMongoDBReader.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources; + +import com.alibaba.fastjson.JSONPath; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.JobConstants; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.reader.MongoDBReader; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; + +/** + * TestMongoDBReader : TestMongoDBReader + */ +public class TestMongoDBReader { + + + private static final Logger LOGGER = LoggerFactory.getLogger(TestMongoDBReader.class); + + /** + * Change event format verification + */ + @Test + public void testDebeziumFormat() { + String json = "{\n" + + " \"payload\": {\n" + + " \"after\": \"{\\\"_id\\\":{\\\"$oid\\\":\\\"6304d21d96befa25a3630c5e\\\"}" + + ",\\\"orderId\\\":\\\"o0011\\\",\\\"userId\\\":\\\"u0012\\\",\\\"items\\\":" + + "[{\\\"itemId2\\\":\\\"i001\\\",\\\"itemName\\\":\\\"yyy\\\"}," + + "{\\\"itemId\\\":\\\"i002\\\",\\\"itemName\\\":\\\"yyy2\\\"}]}\",\n" + + " \"patch\": null,\n" + + " \"filter\": null,\n" + + " 
\"updateDescription\": {\n" + + " \"removedFields\": null,\n" + + " \"updatedFields\": \"{\\\"items\\\":[{\\\"itemId2\\\":\\\"i001\\\", " + + "\\\"itemName\\\":\\\"xxx\\\"}, {\\\"itemId\\\":\\\"i002\\\", \\\"itemName\\\":\\\"xxx2\\\"}], " + + "\\\"userId\\\":\\\"u0012\\\"}\",\n" + + " \"truncatedArrays\": null\n" + + " },\n" + + " \"source\": {\n" + + " \"version\": \"1.8.0.Final\",\n" + + " \"connector\": \"mongodb\",\n" + + " \"name\": \"myrs\",\n" + + " \"ts_ms\": 1661332000000,\n" + + " \"snapshot\": \"false\",\n" + + " \"db\": \"mall\",\n" + + " \"sequence\": null,\n" + + " \"rs\": \"myrs\",\n" + + " \"collection\": \"order\",\n" + + " \"ord\": 1,\n" + + " \"h\": null,\n" + + " \"tord\": null,\n" + + " \"stxnid\": null,\n" + + " \"lsid\": null,\n" + + " \"txnNumber\": null\n" + + " },\n" + + " \"op\": \"u\",\n" + + " \"ts_ms\": 1661332000257,\n" + + " \"transaction\": null\n" + + " }\n" + + "}"; + DebeziumFormat debeziumFormat = JSONPath.read(json, "$.payload", DebeziumFormat.class); + Assert.assertEquals("order", debeziumFormat.getSource().getCollection()); + Assert.assertEquals("false", debeziumFormat.getSource().getSnapshot()); + } + + /** + * Use local Mongo shard cluster for temporary testing + */ + @Ignore + public void readChangeEventFromMongo() { + JobProfile jobProfile = new JobProfile(); + jobProfile.set("job.mongoJob.hosts", "localhost:37018"); + jobProfile.set("job.mongoJob.user", "mongo"); + jobProfile.set("job.mongoJob.password", "root"); + jobProfile.set("job.mongoJob.name", "myrs"); + jobProfile.set("job.mongoJob.connectMaxAttempts", "3"); + jobProfile.set("job.mongoJob.databaseIncludeList", "mall"); + jobProfile.set("job.mongoJob.collectionIncludeList", "order"); + jobProfile.set("job.mongoJob.snapshotMode", "never"); + jobProfile.set(JobConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString()); + + jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString()); + jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString()); + 
MongoDBReader mongoReader = new MongoDBReader(); + mongoReader.init(jobProfile); + while (true) { + Message message = mongoReader.read(); + if (message != null) { + LOGGER.info("event content: {}", message); + } + } + } +} diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE index 9b18e8655..04b79a5bd 100644 --- a/licenses/inlong-agent/LICENSE +++ b/licenses/inlong-agent/LICENSE @@ -368,6 +368,7 @@ The text of each license is the standard Apache 2.0 license. io.debezium:debezium-api:1.8.0.Final - Debezium API (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-connector-mysql:1.8.0.Final - Debezium Connector for MySQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-connector-postgres:1.8.0.Final - Debezium Connector for PostgreSQL (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) + io.debezium:debezium-connector-mongodb:1.8.0.Final - Debezium connector for MongoDB (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-core:1.8.0.Final - Debezium Core (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-ddl-parser:1.8.0.Final - Debezium ANTLR DDL parsers (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) io.debezium:debezium-embedded:1.8.0.Final - Debezium Embedded (https://github.com/debezium/debezium/tree/v1.8.0.Final), (The Apache Software License, Version 2.0) diff --git a/pom.xml b/pom.xml index 99819cc8d..943a3c0ea 100644 --- a/pom.xml +++ b/pom.xml @@ -603,6 +603,13 @@ <artifactId>debezium-connector-mysql</artifactId> <version>${debezium.version}</version> </dependency> + + <dependency> + <groupId>io.debezium</groupId> + 
<artifactId>debezium-connector-mongodb</artifactId> + <version>${debezium.version}</version> + </dependency> + <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-postgres</artifactId>