This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c5159a2760 [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997)
c5159a2760 is described below

commit c5159a2760ecf8deb377e9b1b18367d29199aaf4
Author: Dongyeon Lee <dev.loust...@gmail.com>
AuthorDate: Wed Jun 19 06:21:17 2024 -0700

    [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997)
---
 release-note.md                                    |   1 +
 .../mongodb/serde/BsonToRowDataConverters.java     |   2 +-
 .../connector/v2/mongodb/AbstractMongodbIT.java    | 140 ++++++++++++---------
 .../e2e/connector/v2/mongodb/MongodbIT.java        |  16 +++
 .../src/test/resources/mongodb_double_value.conf   |  88 +++++++++++++
 5 files changed, 187 insertions(+), 60 deletions(-)

diff --git a/release-note.md b/release-note.md
index 840648fe64..24455c40ac 100644
--- a/release-note.md
+++ b/release-note.md
@@ -55,6 +55,7 @@
 - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
 - [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
 - [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
+- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
 
 ### Zeta(ST-Engine)
 
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
index a343e0cd2d..505b30fcbd 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
@@ -353,7 +353,7 @@ public class BsonToRowDataConverters implements Serializable {
     }
 
     private static double convertToDouble(BsonValue bsonValue) {
-        if (bsonValue.isDouble()) {
+        if (bsonValue.isNumber()) {
             return bsonValue.asNumber().doubleValue();
         }
         throw new MongodbConnectorException(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index dd24dc1e60..cf0b1dbd09 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -36,6 +36,7 @@ import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Sorts;
+import com.mongodb.client.result.InsertManyResult;
 import lombok.extern.slf4j.Slf4j;
 
 import java.time.Duration;
@@ -60,6 +61,9 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
 
     protected static final List<Document> TEST_NULL_DATASET = 
generateTestDataSetWithNull(10);
 
+    protected static final List<Document> TEST_DOUBLE_DATASET =
+            generateTestDataSetWithPresets(5, Arrays.asList(44.0d, 44.1d, 
44.2d, 44.3d, 44.4d));
+
     protected static final String MONGODB_IMAGE = "mongo:latest";
 
     protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
@@ -76,6 +80,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
 
     protected static final String MONGODB_NULL_TABLE_RESULT = 
"test_null_op_db_result";
 
+    protected static final String MONGODB_DOUBLE_TABLE = "test_double_op_db";
+
+    protected static final String MONGODB_DOUBLE_TABLE_RESULT = 
"test_double_op_db_result";
+
     protected static final String MONGODB_MATCH_RESULT_TABLE = 
"test_match_op_result_db";
 
     protected static final String MONGODB_SPLIT_RESULT_TABLE = 
"test_split_op_result_db";
@@ -105,20 +113,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
     }
 
     protected void initSourceData() {
-        MongoCollection<Document> sourceMatchTable =
-                
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
-        sourceMatchTable.deleteMany(new Document());
-        sourceMatchTable.insertMany(TEST_MATCH_DATASET);
-
-        MongoCollection<Document> sourceSplitTable =
-                
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
-        sourceSplitTable.deleteMany(new Document());
-        sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
-
-        MongoCollection<Document> sourceNullTable =
-                
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE);
-        sourceNullTable.deleteMany(new Document());
-        sourceNullTable.insertMany(TEST_NULL_DATASET);
+        prepareInitDataInCollection(MONGODB_MATCH_TABLE, TEST_MATCH_DATASET);
+        prepareInitDataInCollection(MONGODB_SPLIT_TABLE, TEST_SPLIT_DATASET);
+        prepareInitDataInCollection(MONGODB_NULL_TABLE, TEST_NULL_DATASET);
+        prepareInitDataInCollection(MONGODB_DOUBLE_TABLE, TEST_DOUBLE_DATASET);
     }
 
     protected void clearDate(String table) {
@@ -129,51 +127,7 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
         List<Document> dataSet = new ArrayList<>();
 
         for (int i = 0; i < count; i++) {
-            dataSet.add(
-                    new Document(
-                                    "c_map",
-                                    new Document("OQBqH", randomString())
-                                            .append("rkvlO", randomString())
-                                            .append("pCMEX", randomString())
-                                            .append("DAgdj", randomString())
-                                            .append("dsJag", randomString()))
-                            .append(
-                                    "c_array",
-                                    Arrays.asList(
-                                            RANDOM.nextInt(),
-                                            RANDOM.nextInt(),
-                                            RANDOM.nextInt(),
-                                            RANDOM.nextInt(),
-                                            RANDOM.nextInt()))
-                            .append("c_string", randomString())
-                            .append("c_boolean", RANDOM.nextBoolean())
-                            .append("c_int", i)
-                            .append("c_bigint", RANDOM.nextLong())
-                            .append("c_double", RANDOM.nextDouble() * 
Double.MAX_VALUE)
-                            .append(
-                                    "c_row",
-                                    new Document(
-                                                    "c_map",
-                                                    new Document("OQBqH", 
randomString())
-                                                            .append("rkvlO", 
randomString())
-                                                            .append("pCMEX", 
randomString())
-                                                            .append("DAgdj", 
randomString())
-                                                            .append("dsJag", 
randomString()))
-                                            .append(
-                                                    "c_array",
-                                                    Arrays.asList(
-                                                            RANDOM.nextInt(),
-                                                            RANDOM.nextInt(),
-                                                            RANDOM.nextInt(),
-                                                            RANDOM.nextInt(),
-                                                            RANDOM.nextInt()))
-                                            .append("c_string", randomString())
-                                            .append("c_boolean", 
RANDOM.nextBoolean())
-                                            .append("c_int", RANDOM.nextInt())
-                                            .append("c_bigint", 
RANDOM.nextLong())
-                                            .append(
-                                                    "c_double",
-                                                    RANDOM.nextDouble() * 
Double.MAX_VALUE)));
+            dataSet.add(generateData(i, RANDOM.nextDouble() * 
Double.MAX_VALUE));
         }
         return dataSet;
     }
@@ -195,6 +149,17 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
         return dataSet;
     }
 
+    public static List<Document> generateTestDataSetWithPresets(
+            int count, List<Double> doublePresets) {
+        List<Document> dataSet = new ArrayList<>(count);
+
+        for (int i = 0; i < count; i++) {
+            dataSet.add(generateData(i, doublePresets.get(i)));
+        }
+
+        return dataSet;
+    }
+
     protected static String randomString() {
         int length = RANDOM.nextInt(10) + 1;
         StringBuilder sb = new StringBuilder(length);
@@ -205,6 +170,63 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
         return sb.toString();
     }
 
+    private static Document generateData(int intPreset, Double doublePreset) {
+        return new Document(
+                        "c_map",
+                        new Document("OQBqH", randomString())
+                                .append("rkvlO", randomString())
+                                .append("pCMEX", randomString())
+                                .append("DAgdj", randomString())
+                                .append("dsJag", randomString()))
+                .append(
+                        "c_array",
+                        Arrays.asList(
+                                RANDOM.nextInt(),
+                                RANDOM.nextInt(),
+                                RANDOM.nextInt(),
+                                RANDOM.nextInt(),
+                                RANDOM.nextInt()))
+                .append("c_string", randomString())
+                .append("c_boolean", RANDOM.nextBoolean())
+                .append("c_int", intPreset)
+                .append("c_bigint", RANDOM.nextLong())
+                .append("c_double", doublePreset)
+                .append(
+                        "c_row",
+                        new Document(
+                                        "c_map",
+                                        new Document("OQBqH", randomString())
+                                                .append("rkvlO", 
randomString())
+                                                .append("pCMEX", 
randomString())
+                                                .append("DAgdj", 
randomString())
+                                                .append("dsJag", 
randomString()))
+                                .append(
+                                        "c_array",
+                                        Arrays.asList(
+                                                RANDOM.nextInt(),
+                                                RANDOM.nextInt(),
+                                                RANDOM.nextInt(),
+                                                RANDOM.nextInt(),
+                                                RANDOM.nextInt()))
+                                .append("c_string", randomString())
+                                .append("c_boolean", RANDOM.nextBoolean())
+                                .append("c_int", RANDOM.nextInt())
+                                .append("c_bigint", RANDOM.nextLong())
+                                .append("c_double", RANDOM.nextDouble() * 
Double.MAX_VALUE));
+    }
+
+    private void prepareInitDataInCollection(String collection, List<Document> 
dataSet) {
+        MongoCollection<Document> source =
+                client.getDatabase(MONGODB_DATABASE).getCollection(collection);
+        source.deleteMany(new Document());
+
+        InsertManyResult result = source.insertMany(dataSet);
+
+        if (result.getInsertedIds().size() != dataSet.size()) {
+            throw new IllegalStateException("Insertion count mismatch");
+        }
+    }
+
     protected List<Document> readMongodbData(String collection) {
         MongoCollection<Document> sinkTable =
                 client.getDatabase(MONGODB_DATABASE).getCollection(collection);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index 8ff1f22329..b289af315f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -221,4 +221,20 @@ public class MongodbIT extends AbstractMongodbIT {
         clearDate(MONGODB_TRANSACTION_SINK_TABLE);
         clearDate(MONGODB_TRANSACTION_UPSERT_TABLE);
     }
+
+    @TestTemplate
+    public void testMongodbDoubleValue(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult assertSinkResult = 
container.executeJob("/mongodb_double_value.conf");
+        Assertions.assertEquals(0, assertSinkResult.getExitCode(), 
assertSinkResult.getStderr());
+
+        Assertions.assertIterableEquals(
+                TEST_DOUBLE_DATASET.stream()
+                        .peek(e -> e.remove("_id"))
+                        .collect(Collectors.toList()),
+                readMongodbData(MONGODB_DOUBLE_TABLE_RESULT).stream()
+                        .peek(e -> e.remove("_id"))
+                        .collect(Collectors.toList()));
+        clearDate(MONGODB_DOUBLE_TABLE_RESULT);
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf
new file mode 100644
index 0000000000..0f0c01483a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_double_value.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db"
+    database = "test_db"
+    collection = "test_double_op_db"
+    result_table_name = "mongodb_table"
+    cursor.no-timeout = true
+    fetch.size = 1000
+    max.time-min = 100
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+        }
+      }
+    }
+  }
+}
+
+sink {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+    database = "test_db"
+    collection = "test_double_op_db_result"
+    source_table_name = "mongodb_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+        }
+      }
+    }
+  }
+}

Reply via email to