This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new df210ea73d [Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527)
df210ea73d is described below

commit df210ea73d5b14b36b9637fc4d12bee09fa8decd
Author: dailai <dai...@chinatelecom.cn>
AuthorDate: Fri Aug 30 12:44:00 2024 +0800

    [Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527)
---
 .../seatunnel/paimon/utils/RowConverter.java       |  56 ++++++----
 .../seatunnel/paimon/utils/RowConverterTest.java   | 118 ++++++++++++---------
 2 files changed, 106 insertions(+), 68 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 9c576018a3..45c2c492c1 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -346,17 +346,18 @@ public class RowConverter {
      *
      * @param seaTunnelRow SeaTunnel row object
      * @param seaTunnelRowType SeaTunnel row type
-     * @param tableSchema Paimon table schema
+     * @param sinkTableSchema Paimon table schema
      * @return Paimon row object
      */
     public static InternalRow reconvert(
-            SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
-        List<DataField> sinkTotalFields = tableSchema.fields();
+            SeaTunnelRow seaTunnelRow,
+            SeaTunnelRowType seaTunnelRowType,
+            TableSchema sinkTableSchema) {
+        List<DataField> sinkTotalFields = sinkTableSchema.fields();
         int sourceTotalFields = seaTunnelRowType.getTotalFields();
         if (sourceTotalFields != sinkTotalFields.size()) {
-            throw new CommonError()
-                    .writeRowErrorWithFiledsCountNotMatch(
-                            "Paimon", sourceTotalFields, sinkTotalFields.size());
+            throw CommonError.writeRowErrorWithFiledsCountNotMatch(
+                    "Paimon", sourceTotalFields, sinkTotalFields.size());
         }
         BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
         BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
@@ -399,14 +400,17 @@ public class RowConverter {
                     binaryWriter.writeDouble(i, (Double) seaTunnelRow.getField(i));
                     break;
                 case DECIMAL:
-                    DecimalType fieldType = (DecimalType) seaTunnelRowType.getFieldType(i);
+                    DataField decimalDataField =
+                            SchemaUtil.getDataField(sinkTotalFields, fieldName);
+                    org.apache.paimon.types.DecimalType decimalType =
+                            (org.apache.paimon.types.DecimalType) decimalDataField.type();
                     binaryWriter.writeDecimal(
                             i,
                             Decimal.fromBigDecimal(
                                     (BigDecimal) seaTunnelRow.getField(i),
-                                    fieldType.getPrecision(),
-                                    fieldType.getScale()),
-                            fieldType.getPrecision());
+                                    decimalType.getPrecision(),
+                                    decimalType.getScale()),
+                            decimalType.getPrecision());
                     break;
                 case STRING:
                     binaryWriter.writeString(
@@ -464,9 +468,12 @@ public class RowConverter {
                     SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
                     Object row = seaTunnelRow.getField(i);
                     InternalRow paimonRow =
-                            reconvert((SeaTunnelRow) row, (SeaTunnelRowType) rowType, tableSchema);
+                            reconvert(
+                                    (SeaTunnelRow) row,
+                                    (SeaTunnelRowType) rowType,
+                                    sinkTableSchema);
                     RowType paimonRowType =
-                            RowTypeConverter.reconvert((SeaTunnelRowType) rowType, tableSchema);
+                            RowTypeConverter.reconvert((SeaTunnelRowType) rowType, sinkTableSchema);
                     binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
                     break;
                 default:
@@ -489,12 +496,25 @@ public class RowConverter {
         DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
         DataType sinkDataType = sinkDataField.type();
         if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
-            throw new CommonError()
-                    .writeRowErrorWithSchemaIncompatibleSchema(
-                            "Paimon",
-                            sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
-                            exceptDataField.asSQLString(),
-                            sinkDataField.asSQLString());
+            throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+                    "Paimon",
+                    sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
+                    exceptDataField.asSQLString(),
+                    sinkDataField.asSQLString());
+        }
+        if (sourceFieldType instanceof DecimalType
+                && sinkDataType instanceof org.apache.paimon.types.DecimalType) {
+            DecimalType sourceDecimalType = (DecimalType) sourceFieldType;
+            org.apache.paimon.types.DecimalType sinkDecimalType =
+                    (org.apache.paimon.types.DecimalType) sinkDataType;
+            if (sinkDecimalType.getPrecision() < sourceDecimalType.getPrecision()
+                    || sinkDecimalType.getScale() < sourceDecimalType.getScale()) {
+                throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+                        "Paimon",
+                        sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
+                        exceptDataField.asSQLString(),
+                        sinkDataField.asSQLString());
+            }
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index ebde744d03..8f7eea228f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -26,7 +26,10 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.paimon.data.BinaryArray;
 import org.apache.paimon.data.BinaryArrayWriter;
 import org.apache.paimon.data.BinaryMap;
@@ -66,45 +69,54 @@ public class RowConverterTest {
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private TableSchema tableSchema;
-
-    public static final RowType DEFAULT_ROW_TYPE =
-            RowType.of(
-                    new DataType[] {
-                        DataTypes.TINYINT(),
-                        DataTypes.SMALLINT(),
-                        DataTypes.INT(),
-                        DataTypes.BIGINT(),
-                        DataTypes.FLOAT(),
-                        DataTypes.DOUBLE(),
-                        DataTypes.DECIMAL(10, 10),
-                        DataTypes.STRING(),
-                        DataTypes.BYTES(),
-                        DataTypes.BOOLEAN(),
-                        DataTypes.DATE(),
-                        DataTypes.TIMESTAMP(),
-                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
-                        DataTypes.ARRAY(DataTypes.STRING())
-                    },
-                    new String[] {
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_string",
-                        "c_bytes",
-                        "c_boolean",
-                        "c_date",
-                        "c_timestamp",
-                        "c_map",
-                        "c_array"
-                    });
-
     public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");
 
+    public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.TINYINT(),
+                            DataTypes.SMALLINT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.FLOAT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(decimalPrecision, decimalScale),
+                            DataTypes.STRING(),
+                            DataTypes.BYTES(),
+                            DataTypes.BOOLEAN(),
+                            DataTypes.DATE(),
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                            DataTypes.ARRAY(DataTypes.STRING())
+                        },
+                        new String[] {
+                            "c_tinyint",
+                            "c_smallint",
+                            "c_int",
+                            "c_bigint",
+                            "c_float",
+                            "c_double",
+                            "c_decimal",
+                            "c_string",
+                            "c_bytes",
+                            "c_boolean",
+                            "c_date",
+                            "c_timestamp",
+                            "c_map",
+                            "c_array"
+                        });
+
+        return new TableSchema(
+                0,
+                TableSchema.newFields(rowType),
+                rowType.getFieldCount(),
+                Collections.EMPTY_LIST,
+                KEY_NAME_LIST,
+                Collections.EMPTY_MAP,
+                "");
+    }
+
     @BeforeEach
     public void before() {
         seaTunnelRowType =
@@ -215,27 +227,33 @@ public class RowConverterTest {
         binaryRowWriter.writeArray(
                 13, binaryArray2, new InternalArraySerializer(DataTypes.STRING()));
         internalRow = binaryRow;
-
-        tableSchema =
-                new TableSchema(
-                        0,
-                        TableSchema.newFields(DEFAULT_ROW_TYPE),
-                        DEFAULT_ROW_TYPE.getFieldCount(),
-                        Collections.EMPTY_LIST,
-                        KEY_NAME_LIST,
-                        Collections.EMPTY_MAP,
-                        "");
     }
 
     @Test
     public void seaTunnelToPaimon() {
-        InternalRow convert = RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, tableSchema);
-        Assertions.assertEquals(convert, internalRow);
+        SeaTunnelRuntimeException actualException =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                RowConverter.reconvert(
+                                        seaTunnelRow, seaTunnelRowType, getTableSchema(10, 10)));
+        SeaTunnelRuntimeException exceptedException =
+                CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+                        "Paimon",
+                        "c_decimal" + StringUtils.SPACE + "DECIMAL",
+                        "`c_decimal` DECIMAL(30, 8)",
+                        "`c_decimal` DECIMAL(10, 10)");
+        Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage());
+
+        InternalRow reconvert =
+                RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8));
+        Assertions.assertEquals(reconvert, internalRow);
     }
 
     @Test
     public void paimonToSeaTunnel() {
-        SeaTunnelRow convert = RowConverter.convert(internalRow, seaTunnelRowType, tableSchema);
+        SeaTunnelRow convert =
+                RowConverter.convert(internalRow, seaTunnelRowType, getTableSchema(10, 10));
         Assertions.assertEquals(convert, seaTunnelRow);
     }
 }

Reply via email to