This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new c5ecda50f8 [bugfix][connector-mongodb] fix mongodb null value write (#6967) c5ecda50f8 is described below commit c5ecda50f8fa0c81733bbc40ca38567b7fa63a1a Author: 老王 <58297137+chl-...@users.noreply.github.com> AuthorDate: Thu Jun 13 11:55:00 2024 +0800 [bugfix][connector-mongodb] fix mongodb null value write (#6967) --- .../mongodb/serde/RowDataToBsonConverters.java | 6 +- .../connector/v2/mongodb/AbstractMongodbIT.java | 30 +++++++- .../e2e/connector/v2/mongodb/MongodbIT.java | 20 +++++ .../src/test/resources/mongodb_null_value.conf | 89 ++++++++++++++++++++++ 4 files changed, 138 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java index 4d45a61741..bf74b8cf83 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java @@ -92,11 +92,7 @@ public class RowDataToBsonConverters implements Serializable { @Override public BsonValue apply(Object value) { if (value == null || NULL.equals(type.getSqlType())) { - throw new MongodbConnectorException( - UNSUPPORTED_DATA_TYPE, - "The column type is <" - + type - + ">, but a null value is being written into it"); + return new BsonNull(); } else { return internalConverter.apply(value); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java index 5dbe7cf347..dd24dc1e60 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java @@ -58,6 +58,8 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10); + protected static final List<Document> TEST_NULL_DATASET = generateTestDataSetWithNull(10); + protected static final String MONGODB_IMAGE = "mongo:latest"; protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb"; @@ -70,6 +72,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected static final String MONGODB_SPLIT_TABLE = "test_split_op_db"; + protected static final String MONGODB_NULL_TABLE = "test_null_op_db"; + + protected static final String MONGODB_NULL_TABLE_RESULT = "test_null_op_db_result"; + protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db"; protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db"; @@ -101,15 +107,18 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes protected void initSourceData() { MongoCollection<Document> sourceMatchTable = client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE); - sourceMatchTable.deleteMany(new Document()); sourceMatchTable.insertMany(TEST_MATCH_DATASET); MongoCollection<Document> sourceSplitTable = client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE); - sourceSplitTable.deleteMany(new Document()); sourceSplitTable.insertMany(TEST_SPLIT_DATASET); + + MongoCollection<Document> sourceNullTable = + client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE); + sourceNullTable.deleteMany(new Document()); + sourceNullTable.insertMany(TEST_NULL_DATASET); } protected void clearDate(String table) { @@ -169,6 +178,23 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes return dataSet; } + public static List<Document> generateTestDataSetWithNull(int count) { + List<Document> dataSet = new ArrayList<>(); + + for (int i = 0; i < count; i++) { + dataSet.add( + new Document("c_map", null) + .append("c_array", null) + .append("c_string", null) + .append("c_boolean", null) + .append("c_int", null) + .append("c_bigint", null) + .append("c_double", null) + .append("c_row", null)); + } + return dataSet; + } + protected static String randomString() { int length = RANDOM.nextInt(10) + 1; StringBuilder sb = new StringBuilder(length); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java index fb643455a6..8ff1f22329 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.e2e.connector.v2.mongodb; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.bson.Document; import org.junit.jupiter.api.Assertions; @@ -43,6 +45,24 @@ public class MongodbIT extends AbstractMongodbIT { clearDate(MONGODB_SINK_TABLE); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK, EngineType.SPARK}, + disabledReason = "Currently SPARK and FLINK do not support mongodb null value write") + public void testMongodbNullValue(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult nullResult = container.executeJob("/mongodb_null_value.conf"); + Assertions.assertEquals(0, nullResult.getExitCode(), nullResult.getStderr()); + Assertions.assertIterableEquals( + TEST_NULL_DATASET.stream().peek(e -> e.remove("_id")).collect(Collectors.toList()), + readMongodbData(MONGODB_NULL_TABLE_RESULT).stream() + .peek(e -> e.remove("_id")) + .collect(Collectors.toList())); + clearDate(MONGODB_NULL_TABLE); + clearDate(MONGODB_NULL_TABLE_RESULT); + } + @TestTemplate public void testMongodbSourceMatch(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf new file mode 100644 index 0000000000..3ec2a46c9f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_null_value.conf @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db" + database = "test_db" + collection = "test_null_op_db" + match.projection = "{ c_bigint:0 }" + result_table_name = "mongodb_null_table" + cursor.no-timeout = true + fetch.size = 1000 + max.time-min = 100 + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } +} + +sink { + MongoDB { + uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true" + database = "test_db" + collection = "test_null_op_db_result" + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_int = int + c_bigint = bigint + c_double = double + } + } + } + } + +}