This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new df210ea73d [Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527) df210ea73d is described below commit df210ea73d5b14b36b9637fc4d12bee09fa8decd Author: dailai <dai...@chinatelecom.cn> AuthorDate: Fri Aug 30 12:44:00 2024 +0800 [Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527) --- .../seatunnel/paimon/utils/RowConverter.java | 56 ++++++---- .../seatunnel/paimon/utils/RowConverterTest.java | 118 ++++++++++++--------- 2 files changed, 106 insertions(+), 68 deletions(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index 9c576018a3..45c2c492c1 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -346,17 +346,18 @@ public class RowConverter { * * @param seaTunnelRow SeaTunnel row object * @param seaTunnelRowType SeaTunnel row type - * @param tableSchema Paimon table schema + * @param sinkTableSchema Paimon table schema * @return Paimon row object */ public static InternalRow reconvert( - SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) { - List<DataField> sinkTotalFields = tableSchema.fields(); + SeaTunnelRow seaTunnelRow, + SeaTunnelRowType seaTunnelRowType, + TableSchema sinkTableSchema) { + List<DataField> sinkTotalFields = sinkTableSchema.fields(); int sourceTotalFields = seaTunnelRowType.getTotalFields(); if (sourceTotalFields != sinkTotalFields.size()) { - throw new CommonError() - .writeRowErrorWithFiledsCountNotMatch( - "Paimon", sourceTotalFields, sinkTotalFields.size()); + throw CommonError.writeRowErrorWithFiledsCountNotMatch( + "Paimon", sourceTotalFields, sinkTotalFields.size()); } BinaryRow binaryRow = new BinaryRow(sourceTotalFields); BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow); @@ -399,14 +400,17 @@ public class RowConverter { binaryWriter.writeDouble(i, (Double) seaTunnelRow.getField(i)); break; case DECIMAL: - DecimalType fieldType = (DecimalType) seaTunnelRowType.getFieldType(i); + DataField decimalDataField = + SchemaUtil.getDataField(sinkTotalFields, fieldName); + org.apache.paimon.types.DecimalType decimalType = + (org.apache.paimon.types.DecimalType) decimalDataField.type(); binaryWriter.writeDecimal( i, Decimal.fromBigDecimal( (BigDecimal) seaTunnelRow.getField(i), - fieldType.getPrecision(), - fieldType.getScale()), - fieldType.getPrecision()); + decimalType.getPrecision(), + decimalType.getScale()), + decimalType.getPrecision()); break; case STRING: binaryWriter.writeString( @@ -464,9 +468,12 @@ public class RowConverter { SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i); Object row = seaTunnelRow.getField(i); InternalRow paimonRow = - reconvert((SeaTunnelRow) row, (SeaTunnelRowType) rowType, tableSchema); + reconvert( + (SeaTunnelRow) row, + (SeaTunnelRowType) rowType, + sinkTableSchema); RowType paimonRowType = - RowTypeConverter.reconvert((SeaTunnelRowType) rowType, tableSchema); + RowTypeConverter.reconvert((SeaTunnelRowType) rowType, sinkTableSchema); binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType)); break; default: @@ -489,12 +496,25 @@ public class RowConverter { DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType); DataType sinkDataType = sinkDataField.type(); if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) { - throw new CommonError() - .writeRowErrorWithSchemaIncompatibleSchema( - "Paimon", - sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(), - exceptDataField.asSQLString(), - sinkDataField.asSQLString()); + throw CommonError.writeRowErrorWithSchemaIncompatibleSchema( + "Paimon", + sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(), + exceptDataField.asSQLString(), + sinkDataField.asSQLString()); + } + if (sourceFieldType instanceof DecimalType + && sinkDataType instanceof org.apache.paimon.types.DecimalType) { + DecimalType sourceDecimalType = (DecimalType) sourceFieldType; + org.apache.paimon.types.DecimalType sinkDecimalType = + (org.apache.paimon.types.DecimalType) sinkDataType; + if (sinkDecimalType.getPrecision() < sourceDecimalType.getPrecision() + || sinkDecimalType.getScale() < sourceDecimalType.getScale()) { + throw CommonError.writeRowErrorWithSchemaIncompatibleSchema( + "Paimon", + sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(), + exceptDataField.asSQLString(), + sinkDataField.asSQLString()); + } } } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java index ebde744d03..8f7eea228f 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java @@ -26,7 +26,10 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.commons.lang3.StringUtils; import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.BinaryArrayWriter; import org.apache.paimon.data.BinaryMap; @@ -66,45 +69,54 @@ public class RowConverterTest { private SeaTunnelRowType seaTunnelRowType; - private TableSchema tableSchema; - - public static final RowType DEFAULT_ROW_TYPE = - RowType.of( - new DataType[] { - DataTypes.TINYINT(), - DataTypes.SMALLINT(), - DataTypes.INT(), - DataTypes.BIGINT(), - DataTypes.FLOAT(), - DataTypes.DOUBLE(), - DataTypes.DECIMAL(10, 10), - DataTypes.STRING(), - DataTypes.BYTES(), - DataTypes.BOOLEAN(), - DataTypes.DATE(), - DataTypes.TIMESTAMP(), - DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), - DataTypes.ARRAY(DataTypes.STRING()) - }, - new String[] { - "c_tinyint", - "c_smallint", - "c_int", - "c_bigint", - "c_float", - "c_double", - "c_decimal", - "c_string", - "c_bytes", - "c_boolean", - "c_date", - "c_timestamp", - "c_map", - "c_array" - }); - public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint"); + public TableSchema getTableSchema(int decimalPrecision, int decimalScale) { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DECIMAL(decimalPrecision, decimalScale), + DataTypes.STRING(), + DataTypes.BYTES(), + DataTypes.BOOLEAN(), + DataTypes.DATE(), + DataTypes.TIMESTAMP(), + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + DataTypes.ARRAY(DataTypes.STRING()) + }, + new String[] { + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_string", + "c_bytes", + "c_boolean", + "c_date", + "c_timestamp", + "c_map", + "c_array" + }); + + return new TableSchema( + 0, + TableSchema.newFields(rowType), + rowType.getFieldCount(), + Collections.EMPTY_LIST, + KEY_NAME_LIST, + Collections.EMPTY_MAP, + ""); + } + @BeforeEach public void before() { seaTunnelRowType = @@ -215,27 +227,33 @@ public class RowConverterTest { binaryRowWriter.writeArray( 13, binaryArray2, new InternalArraySerializer(DataTypes.STRING())); internalRow = binaryRow; - - tableSchema = - new TableSchema( - 0, - TableSchema.newFields(DEFAULT_ROW_TYPE), - DEFAULT_ROW_TYPE.getFieldCount(), - Collections.EMPTY_LIST, - KEY_NAME_LIST, - Collections.EMPTY_MAP, - ""); } @Test public void seaTunnelToPaimon() { - InternalRow convert = RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, tableSchema); - Assertions.assertEquals(convert, internalRow); + SeaTunnelRuntimeException actualException = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> + RowConverter.reconvert( + seaTunnelRow, seaTunnelRowType, getTableSchema(10, 10))); + SeaTunnelRuntimeException exceptedException = + CommonError.writeRowErrorWithSchemaIncompatibleSchema( + "Paimon", + "c_decimal" + StringUtils.SPACE + "DECIMAL", + "`c_decimal` DECIMAL(30, 8)", + "`c_decimal` DECIMAL(10, 10)"); + Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage()); + + InternalRow reconvert = + RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8)); + Assertions.assertEquals(reconvert, internalRow); } @Test public void paimonToSeaTunnel() { - SeaTunnelRow convert = RowConverter.convert(internalRow, seaTunnelRowType, tableSchema); + SeaTunnelRow convert = + RowConverter.convert(internalRow, seaTunnelRowType, getTableSchema(10, 10)); Assertions.assertEquals(convert, seaTunnelRow); } }