This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new d5abf8f506 [BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map field(#7164) (#7168) d5abf8f506 is described below commit d5abf8f506c1a7674572077fa685da3241564fb6 Author: Zhihong Pan <49435072+panpan2...@users.noreply.github.com> AuthorDate: Tue Jul 16 22:14:29 2024 +0800 [BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map field(#7164) (#7168) --- .../seatunnel/maxcompute/sink/MaxcomputeSink.java | 2 +- .../seatunnel/maxcompute/sink/MaxcomputeWriter.java | 9 +++++++-- .../maxcompute/util/MaxcomputeTypeMapper.java | 19 ++++++++++++++----- .../seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java | 3 ++- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java index c5acadb173..6abce7e417 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java @@ -59,6 +59,6 @@ public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> { @Override public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) { - return new MaxcomputeWriter(this.pluginConfig); + return new MaxcomputeWriter(this.pluginConfig, this.typeInfo); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java index c6ee285a4b..51492ae591 
100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; @@ -46,9 +47,11 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { private final TableTunnel.UploadSession session; private final TableSchema tableSchema; private static final Long BLOCK_0 = 0L; + private SeaTunnelRowType rowType; - public MaxcomputeWriter(Config pluginConfig) { + public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) { try { + this.rowType = rowType; Table table = MaxcomputeUtil.getTable(pluginConfig); this.tableSchema = table.getSchema(); TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig); @@ -76,7 +79,9 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { @Override public void write(SeaTunnelRow seaTunnelRow) throws IOException { - Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema); + Record record = + MaxcomputeTypeMapper.getMaxcomputeRowData( + seaTunnelRow, this.tableSchema, this.rowType); recordWriter.write(record); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java index fccc056274..2a3eda909a 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java @@ -67,14 +67,23 @@ public class MaxcomputeTypeMapper implements Serializable { return new SeaTunnelRow(fields.toArray()); } - public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, TableSchema tableSchema) { + public static Record getMaxcomputeRowData( + SeaTunnelRow seaTunnelRow, TableSchema tableSchema, SeaTunnelRowType rowType) { ArrayRecord arrayRecord = new ArrayRecord(tableSchema); - List<Column> columns = tableSchema.getColumns(); for (int i = 0; i < seaTunnelRow.getFields().length; i++) { + String fieldName = rowType.getFieldName(i); + if (!tableSchema.containsColumn(fieldName)) { + throw new MaxcomputeConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + String.format( + "field not found in written table: %s,rowType: %s", + fieldName, seaTunnelRow.getField(i))); + } + Column column = tableSchema.getColumn(fieldName); + arrayRecord.set( - i, - resolveObject2Maxcompute( - seaTunnelRow.getField(i), columns.get(i).getTypeInfo())); + tableSchema.getColumnIndex(fieldName), + resolveObject2Maxcompute(seaTunnelRow.getField(i), column.getTypeInfo())); } return arrayRecord; } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java index 0eeff7c4d3..d4542af820 100644 --- 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java @@ -53,7 +53,8 @@ public class BasicTypeToOdpsTypeTest { } SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo); - Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema); + Record tRecord = + MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema, typeInfo); for (int i = 0; i < tRecord.getColumns().length; i++) { Assertions.assertEquals(record.get(i), tRecord.get(i));