This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c5ecda50f8 [bugfix][connector-mongodb] fix mongodb null value write (#6967)
c5ecda50f8 is described below

commit c5ecda50f8fa0c81733bbc40ca38567b7fa63a1a
Author: 老王 <58297137+chl-...@users.noreply.github.com>
AuthorDate: Thu Jun 13 11:55:00 2024 +0800

    [bugfix][connector-mongodb] fix mongodb null value write (#6967)
---
 .../mongodb/serde/RowDataToBsonConverters.java     |  6 +-
 .../connector/v2/mongodb/AbstractMongodbIT.java    | 30 +++++++-
 .../e2e/connector/v2/mongodb/MongodbIT.java        | 20 +++++
 .../src/test/resources/mongodb_null_value.conf     | 89 ++++++++++++++++++++++
 4 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java
index 4d45a61741..bf74b8cf83 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java
@@ -92,11 +92,7 @@ public class RowDataToBsonConverters implements Serializable {
             @Override
             public BsonValue apply(Object value) {
                 if (value == null || NULL.equals(type.getSqlType())) {
-                    throw new MongodbConnectorException(
-                            UNSUPPORTED_DATA_TYPE,
-                            "The column type is <"
-                                    + type
-                                    + ">, but a null value is being written into it");
+                    return new BsonNull();
                 } else {
                     return internalConverter.apply(value);
                 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
index 5dbe7cf347..dd24dc1e60 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java
@@ -58,6 +58,8 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
 
     protected static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);
 
+    protected static final List<Document> TEST_NULL_DATASET = generateTestDataSetWithNull(10);
+
     protected static final String MONGODB_IMAGE = "mongo:latest";
 
     protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
@@ -70,6 +72,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
 
     protected static final String MONGODB_SPLIT_TABLE = "test_split_op_db";
 
+    protected static final String MONGODB_NULL_TABLE = "test_null_op_db";
+
+    protected static final String MONGODB_NULL_TABLE_RESULT = "test_null_op_db_result";
+
     protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
 
     protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
@@ -101,15 +107,18 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
     protected void initSourceData() {
         MongoCollection<Document> sourceMatchTable =
                 client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
-
         sourceMatchTable.deleteMany(new Document());
         sourceMatchTable.insertMany(TEST_MATCH_DATASET);
 
         MongoCollection<Document> sourceSplitTable =
                 client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
-
         sourceSplitTable.deleteMany(new Document());
         sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
+
+        MongoCollection<Document> sourceNullTable =
+                client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE);
+        sourceNullTable.deleteMany(new Document());
+        sourceNullTable.insertMany(TEST_NULL_DATASET);
     }
 
     protected void clearDate(String table) {
@@ -169,6 +178,23 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
         return dataSet;
     }
 
+    public static List<Document> generateTestDataSetWithNull(int count) {
+        List<Document> dataSet = new ArrayList<>();
+
+        for (int i = 0; i < count; i++) {
+            dataSet.add(
+                    new Document("c_map", null)
+                            .append("c_array", null)
+                            .append("c_string", null)
+                            .append("c_boolean", null)
+                            .append("c_int", null)
+                            .append("c_bigint", null)
+                            .append("c_double", null)
+                            .append("c_row", null));
+        }
+        return dataSet;
+    }
+
     protected static String randomString() {
         int length = RANDOM.nextInt(10) + 1;
         StringBuilder sb = new StringBuilder(length);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index fb643455a6..8ff1f22329 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.e2e.connector.v2.mongodb;
 
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.bson.Document;
 import org.junit.jupiter.api.Assertions;
@@ -43,6 +45,24 @@ public class MongodbIT extends AbstractMongodbIT {
         clearDate(MONGODB_SINK_TABLE);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK, EngineType.SPARK},
+            disabledReason = "Currently SPARK and FLINK do not support mongodb null value write")
+    public void testMongodbNullValue(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult nullResult = container.executeJob("/mongodb_null_value.conf");
+        Assertions.assertEquals(0, nullResult.getExitCode(), nullResult.getStderr());
+        Assertions.assertIterableEquals(
+                TEST_NULL_DATASET.stream().peek(e -> e.remove("_id")).collect(Collectors.toList()),
+                readMongodbData(MONGODB_NULL_TABLE_RESULT).stream()
+                        .peek(e -> e.remove("_id"))
+                        .collect(Collectors.toList()));
+        clearDate(MONGODB_NULL_TABLE);
+        clearDate(MONGODB_NULL_TABLE_RESULT);
+    }
+
     @TestTemplate
     public void testMongodbSourceMatch(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf
new file mode 100644
index 0000000000..3ec2a46c9f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db"
+    database = "test_db"
+    collection = "test_null_op_db"
+    match.projection = "{ c_bigint:0 }"
+    result_table_name = "mongodb_null_table"
+    cursor.no-timeout = true
+    fetch.size = 1000
+    max.time-min = 100
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+        }
+      }
+    }
+  }
+}
+
+sink {
+  MongoDB {
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
+    database = "test_db"
+    collection = "test_null_op_db_result"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_int = int
+        c_bigint = bigint
+        c_double = double
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_int = int
+          c_bigint = bigint
+          c_double = double
+        }
+      }
+    }
+  }
+
+}

Reply via email to