This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new c5159a2760 [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) c5159a2760 is described below commit c5159a2760ecf8deb377e9b1b18367d29199aaf4 Author: Dongyeon Lee <dev.loust...@gmail.com> AuthorDate: Wed Jun 19 06:21:17 2024 -0700 [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) --- release-note.md | 1 + .../mongodb/serde/BsonToRowDataConverters.java | 2 +- .../connector/v2/mongodb/AbstractMongodbIT.java | 140 ++++++++++++--------- .../e2e/connector/v2/mongodb/MongodbIT.java | 16 +++ .../src/test/resources/mongodb_double_value.conf | 88 +++++++++++++ 5 files changed, 187 insertions(+), 60 deletions(-) diff --git a/release-note.md b/release-note.md index 840648fe64..24455c40ac 100644 --- a/release-note.md +++ b/release-note.md @@ -55,6 +55,7 @@ - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546 - [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy - [Connector-v2] [File] Support assign encoding for file source/sink (#5973) +- [Connector-v2] [Mongodb] Support converting any numeric BSON value stored by MongoDB to double (#6997) ### Zeta(ST-Engine) diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java index a343e0cd2d..505b30fcbd 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java @@ -353,7 +353,7 @@ public class BsonToRowDataConverters implements Serializable { } private 
static double convertToDouble(BsonValue bsonValue) { - if (bsonValue.isDouble()) { + if (bsonValue.isNumber()) { return bsonValue.asNumber().doubleValue(); } throw new MongodbConnectorException( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java index dd24dc1e60..cf0b1dbd09 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java @@ -36,6 +36,7 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Sorts; +import com.mongodb.client.result.InsertManyResult; import lombok.extern.slf4j.Slf4j; import java.time.Duration; @@ -60,6 +61,9 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected static final List<Document> TEST_NULL_DATASET = generateTestDataSetWithNull(10); + protected static final List<Document> TEST_DOUBLE_DATASET = + generateTestDataSetWithPresets(5, Arrays.asList(44.0d, 44.1d, 44.2d, 44.3d, 44.4d)); + protected static final String MONGODB_IMAGE = "mongo:latest"; protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb"; @@ -76,6 +80,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected static final String MONGODB_NULL_TABLE_RESULT = "test_null_op_db_result"; + protected static final String MONGODB_DOUBLE_TABLE = "test_double_op_db"; + + protected static final String MONGODB_DOUBLE_TABLE_RESULT = "test_double_op_db_result"; + protected static final String 
MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db"; protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db"; @@ -105,20 +113,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes } protected void initSourceData() { - MongoCollection<Document> sourceMatchTable = - client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE); - sourceMatchTable.deleteMany(new Document()); - sourceMatchTable.insertMany(TEST_MATCH_DATASET); - - MongoCollection<Document> sourceSplitTable = - client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE); - sourceSplitTable.deleteMany(new Document()); - sourceSplitTable.insertMany(TEST_SPLIT_DATASET); - - MongoCollection<Document> sourceNullTable = - client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE); - sourceNullTable.deleteMany(new Document()); - sourceNullTable.insertMany(TEST_NULL_DATASET); + prepareInitDataInCollection(MONGODB_MATCH_TABLE, TEST_MATCH_DATASET); + prepareInitDataInCollection(MONGODB_SPLIT_TABLE, TEST_SPLIT_DATASET); + prepareInitDataInCollection(MONGODB_NULL_TABLE, TEST_NULL_DATASET); + prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET); } protected void clearDate(String table) { @@ -129,51 +127,7 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes List<Document> dataSet = new ArrayList<>(); for (int i = 0; i < count; i++) { - dataSet.add( - new Document( - "c_map", - new Document("OQBqH", randomString()) - .append("rkvlO", randomString()) - .append("pCMEX", randomString()) - .append("DAgdj", randomString()) - .append("dsJag", randomString())) - .append( - "c_array", - Arrays.asList( - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt())) - .append("c_string", randomString()) - .append("c_boolean", RANDOM.nextBoolean()) - .append("c_int", i) - .append("c_bigint", RANDOM.nextLong()) - .append("c_double", 
RANDOM.nextDouble() * Double.MAX_VALUE) - .append( - "c_row", - new Document( - "c_map", - new Document("OQBqH", randomString()) - .append("rkvlO", randomString()) - .append("pCMEX", randomString()) - .append("DAgdj", randomString()) - .append("dsJag", randomString())) - .append( - "c_array", - Arrays.asList( - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt(), - RANDOM.nextInt())) - .append("c_string", randomString()) - .append("c_boolean", RANDOM.nextBoolean()) - .append("c_int", RANDOM.nextInt()) - .append("c_bigint", RANDOM.nextLong()) - .append( - "c_double", - RANDOM.nextDouble() * Double.MAX_VALUE))); + dataSet.add(generateData(i, RANDOM.nextDouble() * Double.MAX_VALUE)); } return dataSet; } @@ -195,6 +149,17 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes return dataSet; } + public static List<Document> generateTestDataSetWithPresets( + int count, List<Double> doublePresets) { + List<Document> dataSet = new ArrayList<>(count); + + for (int i = 0; i < count; i++) { + dataSet.add(generateData(i, doublePresets.get(i))); + } + + return dataSet; + } + protected static String randomString() { int length = RANDOM.nextInt(10) + 1; StringBuilder sb = new StringBuilder(length); @@ -205,6 +170,63 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes return sb.toString(); } + private static Document generateData(int intPreset, Double doublePreset) { + return new Document( + "c_map", + new Document("OQBqH", randomString()) + .append("rkvlO", randomString()) + .append("pCMEX", randomString()) + .append("DAgdj", randomString()) + .append("dsJag", randomString())) + .append( + "c_array", + Arrays.asList( + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt())) + .append("c_string", randomString()) + .append("c_boolean", RANDOM.nextBoolean()) + .append("c_int", intPreset) + .append("c_bigint", RANDOM.nextLong()) + .append("c_double", 
doublePreset) + .append( + "c_row", + new Document( + "c_map", + new Document("OQBqH", randomString()) + .append("rkvlO", randomString()) + .append("pCMEX", randomString()) + .append("DAgdj", randomString()) + .append("dsJag", randomString())) + .append( + "c_array", + Arrays.asList( + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt(), + RANDOM.nextInt())) + .append("c_string", randomString()) + .append("c_boolean", RANDOM.nextBoolean()) + .append("c_int", RANDOM.nextInt()) + .append("c_bigint", RANDOM.nextLong()) + .append("c_double", RANDOM.nextDouble() * Double.MAX_VALUE)); + } + + private void prepareInitDataInCollection(String collection, List<Document> dataSet) { + MongoCollection<Document> source = + client.getDatabase(MONGODB_DATABASE).getCollection(collection); + source.deleteMany(new Document()); + + InsertManyResult result = source.insertMany(dataSet); + + if (result.getInsertedIds().size() != dataSet.size()) { + throw new IllegalStateException("Insertion count mismatch"); + } + } + protected List<Document> readMongodbData(String collection) { MongoCollection<Document> sinkTable = client.getDatabase(MONGODB_DATABASE).getCollection(collection); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index 8ff1f22329..b289af315f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -221,4 +221,20 @@ public class MongodbIT extends AbstractMongodbIT { clearDate(MONGODB_TRANSACTION_SINK_TABLE); clearDate(MONGODB_TRANSACTION_UPSERT_TABLE); } + + @TestTemplate + 
public void testMongodbDoubleValue(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult assertSinkResult = container.executeJob("/mongodb_double_value.conf"); + Assertions.assertEquals(0, assertSinkResult.getExitCode(), assertSinkResult.getStderr()); + + Assertions.assertIterableEquals( + TEST_DOUBLE_DATASET.stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList()), + readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList())); + clearDate(MONGODB_DOUBLE_TABLE_RESULT); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf new file mode 100644 index 0000000000..0f0c01483a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_double_op_db" + result_table_name = "mongodb_table" + cursor.no-timeout = true + fetch.size = 1000 + max.time-min = 100 + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_double_op_db_result" + source_table_name = "mongodb_table" + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +}