This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d5abf8f506 [BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map field(#7164) (#7168)
d5abf8f506 is described below

commit d5abf8f506c1a7674572077fa685da3241564fb6
Author: Zhihong Pan <49435072+panpan2...@users.noreply.github.com>
AuthorDate: Tue Jul 16 22:14:29 2024 +0800

    [BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map field(#7164) (#7168)
---
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java     |  2 +-
 .../seatunnel/maxcompute/sink/MaxcomputeWriter.java   |  9 +++++++--
 .../maxcompute/util/MaxcomputeTypeMapper.java         | 19 ++++++++++++++-----
 .../seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java |  3 ++-
 4 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index c5acadb173..6abce7e417 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -59,6 +59,6 @@ public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
-        return new MaxcomputeWriter(this.pluginConfig);
+        return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index c6ee285a4b..51492ae591 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
@@ -46,9 +47,11 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     private final TableTunnel.UploadSession session;
     private final TableSchema tableSchema;
     private static final Long BLOCK_0 = 0L;
+    private SeaTunnelRowType rowType;
 
-    public MaxcomputeWriter(Config pluginConfig) {
+    public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) {
         try {
+            this.rowType = rowType;
             Table table = MaxcomputeUtil.getTable(pluginConfig);
             this.tableSchema = table.getSchema();
             TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
@@ -76,7 +79,9 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow) throws IOException {
-        Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema);
+        Record record =
+                MaxcomputeTypeMapper.getMaxcomputeRowData(
+                        seaTunnelRow, this.tableSchema, this.rowType);
         recordWriter.write(record);
     }
 
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index fccc056274..2a3eda909a 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -67,14 +67,23 @@ public class MaxcomputeTypeMapper implements Serializable {
         return new SeaTunnelRow(fields.toArray());
     }
 
-    public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, TableSchema tableSchema) {
+    public static Record getMaxcomputeRowData(
+            SeaTunnelRow seaTunnelRow, TableSchema tableSchema, SeaTunnelRowType rowType) {
         ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
-        List<Column> columns = tableSchema.getColumns();
         for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
+            String fieldName = rowType.getFieldName(i);
+            if (!tableSchema.containsColumn(fieldName)) {
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        String.format(
+                                "field not found in written table: %s,rowType: %s",
+                                fieldName, seaTunnelRow.getField(i)));
+            }
+            Column column = tableSchema.getColumn(fieldName);
+
             arrayRecord.set(
-                    i,
-                    resolveObject2Maxcompute(
-                            seaTunnelRow.getField(i), columns.get(i).getTypeInfo()));
+                    tableSchema.getColumnIndex(fieldName),
+                    resolveObject2Maxcompute(seaTunnelRow.getField(i), column.getTypeInfo()));
         }
         return arrayRecord;
     }
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
index 0eeff7c4d3..d4542af820 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
@@ -53,7 +53,8 @@ public class BasicTypeToOdpsTypeTest {
         }
 
         SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
-        Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema);
+        Record tRecord =
+                MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema, typeInfo);
 
         for (int i = 0; i < tRecord.getColumns().length; i++) {
             Assertions.assertEquals(record.get(i), tRecord.get(i));

Reply via email to