This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b222c13f2f [Bug][connectors-v2] fix mongodb bson convert exception 
(#8044)
b222c13f2f is described below

commit b222c13f2fcb53dd21fe63f375140e5d8563ab93
Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com>
AuthorDate: Thu Nov 14 20:29:21 2024 +0800

    [Bug][connectors-v2] fix mongodb bson convert exception (#8044)
---
 .../MongoDBConnectorDeserializationSchema.java     |  70 +++++++-
 .../MongoDBConnectorDeserializationSchemaTest.java | 187 +++++++++++++++++++--
 .../mongodb/serde/BsonToRowDataConverters.java     |  35 +++-
 .../mongodb/serde/BsonToRowDataConvertersTest.java |  93 +++++++++-
 4 files changed, 363 insertions(+), 22 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index edbeca4fab..823ed5d9ec 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -343,6 +343,15 @@ public class MongoDBConnectorDeserializationSchema
                         return convertToLocalDateTime(bsonValue).toLocalDate();
                     }
                 };
+            case TIME:
+                return new SerializableFunction<BsonValue, Object>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object apply(BsonValue bsonValue) {
+                        return convertToLocalDateTime(bsonValue).toLocalTime();
+                    }
+                };
             case TIMESTAMP:
                 return new SerializableFunction<BsonValue, Object>() {
                     private static final long serialVersionUID = 1L;
@@ -382,7 +391,7 @@ public class MongoDBConnectorDeserializationSchema
     private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
         Instant instant;
         if (bsonValue.isTimestamp()) {
-            instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
+            instant = 
Instant.ofEpochSecond(bsonValue.asTimestamp().getValue());
         } else if (bsonValue.isDateTime()) {
             instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
         } else {
@@ -521,7 +530,7 @@ public class MongoDBConnectorDeserializationSchema
     }
 
     private static double convertToDouble(@Nonnull BsonValue bsonValue) {
-        if (bsonValue.isDouble()) {
+        if (bsonValue.isNumber()) {
             return bsonValue.asNumber().doubleValue();
         }
         throw new MongodbConnectorException(
@@ -532,9 +541,20 @@ public class MongoDBConnectorDeserializationSchema
                         + bsonValue.getBsonType());
     }
 
-    private static int convertToInt(@Nonnull BsonValue bsonValue) {
+    private static int convertToInt(BsonValue bsonValue) {
         if (bsonValue.isInt32()) {
-            return bsonValue.asNumber().intValue();
+            return bsonValue.asInt32().getValue();
+        } else if (bsonValue.isNumber()) {
+            long longValue = bsonValue.asNumber().longValue();
+            if (longValue > Integer.MAX_VALUE || longValue < 
Integer.MIN_VALUE) {
+                throw new MongodbConnectorException(
+                        UNSUPPORTED_DATA_TYPE,
+                        "Unable to convert to integer from unexpected value '"
+                                + bsonValue
+                                + "' of type "
+                                + bsonValue.getBsonType());
+            }
+            return (int) longValue;
         }
         throw new MongodbConnectorException(
                 UNSUPPORTED_DATA_TYPE,
@@ -568,8 +588,19 @@ public class MongoDBConnectorDeserializationSchema
                 "Unsupported BYTES value type: " + 
bsonValue.getClass().getSimpleName());
     }
 
-    private static long convertToLong(@Nonnull BsonValue bsonValue) {
-        if (bsonValue.isInt64()) {
+    private static long convertToLong(BsonValue bsonValue) {
+        if (bsonValue.isInt64() || bsonValue.isInt32()) {
+            return bsonValue.asNumber().longValue();
+        } else if (bsonValue.isDouble()) {
+            double value = bsonValue.asNumber().doubleValue();
+            if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) {
+                throw new MongodbConnectorException(
+                        UNSUPPORTED_DATA_TYPE,
+                        "Unable to convert to long from unexpected value '"
+                                + bsonValue
+                                + "' of type "
+                                + bsonValue.getBsonType());
+            }
             return bsonValue.asNumber().longValue();
         }
         throw new MongodbConnectorException(
@@ -599,4 +630,31 @@ public class MongoDBConnectorDeserializationSchema
                         + "' of type "
                         + bsonValue.getBsonType());
     }
+
+    @VisibleForTesting
+    public Object convertToObject(
+            @Nonnull SeaTunnelDataType<?> dataType, @Nonnull BsonValue 
bsonValue) {
+        switch (dataType.getSqlType()) {
+            case INT:
+                return convertToInt(bsonValue);
+            case BIGINT:
+                return convertToLong(bsonValue);
+            case DOUBLE:
+                return convertToDouble(bsonValue);
+            case STRING:
+                return convertToString(bsonValue);
+            case DATE:
+                return convertToLocalDateTime(bsonValue).toLocalDate();
+            case TIME:
+                return convertToLocalDateTime(bsonValue).toLocalTime();
+            case TIMESTAMP:
+                return convertToLocalDateTime(bsonValue);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                BigDecimal decimalValue = convertToBigDecimal(bsonValue);
+                return fromBigDecimal(
+                        decimalValue, decimalType.getPrecision(), 
decimalType.getScale());
+        }
+        return null;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
index 1b90490093..91c9fb47bc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
@@ -22,19 +22,37 @@ import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
 
 import org.apache.kafka.connect.source.SourceRecord;
 
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
 import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
 import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
 import org.bson.BsonString;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
@@ -52,25 +70,65 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 
 public class MongoDBConnectorDeserializationSchemaTest {
 
-    @Test
-    public void extractTableId() {
-        CatalogTable catalogTable =
+    private static TableSchema tableSchema;
+    private static CatalogTable catalogTable;
+
+    @BeforeAll
+    public static void setUp() {
+        tableSchema =
+                TableSchema.builder()
+                        .column(PhysicalColumn.of("int", BasicType.INT_TYPE, 
1L, true, null, ""))
+                        .column(PhysicalColumn.of("long", BasicType.LONG_TYPE, 
1L, true, null, ""))
+                        .column(
+                                PhysicalColumn.of(
+                                        "double", BasicType.DOUBLE_TYPE, 1L, 
true, null, ""))
+                        .column(
+                                PhysicalColumn.of(
+                                        "decimal", new DecimalType(10, 2), 1L, 
true, null, ""))
+                        .column(
+                                PhysicalColumn.of(
+                                        "string", BasicType.STRING_TYPE, 200L, 
true, null, ""))
+                        .column(
+                                PhysicalColumn.of(
+                                        "date",
+                                        LocalTimeType.LOCAL_DATE_TYPE,
+                                        null,
+                                        null,
+                                        true,
+                                        null,
+                                        null))
+                        .column(
+                                PhysicalColumn.of(
+                                        "time",
+                                        LocalTimeType.LOCAL_TIME_TYPE,
+                                        null,
+                                        null,
+                                        true,
+                                        null,
+                                        null))
+                        .column(
+                                PhysicalColumn.of(
+                                        "timestamp",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        null,
+                                        null,
+                                        true,
+                                        null,
+                                        null))
+                        .build();
+        catalogTable =
                 CatalogTable.of(
                         TableIdentifier.of("catalog", "database", "table"),
-                        TableSchema.builder()
-                                .column(
-                                        PhysicalColumn.of(
-                                                "name1", 
BasicType.STRING_TYPE, 1L, true, null, ""))
-                                .column(
-                                        PhysicalColumn.of(
-                                                "name1", 
BasicType.STRING_TYPE, 1L, true, null, ""))
-                                .build(),
+                        tableSchema,
                         Collections.emptyMap(),
                         Collections.emptyList(),
                         "comment");
+    }
+
+    @Test
+    public void extractTableId() {
         MongoDBConnectorDeserializationSchema schema =
-                new MongoDBConnectorDeserializationSchema(
-                        Collections.singletonList(catalogTable), 
Collections.emptyMap());
+                new 
MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));
 
         // Build SourceRecord
         Map<String, String> partitionMap =
@@ -106,4 +164,107 @@ public class MongoDBConnectorDeserializationSchemaTest {
         Object tableId = schema.extractTableIdForTest(sourceRecord);
         Assertions.assertEquals("inventory.products", tableId);
     }
+
+    @Test
+    public void testBsonConvert() {
+        MongoDBConnectorDeserializationSchema schema =
+                new 
MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));
+        // check int
+        Assertions.assertEquals(
+                123456, schema.convertToObject(getDataType("int"), new 
BsonInt32(123456)));
+        Assertions.assertEquals(
+                Integer.MAX_VALUE,
+                schema.convertToObject(getDataType("int"), new 
BsonInt64(Integer.MAX_VALUE)));
+        Assertions.assertEquals(
+                123456, schema.convertToObject(getDataType("int"), new 
BsonDouble(123456)));
+        Assertions.assertThrowsExactly(
+                MongodbConnectorException.class,
+                () ->
+                        schema.convertToObject(
+                                getDataType("int"), new 
BsonDouble(1234567890123456789.0d)));
+        Assertions.assertThrowsExactly(
+                MongodbConnectorException.class,
+                () -> schema.convertToObject(getDataType("int"), new 
BsonInt64(Long.MIN_VALUE)));
+        // check long
+        Assertions.assertEquals(
+                123456L, schema.convertToObject(getDataType("long"), new 
BsonInt32(123456)));
+        Assertions.assertEquals(
+                (long) Integer.MAX_VALUE,
+                schema.convertToObject(getDataType("long"), new 
BsonInt64(Integer.MAX_VALUE)));
+        Assertions.assertEquals(
+                123456L, schema.convertToObject(getDataType("long"), new 
BsonDouble(123456)));
+        Assertions.assertThrowsExactly(
+                MongodbConnectorException.class,
+                () ->
+                        schema.convertToObject(
+                                getDataType("long"),
+                                new 
BsonDouble(12345678901234567891234567890123456789.0d)));
+
+        // check double
+        Assertions.assertEquals(
+                1.0d, schema.convertToObject(getDataType("double"), new 
BsonInt32(1)));
+        Assertions.assertEquals(
+                1.0d, schema.convertToObject(getDataType("double"), new 
BsonInt64(1)));
+        Assertions.assertEquals(
+                4.4d, schema.convertToObject(getDataType("double"), new 
BsonDouble(4.4)));
+        // check decimal
+        Assertions.assertEquals(
+                new BigDecimal("3.14"),
+                schema.convertToObject(
+                        getDataType("decimal"), new 
BsonDecimal128(Decimal128.parse("3.1415926"))));
+        // check string
+        Assertions.assertEquals(
+                "123456", schema.convertToObject(getDataType("string"), new 
BsonString("123456")));
+        Assertions.assertEquals(
+                "507f191e810c19729de860ea",
+                schema.convertToObject(
+                        getDataType("string"),
+                        new BsonObjectId(new 
ObjectId("507f191e810c19729de860ea"))));
+        BsonDocument document =
+                new BsonDocument()
+                        .append("key", new BsonString("123456"))
+                        .append("value", new BsonInt64(123456789L));
+        Assertions.assertEquals(
+                "{\"key\": \"123456\", \"value\": 123456789}",
+                schema.convertToObject(getDataType("string"), document));
+
+        LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
+        long epochMilli = 
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        // check localDate
+        Assertions.assertEquals(
+                now.toLocalDate(),
+                schema.convertToObject(getDataType("date"), new 
BsonDateTime(epochMilli)));
+        Assertions.assertEquals(
+                now.toLocalDate(),
+                schema.convertToObject(getDataType("date"), new 
BsonDateTime(epochMilli)));
+        // check localTime
+        Assertions.assertEquals(
+                now.toLocalTime(),
+                schema.convertToObject(getDataType("time"), new 
BsonDateTime(epochMilli)));
+        Assertions.assertEquals(
+                now.toLocalTime(),
+                schema.convertToObject(getDataType("time"), new 
BsonDateTime(epochMilli)));
+        // check localDateTime
+        Assertions.assertEquals(
+                now,
+                schema.convertToObject(getDataType("timestamp"), new 
BsonDateTime(epochMilli)));
+        Assertions.assertEquals(
+                now,
+                schema.convertToObject(getDataType("timestamp"), new 
BsonDateTime(epochMilli)));
+    }
+
+    private SeaTunnelDataType<?> getDataType(String fieldName) {
+        String[] fieldNames = tableSchema.getFieldNames();
+        return IntStream.range(0, fieldNames.length)
+                .mapToObj(
+                        i -> {
+                            if (fieldName.equals(fieldNames[i])) {
+                                return 
tableSchema.getColumns().get(i).getDataType();
+                            }
+                            return null;
+                        })
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("not found field"));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
index 8eda6612c7..4993a0db46 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
@@ -178,6 +178,15 @@ public class BsonToRowDataConverters implements 
Serializable {
                         return convertToLocalDateTime(bsonValue).toLocalDate();
                     }
                 };
+            case TIME:
+                return new SerializableFunction<BsonValue, Object>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object apply(BsonValue bsonValue) {
+                        return convertToLocalDateTime(bsonValue).toLocalTime();
+                    }
+                };
             case TIMESTAMP:
                 return new SerializableFunction<BsonValue, Object>() {
                     private static final long serialVersionUID = 1L;
@@ -217,7 +226,7 @@ public class BsonToRowDataConverters implements 
Serializable {
     private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
         Instant instant;
         if (bsonValue.isTimestamp()) {
-            instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
+            instant = Instant.ofEpochMilli(bsonValue.asTimestamp().getValue());
         } else if (bsonValue.isDateTime()) {
             instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
         } else {
@@ -366,7 +375,18 @@ public class BsonToRowDataConverters implements 
Serializable {
 
     private static int convertToInt(BsonValue bsonValue) {
         if (bsonValue.isInt32()) {
-            return bsonValue.asNumber().intValue();
+            return bsonValue.asInt32().getValue();
+        } else if (bsonValue.isNumber()) {
+            long longValue = bsonValue.asNumber().longValue();
+            if (longValue > Integer.MAX_VALUE || longValue < 
Integer.MIN_VALUE) {
+                throw new MongodbConnectorException(
+                        UNSUPPORTED_DATA_TYPE,
+                        "Unable to convert to integer from unexpected value '"
+                                + bsonValue
+                                + "' of type "
+                                + bsonValue.getBsonType());
+            }
+            return (int) longValue;
         }
         throw new MongodbConnectorException(
                 UNSUPPORTED_DATA_TYPE,
@@ -403,6 +423,17 @@ public class BsonToRowDataConverters implements 
Serializable {
     private static long convertToLong(BsonValue bsonValue) {
         if (bsonValue.isInt64() || bsonValue.isInt32()) {
             return bsonValue.asNumber().longValue();
+        } else if (bsonValue.isDouble()) {
+            double value = bsonValue.asNumber().doubleValue();
+            if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) {
+                throw new MongodbConnectorException(
+                        UNSUPPORTED_DATA_TYPE,
+                        "Unable to convert to long from unexpected value '"
+                                + bsonValue
+                                + "' of type "
+                                + bsonValue.getBsonType());
+            }
+            return bsonValue.asNumber().longValue();
         }
         throw new MongodbConnectorException(
                 UNSUPPORTED_DATA_TYPE,
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
index b47769c0ac..26c268a4e7 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
@@ -18,13 +18,29 @@
 package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;
 
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import 
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
 
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
 import org.bson.BsonDouble;
 import org.bson.BsonInt32;
 import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+
 public class BsonToRowDataConvertersTest {
     private final BsonToRowDataConverters converterFactory = new 
BsonToRowDataConverters();
 
@@ -42,7 +58,7 @@ public class BsonToRowDataConvertersTest {
     }
 
     @Test
-    public void testConvertBsonIntToBigInt() {
+    public void testConvertBsonNumberToLong() {
         // It covered #7567
         BsonToRowDataConverters.BsonToRowDataConverter converter =
                 converterFactory.createConverter(BasicType.LONG_TYPE);
@@ -51,5 +67,80 @@ public class BsonToRowDataConvertersTest {
 
         Assertions.assertEquals(
                 (long) Integer.MAX_VALUE, converter.convert(new 
BsonInt64(Integer.MAX_VALUE)));
+
+        Assertions.assertEquals(123456L, converter.convert(new 
BsonDouble(123456)));
+
+        Assertions.assertThrowsExactly(
+                MongodbConnectorException.class,
+                () -> converter.convert(new 
BsonDouble(12345678901234567891234567890123456789.0d)));
+    }
+
+    @Test
+    public void testConvertBsonNumberToInt() {
+        // It covered #8042
+        BsonToRowDataConverters.BsonToRowDataConverter converter =
+                converterFactory.createConverter(BasicType.INT_TYPE);
+        Assertions.assertEquals(123456, converter.convert(new 
BsonInt32(123456)));
+        Assertions.assertEquals(
+                Integer.MAX_VALUE, converter.convert(new 
BsonInt64(Integer.MAX_VALUE)));
+        Assertions.assertEquals(123456, converter.convert(new 
BsonDouble(123456)));
+        Assertions.assertThrowsExactly(
+                MongodbConnectorException.class,
+                () -> converter.convert(new 
BsonDouble(1234567890123456789.0d)));
+    }
+
+    @Test
+    public void testConvertBsonDecimal128ToDecimal() {
+        BsonToRowDataConverters.BsonToRowDataConverter converter =
+                converterFactory.createConverter(new DecimalType(10, 2));
+        Assertions.assertEquals(
+                new BigDecimal("3.14"),
+                converter.convert(new 
BsonDecimal128(Decimal128.parse("3.1415926"))));
+    }
+
+    @Test
+    public void testConvertBsonToString() {
+        BsonToRowDataConverters.BsonToRowDataConverter converter =
+                converterFactory.createConverter(BasicType.STRING_TYPE);
+        Assertions.assertEquals("123456", converter.convert(new 
BsonString("123456")));
+
+        Assertions.assertEquals(
+                "507f191e810c19729de860ea",
+                converter.convert(new BsonObjectId(new 
ObjectId("507f191e810c19729de860ea"))));
+
+        BsonDocument document =
+                new BsonDocument()
+                        .append("key", new BsonString("123456"))
+                        .append("value", new BsonInt64(123456789L));
+        Assertions.assertEquals(
+                "{\"key\": \"123456\", \"value\": 123456789}", 
converter.convert(document));
+    }
+
+    @Test
+    public void testConvertBsonToLocalDateTime() {
+        LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
+        long epochMilli = 
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+
+        // localDataTime converter
+        BsonToRowDataConverters.BsonToRowDataConverter localDataTimeConverter =
+                
converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+        Assertions.assertEquals(now, localDataTimeConverter.convert(new 
BsonTimestamp(epochMilli)));
+        Assertions.assertEquals(now, localDataTimeConverter.convert(new 
BsonDateTime(epochMilli)));
+
+        // localDate converter
+        BsonToRowDataConverters.BsonToRowDataConverter localDataConverter =
+                
converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TYPE);
+        Assertions.assertEquals(
+                now.toLocalDate(), localDataConverter.convert(new 
BsonTimestamp(epochMilli)));
+        Assertions.assertEquals(
+                now.toLocalDate(), localDataConverter.convert(new 
BsonDateTime(epochMilli)));
+
+        // localTime converter
+        BsonToRowDataConverters.BsonToRowDataConverter localTimeConverter =
+                
converterFactory.createConverter(LocalTimeType.LOCAL_TIME_TYPE);
+        Assertions.assertEquals(
+                now.toLocalTime(), localTimeConverter.convert(new 
BsonTimestamp(epochMilli)));
+        Assertions.assertEquals(
+                now.toLocalTime(), localTimeConverter.convert(new 
BsonDateTime(epochMilli)));
     }
 }

Reply via email to