This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d56d64fc04 [Improve][Paimon] Add check for the base type between 
source and sink before write. (#6953)
d56d64fc04 is described below

commit d56d64fc0439a33076d80ed3f3586b80d5512dbc
Author: dailai <dai...@chinatelecom.cn>
AuthorDate: Tue Jun 11 21:03:11 2024 +0800

    [Improve][Paimon] Add check for the base type between source and sink 
before write. (#6953)
---
 .../seatunnel/common/exception/CommonError.java    | 26 +++++++++
 .../common/exception/CommonErrorCode.java          | 10 +++-
 .../seatunnel/paimon/utils/RowConverter.java       | 33 ++++++++++--
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      | 14 +++++
 ...ke_cdc_sink_paimon_case1_with_error_schema.conf | 62 ++++++++++++++++++++++
 .../src/test/resources/schema-0.json               |  4 +-
 6 files changed, 143 insertions(+), 6 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index f283963347..4aec9d2211 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -45,6 +45,8 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ROW_KIND;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.VERSION_NOT_SUPPORTED;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA;
 
 /**
  * The common error of SeaTunnel. This is an alternative to {@link 
CommonErrorCodeDeprecated} and is
@@ -245,4 +247,28 @@ public class CommonError {
         params.put("rowKind", rowKind);
         return new SeaTunnelRuntimeException(UNSUPPORTED_ROW_KIND, params);
     }
+
+    /**
+     * Creates the exception reported when a source field's schema is incompatible with the schema
+     * of the corresponding field in the sink table.
+     *
+     * @param connector name of the connector reporting the error
+     * @param sourceFieldSqlSchema schema text of the source field
+     * @param exceptFieldSqlSchema schema text the sink field is expected to have
+     * @param sinkFieldSqlSchema schema text the sink field actually has
+     * @return a {@link SeaTunnelRuntimeException} carrying the four values as message parameters
+     */
+    public static SeaTunnelRuntimeException writeRowErrorWithSchemaIncompatibleSchema(
+            String connector,
+            String sourceFieldSqlSchema,
+            String exceptFieldSqlSchema,
+            String sinkFieldSqlSchema) {
+        // Each key is named after the matching <...> placeholder of the COMMON-30 description.
+        Map<String, String> messageArgs = new HashMap<>();
+        messageArgs.put("sinkFieldSqlSchema", sinkFieldSqlSchema);
+        messageArgs.put("exceptFieldSqlSchema", exceptFieldSqlSchema);
+        messageArgs.put("sourceFieldSqlSchema", sourceFieldSqlSchema);
+        messageArgs.put("connector", connector);
+        return new SeaTunnelRuntimeException(
+                WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA, messageArgs);
+    }
+
+    /**
+     * Creates the exception reported when the number of source fields differs from the number of
+     * fields in the sink table.
+     *
+     * @param connector name of the connector reporting the error
+     * @param sourceFieldsNum number of fields in the source row type
+     * @param sinkFieldsNum number of fields in the sink table
+     * @return a {@link SeaTunnelRuntimeException} with code {@code
+     *     WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH}
+     */
+    public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
+            String connector, int sourceFieldsNum, int sinkFieldsNum) {
+        Map<String, String> params = new HashMap<>();
+        params.put("connector", connector);
+        // Keys mirror the <sourceFieldsNum>/<sinkFieldsNum> placeholders of the COMMON-31
+        // description so both counts can be filled into the message.
+        params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum));
+        params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum));
+        return new SeaTunnelRuntimeException(
+                WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 3cf69285cb..f51c983456 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -62,7 +62,15 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
             "COMMON-28",
             "'<identifier>' array type not support genericType '<genericType>' 
of '<fieldName>'"),
     UNSUPPORTED_ROW_KIND(
-            "COMMON-29", "'<identifier>' table '<tableId>' not support rowKind 
 '<rowKind>'");
+            "COMMON-29", "'<identifier>' table '<tableId>' not support rowKind 
 '<rowKind>'"),
+
+    WRITE_SEATUNNEL_ROW_ERROR_WITH_SCHEMA_INCOMPATIBLE_SCHEMA(
+            "COMMON-30",
+            "<connector>: The source filed with schema 
'<sourceFieldSqlSchema>', except filed schema of sink is 
'<exceptFieldSqlSchema>'; but the filed in sink table which actual schema is 
'<sinkFieldSqlSchema>'. Please check schema of sink table."),
+
+    WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
+            "COMMON-31",
+            "<connector>: The source has '<sourceFieldsNum>' fields, but the 
table of sink has '<sinkFieldsNum>' fields. Please check schema of sink 
table.");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index f92d175c2a..9c576018a3 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.paimon.data.BinaryArray;
 import org.apache.paimon.data.BinaryArrayWriter;
 import org.apache.paimon.data.BinaryMap;
@@ -350,8 +351,14 @@ public class RowConverter {
      */
     public static InternalRow reconvert(
             SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, 
TableSchema tableSchema) {
-        List<DataField> fields = tableSchema.fields();
-        BinaryRow binaryRow = new BinaryRow(seaTunnelRowType.getTotalFields());
+        List<DataField> sinkTotalFields = tableSchema.fields();
+        int sourceTotalFields = seaTunnelRowType.getTotalFields();
+        if (sourceTotalFields != sinkTotalFields.size()) {
+            throw new CommonError()
+                    .writeRowErrorWithFiledsCountNotMatch(
+                            "Paimon", sourceTotalFields, 
sinkTotalFields.size());
+        }
+        BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
         BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
         // Convert SeaTunnel RowKind to Paimon RowKind
         org.apache.paimon.types.RowKind rowKind =
@@ -370,6 +377,7 @@ public class RowConverter {
                 binaryWriter.setNullAt(i);
                 continue;
             }
+            checkCanWriteWithType(i, seaTunnelRowType, sinkTotalFields);
             String fieldName = seaTunnelRowType.getFieldName(i);
             switch (fieldTypes[i].getSqlType()) {
                 case TINYINT:
@@ -416,7 +424,7 @@ public class RowConverter {
                             .setValue(binaryWriter, i, 
DateTimeUtils.toInternal(date));
                     break;
                 case TIMESTAMP:
-                    DataField dataField = SchemaUtil.getDataField(fields, 
fieldName);
+                    DataField dataField = 
SchemaUtil.getDataField(sinkTotalFields, fieldName);
                     int precision = ((TimestampType) 
dataField.type()).getPrecision();
                     LocalDateTime datetime = (LocalDateTime) 
seaTunnelRow.getField(i);
                     binaryWriter.writeTimestamp(
@@ -470,4 +478,23 @@ public class RowConverter {
         }
         return binaryRow;
     }
+
+    /**
+     * Checks that the source field at position {@code i} can be written to the sink field at the
+     * same position. The source type is converted to the Paimon type it is expected to map to, and
+     * the type root of that expected type is compared with the type root of the sink field.
+     *
+     * @param i index of the field, used for both the source row type and the sink field list
+     * @param seaTunnelRowType row type of the source
+     * @param fields fields of the sink table schema
+     * @throws SeaTunnelRuntimeException if the two type roots differ
+     */
+    private static void checkCanWriteWithType(
+            int i, SeaTunnelRowType seaTunnelRowType, List<DataField> fields) {
+        String sourceFieldName = seaTunnelRowType.getFieldName(i);
+        SeaTunnelDataType<?> sourceFieldType = seaTunnelRowType.getFieldType(i);
+        DataField sinkDataField = fields.get(i);
+        DataType exceptDataType = RowTypeConverter.reconvert(sourceFieldName, sourceFieldType);
+        DataType sinkDataType = sinkDataField.type();
+        if (exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
+            return;
+        }
+        // The expected field is only needed to render the error message, so build it lazily.
+        DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
+        // CommonError exposes static factories; call them on the class, not on a new instance.
+        throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
+                "Paimon",
+                sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
+                exceptDataField.asSQLString(),
+                sinkDataField.asSQLString());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 90b9a63cdd..c899dd0e8b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -148,6 +148,20 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                         });
     }
 
+    /**
+     * Runs the baseline job, then a job whose source declares {@code name} as an int, and expects
+     * the second job to fail with the schema-incompatibility error.
+     */
+    @TestTemplate
+    public void testSinkWithIncompatibleSchema(TestContainer container) throws 
Exception {
+        // Baseline run must succeed; presumably it creates the sink table that the second job
+        // writes to (verify against fake_cdc_sink_paimon_case1.conf).
+        Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // Second run uses the config with the mismatching source schema and must exit with 1.
+        Container.ExecResult errResult =
+                
container.executeJob("/fake_cdc_sink_paimon_case1_with_error_schema.conf");
+        Assertions.assertEquals(1, errResult.getExitCode());
+        // The expected text must match the COMMON-30 description verbatim, including its spelling.
+        Assertions.assertTrue(
+                errResult
+                        .getStderr()
+                        .contains(
+                                "[Paimon: The source filed with schema 'name 
INT', except filed schema of sink is '`name` INT'; but the filed in sink table 
which actual schema is '`name` STRING'. Please check schema of sink table.]"));
+    }
+
     @TestTemplate
     public void testFakeMultipleTableSinkPaimon(TestContainer container) 
throws Exception {
         Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case2.conf");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
new file mode 100644
index 0000000000..70bcedff29
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = int
+        score = string
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, 100, "A"]
+      },
+      {
+        kind = INSERT
+        fields = [2, 100, "B"]
+      },
+      {
+        kind = INSERT
+        fields = [3, 100, "C"]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "seatunnel_namespace1"
+    table = "st_test"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
index c28d252b35..bc425e1057 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
@@ -58,7 +58,7 @@
   }, {
     "id" : 12,
     "name" : "c_date",
-    "type" : "TIMESTAMP(3)"
+    "type" : "DATE"
   }, {
     "id" : 13,
     "name" : "c_timestamp",
@@ -68,4 +68,4 @@
   "partitionKeys" : [ ],
   "primaryKeys" : [ ],
   "options" : { }
-}
\ No newline at end of file
+}

Reply via email to