This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new d56d64fc04 [Improve][Paimon] Add check for the base type between source and sink before write. (#6953) d56d64fc04 is described below commit d56d64fc0439a33076d80ed3f3586b80d5512dbc Author: dailai <dai...@chinatelecom.cn> AuthorDate: Tue Jun 11 21:03:11 2024 +0800 [Improve][Paimon] Add check for the base type between source and sink before write. (#6953) --- .../seatunnel/common/exception/CommonError.java | 26 +++++++++ .../common/exception/CommonErrorCode.java | 10 +++- .../seatunnel/paimon/utils/RowConverter.java | 33 ++++++++++-- .../e2e/connector/paimon/PaimonSinkCDCIT.java | 14 +++++ ...ke_cdc_sink_paimon_case1_with_error_schema.conf | 62 ++++++++++++++++++++++ .../src/test/resources/schema-0.json | 4 +- 6 files changed, 143 insertions(+), 6 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index f283963347..4aec9d2211 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -45,6 +45,8 @@ import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND; import static org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED; import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR; +import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH; +import static org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA; /** * The common error of SeaTunnel. 
This is an alternative to {@link CommonErrorCodeDeprecated} and is @@ -245,4 +247,28 @@ public class CommonError { params.put("rowKind", rowKind); return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params); } + + public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema( + String connector, + String sourceFieldSqlSchema, + String exceptFieldSqlSchema, + String sinkFieldSqlSchema) { + Map<String, String> params = new HashMap<>(); + params.put("connector", connector); + params.put("sourceFieldSqlSchema", sourceFieldSqlSchema); + params.put("exceptFieldSqlSchema", exceptFieldSqlSchema); + params.put("sinkFieldSqlSchema", sinkFieldSqlSchema); + return new SeaTunnelRuntimeException( + WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, params); + } + + public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch( + String connector, int sourceFieldsNum, int sinkFieldsNum) { + Map<String, String> params = new HashMap<>(); + params.put("connector", connector); + params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum)); + params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum)); + return new SeaTunnelRuntimeException( + WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params); + } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 3cf69285cb..f51c983456 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { "COMMON-28", "'<identifier>' array type not support genericType '<genericType>' of '<fieldName>'"), UNSUPPORTED_ROW_KIND( - "COMMON-29", "'<identifier>' 
table '<tableId>' not support rowKind '<rowKind>'"), + + WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA( + "COMMON-30", + "<connector>: The source filed with schema '<sourceFieldSqlSchema>', except filed schema of sink is '<exceptFieldSqlSchema>'; but the filed in sink table which actual schema is '<sinkFieldSqlSchema>'. Please check schema of sink table."), + + WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH( + "COMMON-31", + "<connector>: The source has '<sourceFieldsNum>' fields, but the table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table."); private final String code; private final String description; diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java index f92d175c2a..9c576018a3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig; +import org.apache.commons.lang3.StringUtils; import org.apache.paimon.data.BinaryArray; import org.apache.paimon.data.BinaryArrayWriter; import org.apache.paimon.data.BinaryMap; @@ -350,8 +351,14 @@ public class RowConverter { */ public static InternalRow reconvert( SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) { - List<DataField> fields = tableSchema.fields(); - BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields()); + List<DataField> sinkTotalFields = tableSchema.fields(); + int 
sourceTotalFields = seaTunnelRowType.getTotalFields(); + if (sourceTotalFields != sinkTotalFields.size()) { + throw new CommonError() + .writeRowErrorWithFiledsCountNotMatch( + "Paimon", sourceTotalFields, sinkTotalFields.size()); + } + BinaryRow binaryRow = new BinaryRow(sourceTotalFields); BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow); // Convert SeaTunnel RowKind to Paimon RowKind org.apache.paimon.types.RowKind rowKind = @@ -370,6 +377,7 @@ public class RowConverter { binaryWriter.setNullAt(i); continue; } + checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields); String fieldName = seaTunnelRowType.getFieldName(i); switch (fieldTypes[i].getSqlType()) { case TINYINT: @@ -416,7 +424,7 @@ public class RowConverter { .setValue(binaryWriter, i, DateTimeUtils.toInternal(date)); break; case TIMESTAMP: - DataField dataField = SchemaUtil.getDataField(fields, fieldName); + DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName); int precision = ((TimestampType) dataField.type()).getPrecision(); LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i); binaryWriter.writeTimestamp( @@ -470,4 +478,23 @@ public class RowConverter { } return binaryRow; } + + private static void checkCanWriteWithType( + int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) { + String sourceFieldName = seaTunnelRowType.getFieldName(i); + SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i); + DataField sinkDataField = fields.get(i); + DataType exceptDataType = + RowTypeConverter.reconvert(sourceFieldName, seaTunnelRowType.getFieldType(i)); + DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType); + DataType sinkDataType = sinkDataField.type(); + if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) { + throw new CommonError() + .writeRowErrorWithSchemaIncompatibleSchema( + "Paimon", + sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(), + 
exceptDataField.asSQLString(), + sinkDataField.asSQLString()); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 90b9a63cdd..c899dd0e8b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -148,6 +148,20 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource { }); } + @TestTemplate + public void testSinkWithIncompatibleSchema(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Container.ExecResult errResult = + container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf"); + Assertions.assertEquals(1, errResult.getExitCode()); + Assertions.assertTrue( + errResult + .getStderr() + .contains( + "[Paimon: The source filed with schema 'name INT', except filed schema of sink is '`name` INT'; but the filed in sink table which actual schema is '`name` STRING'. 
Please check schema of sink table.]")); + } + @TestTemplate public void testFakeMultipleTableSinkPaimon(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case2.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf new file mode 100644 index 0000000000..70bcedff29 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = int + score = string + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, 100, "A"] + }, + { + kind = INSERT + fields = [2, 100, "B"] + }, + { + kind = INSERT + fields = [3, 100, "C"] + } + ] + } +} + +sink { + Paimon { + warehouse = "file:///tmp/paimon" + database = "seatunnel_namespace1" + table = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json index c28d252b35..bc425e1057 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json @@ -58,7 +58,7 @@ }, { "id" : 12, "name" : "c_date", - "type" : "TIMESTAMP(3)" + "type" : "DATE" }, { "id" : 13, "name" : "c_timestamp", @@ -68,4 +68,4 @@ "partitionKeys" : [ ], "primaryKeys" : [ ], "options" : { } -} \ No newline at end of file +}