This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eb9c8704b9 [Improve][Connector-V2] Support maxcompute sink writer 
upsert/delete action with upsert session mode (#9462)
eb9c8704b9 is described below

commit eb9c8704b9a349e0163d49e40357cce79851d86e
Author: dy102 <[email protected]>
AuthorDate: Thu Jun 26 22:38:53 2025 +0900

    [Improve][Connector-V2] Support maxcompute sink writer upsert/delete action 
with upsert session mode (#9462)
---
 docs/en/connector-v2/sink/Maxcompute.md            |  18 +++
 .../maxcompute/config/MaxcomputeSinkOptions.java   |   7 +
 .../maxcompute/sink/MaxcomputeSinkFactory.java     |   1 +
 .../maxcompute/sink/MaxcomputeWriter.java          |  72 +++--------
 .../maxcompute/util/MaxcomputeOutputFormat.java    | 141 +++++++++++++++++++++
 .../maxcompute/util/MaxcomputeTypeMapper.java      |   7 +-
 .../maxcompute/BasicTypeToOdpsTypeTest.java        |  12 +-
 .../e2e/connector/maxcompute/MaxComputeIT.java     | 120 +++++++++++++++---
 ...maxcompute.conf => fake_maxcompute_delete.conf} |  31 ++++-
 ...maxcompute.conf => fake_maxcompute_upsert.conf} |  31 ++++-
 .../test/resources/maxcompute_to_maxcompute.conf   |  39 +++++-
 .../maxcompute_to_maxcompute_multi_table.conf      |  62 +++++++--
 12 files changed, 433 insertions(+), 108 deletions(-)

diff --git a/docs/en/connector-v2/sink/Maxcompute.md 
b/docs/en/connector-v2/sink/Maxcompute.md
index ed268102f4..559b30f3e5 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -132,6 +132,24 @@ Example values:
 
 Default: `yyyy-MM-dd HH:mm:ss`
 
+### tunnel_endpoint [String]
+Specifies the custom endpoint URL for the MaxCompute Tunnel service.
+
+By default, the endpoint is automatically inferred from the configured region.
+
+This option allows you to override the default behavior and use a custom 
Tunnel endpoint.
+If not specified, the connector will use the region-based default Tunnel 
endpoint.
+
+In general, you do **not** need to set tunnel_endpoint. It is only needed for 
custom networking, debugging, or local development.
+
+Example values:
+
+- `https://dt.cn-hangzhou.maxcompute.aliyun.com`
+- `https://dt.ap-southeast-1.maxcompute.aliyun.com`
+- `http://maxcompute:8080`
+
+Default: Not set (auto-inferred from region)
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details.
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
index 8a6da00f00..85b8d541e4 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
@@ -61,4 +61,11 @@ public class MaxcomputeSinkOptions extends 
MaxcomputeBaseOptions {
                                     + "' ;")
                     .withDescription(
                             "Create table statement template, used to create 
MaxCompute table");
+
+    public static final Option<String> TUNNEL_ENDPOINT =
+            Options.key("tunnel_endpoint")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Tunnel endpoint, e.g. 
https://dt.cn-hangzhou.maxcompute.aliyun.com");
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 1f1e7c0f1d..d58f9a4eba 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -54,6 +54,7 @@ public class MaxcomputeSinkFactory implements 
TableSinkFactory {
                         MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
                         MaxcomputeSinkOptions.CUSTOM_SQL,
                         FormatOptions.DATETIME_FORMAT,
+                        MaxcomputeSinkOptions.TUNNEL_ENDPOINT,
                         SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index e800a504c8..dfc73d8f0d 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -18,24 +18,16 @@
 package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.options.table.FormatOptions;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.FormatterContext;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
-import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeOutputFormat;
 
-import com.aliyun.odps.PartitionSpec;
-import com.aliyun.odps.Table;
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.data.RecordWriter;
-import com.aliyun.odps.tunnel.TableTunnel;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -43,38 +35,11 @@ import java.io.IOException;
 @Slf4j
 public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
-    private RecordWriter recordWriter;
-    private final TableTunnel.UploadSession session;
-    private final TableSchema tableSchema;
-    private final SeaTunnelRowType rowType;
-    private final FormatterContext formatterContext;
+    private MaxcomputeOutputFormat writer;
 
     public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType 
rowType) {
         try {
-            this.rowType = rowType;
-            Table table = MaxcomputeUtil.getTable(readonlyConfig);
-            this.tableSchema = table.getSchema();
-            TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
-            if 
(readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
-                PartitionSpec partitionSpec =
-                        new 
PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
-                session =
-                        tunnel.createUploadSession(
-                                
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
-                                
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME),
-                                partitionSpec);
-            } else {
-                session =
-                        tunnel.createUploadSession(
-                                
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
-                                
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME));
-            }
-
-            this.formatterContext =
-                    new 
FormatterContext(readonlyConfig.get(FormatOptions.DATETIME_FORMAT));
-
-            this.recordWriter = session.openBufferedWriter();
-            log.info("open record writer success");
+            writer = new MaxcomputeOutputFormat(rowType, readonlyConfig);
         } catch (Exception e) {
             throw new MaxcomputeConnectorException(
                     CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);
@@ -83,23 +48,24 @@ public class MaxcomputeWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
 
     @Override
     public void write(SeaTunnelRow seaTunnelRow) throws IOException {
-        Record record =
-                MaxcomputeTypeMapper.getMaxcomputeRowData(
-                        seaTunnelRow, this.tableSchema, this.rowType, 
formatterContext);
-        recordWriter.write(record);
+        try {
+            writer.write(seaTunnelRow);
+        } catch (IOException e1) {
+            throw e1;
+        } catch (Exception e2) {
+            throw CommonError.writeSeaTunnelRowFailed(
+                    MaxcomputeBaseOptions.PLUGIN_NAME, 
seaTunnelRow.toString(), e2);
+        }
     }
 
     @Override
     public void close() throws IOException {
-        if (recordWriter != null) {
-            recordWriter.close();
-            try {
-                session.commit();
-            } catch (Exception e) {
-                throw new MaxcomputeConnectorException(
-                        CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);
-            }
-            recordWriter = null;
+        try {
+            writer.close();
+        } catch (IOException e1) {
+            throw e1;
+        } catch (Exception e2) {
+            throw CommonError.closeFailed(MaxcomputeBaseOptions.PLUGIN_NAME, 
e2);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
new file mode 100644
index 0000000000..0cc37b0ad0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.table.FormatOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.tunnel.streams.UpsertStream;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class MaxcomputeOutputFormat {
+    private final ReadonlyConfig readonlyConfig;
+
+    private final TableSchema tableSchema;
+    private final SeaTunnelRowType rowType;
+    private final FormatterContext formatterContext;
+
+    private UpsertStream upsertStream;
+    private TableTunnel.UpsertSession upsertSession;
+
+    public MaxcomputeOutputFormat(SeaTunnelRowType rowType, ReadonlyConfig 
readonlyConfig) {
+        this.rowType = rowType;
+        this.readonlyConfig = readonlyConfig;
+        this.tableSchema = MaxcomputeUtil.getTable(readonlyConfig).getSchema();
+        this.formatterContext =
+                new 
FormatterContext(readonlyConfig.get(FormatOptions.DATETIME_FORMAT));
+    }
+
+    public void write(SeaTunnelRow seaTunnelRow) throws IOException, 
TunnelException {
+        ensureUpsertSessionAndWriter();
+        Record newRecord =
+                MaxcomputeTypeMapper.getMaxcomputeRowData(
+                        upsertSession.newRecord(),
+                        seaTunnelRow,
+                        this.tableSchema,
+                        this.rowType,
+                        formatterContext);
+
+        switch (seaTunnelRow.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                upsertStream.upsert(newRecord);
+                break;
+            case DELETE:
+                upsertStream.delete(newRecord);
+                break;
+            default:
+                throw CommonError.unsupportedDataType(
+                        MaxcomputeBaseOptions.PLUGIN_NAME,
+                        seaTunnelRow.getRowKind().toString(),
+                        seaTunnelRow.toString());
+        }
+    }
+
+    public void close() throws IOException, TunnelException {
+        if (upsertStream != null) {
+            try {
+                upsertStream.close();
+            } finally {
+                upsertStream = null;
+            }
+        }
+
+        if (upsertSession != null) {
+            try {
+                upsertSession.commit(true);
+            } finally {
+                upsertSession.close();
+                upsertSession = null;
+            }
+        }
+    }
+
+    private void ensureUpsertSessionAndWriter() throws TunnelException, 
IOException {
+        if (upsertSession == null) {
+            initializeUpsertSession();
+        }
+        if (upsertStream == null) {
+            this.upsertStream = upsertSession.buildUpsertStream().build();
+            log.info("build upsert stream success");
+        }
+    }
+
+    private TableTunnel getTableTunnel(String tunnelEndpoint) {
+        TableTunnel tableTunnel = 
MaxcomputeUtil.getTableTunnel(readonlyConfig);
+        if (tunnelEndpoint != null && !tunnelEndpoint.trim().isEmpty()) {
+            tableTunnel.setEndpoint(tunnelEndpoint);
+        }
+        return tableTunnel;
+    }
+
+    private void initializeUpsertSession() throws TunnelException, IOException 
{
+        TableTunnel tunnel =
+                
getTableTunnel(readonlyConfig.get(MaxcomputeSinkOptions.TUNNEL_ENDPOINT));
+        if 
(readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
+            PartitionSpec partitionSpec =
+                    new 
PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
+            upsertSession =
+                    tunnel.buildUpsertSession(
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME))
+                            .setPartitionSpec(partitionSpec)
+                            .build();
+
+        } else {
+            upsertSession =
+                    tunnel.buildUpsertSession(
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
+                                    
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME))
+                            .build();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 6b50f50431..790287338c 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -30,7 +30,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.Maxcompute
 import com.aliyun.odps.Column;
 import com.aliyun.odps.Table;
 import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.ArrayRecord;
 import com.aliyun.odps.data.Binary;
 import com.aliyun.odps.data.Char;
 import com.aliyun.odps.data.Record;
@@ -69,11 +68,11 @@ public class MaxcomputeTypeMapper implements Serializable {
     }
 
     public static Record getMaxcomputeRowData(
+            Record record,
             SeaTunnelRow seaTunnelRow,
             TableSchema tableSchema,
             SeaTunnelRowType rowType,
             FormatterContext formatterContext) {
-        ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
         for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
             String fieldName = rowType.getFieldName(i);
             if (!tableSchema.containsColumn(fieldName)) {
@@ -85,12 +84,12 @@ public class MaxcomputeTypeMapper implements Serializable {
             }
             Column column = tableSchema.getColumn(fieldName);
 
-            arrayRecord.set(
+            record.set(
                     tableSchema.getColumnIndex(fieldName),
                     resolveObject2Maxcompute(
                             seaTunnelRow.getField(i), column.getTypeInfo(), 
formatterContext));
         }
-        return arrayRecord;
+        return record;
     }
 
     public static SeaTunnelRowType getSeaTunnelRowType(ReadonlyConfig config) {
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
index 15d971f15e..02398cb432 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
@@ -65,7 +65,11 @@ public class BasicTypeToOdpsTypeTest {
         SeaTunnelRow seaTunnelRow = 
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
         Record tRecord =
                 MaxcomputeTypeMapper.getMaxcomputeRowData(
-                        seaTunnelRow, tableSchema, typeInfo, 
defaultFormatterContext);
+                        new ArrayRecord(tableSchema),
+                        seaTunnelRow,
+                        tableSchema,
+                        typeInfo,
+                        defaultFormatterContext);
 
         for (int i = 0; i < tRecord.getColumns().length; i++) {
             Assertions.assertEquals(record.get(i), tRecord.get(i));
@@ -166,7 +170,11 @@ public class BasicTypeToOdpsTypeTest {
 
         Record finalOutputRecord =
                 MaxcomputeTypeMapper.getMaxcomputeRowData(
-                        seaTunnelRow, outputSchema, typeInfo, 
formatterContext);
+                        new ArrayRecord(outputSchema),
+                        seaTunnelRow,
+                        outputSchema,
+                        typeInfo,
+                        formatterContext);
 
         Assertions.assertEquals(expectedObject, 
finalOutputRecord.get(fieldName));
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
index 0c084f8641..5efe692954 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
@@ -28,12 +28,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSou
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.spark.Spark3Container;
 
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
@@ -145,7 +145,11 @@ public class MaxComputeIT extends TestSuiteBase implements 
TestResource {
 
     private static void createTableWithData(Odps odps, String tableName) 
throws OdpsException {
         Instance instance =
-                SQLTask.run(odps, "create table " + tableName + " (id INT, 
name STRING, age INT);");
+                SQLTask.run(
+                        odps,
+                        "create table "
+                                + tableName
+                                + " (id INT, name STRING, age INT, PRIMARY 
KEY(id));");
         instance.waitForSuccess();
         Assertions.assertTrue(odps.tables().exists(tableName));
         Instance insert =
@@ -158,6 +162,17 @@ public class MaxComputeIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(3, queryTable(odps, tableName).size());
     }
 
+    private static void createEmptyTable(Odps odps, String tableName) throws 
OdpsException {
+        Instance instance =
+                SQLTask.run(
+                        odps,
+                        "create table "
+                                + tableName
+                                + " (id INT, name STRING, age INT, PRIMARY 
KEY(id));");
+        instance.waitForSuccess();
+        Assertions.assertTrue(odps.tables().exists(tableName));
+    }
+
     private static List<Record> queryTable(Odps odps, String tableName) throws 
OdpsException {
         Instance instance = SQLTask.run(odps, "select * from " + tableName + 
";");
         instance.waitForSuccess();
@@ -179,45 +194,51 @@ public class MaxComputeIT extends TestSuiteBase 
implements TestResource {
     }
 
     @TestTemplate
-    @Disabled(
-            "maxcompute-emulator does not support upload session for now, we 
need move to upsert session in MaxComputeWriter")
     public void testMaxCompute(TestContainer container)
             throws IOException, InterruptedException, OdpsException {
+        if (container instanceof Spark3Container) {
+            log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+            return;
+        }
         Odps odps = getTestOdps();
         odps.tables().delete("mocked_mc", "test_table_sink", true);
+        createEmptyTable(odps, "test_table_sink");
         prepareContainer();
         Container.ExecResult execResult = 
container.executeJob("/maxcompute_to_maxcompute.conf");
-        prepareLocal();
         Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(3, 
odps.tables().get("test_table_sink").getRecordNum());
+        prepareLocal();
         List<Record> records = queryTable(odps, "test_table_sink");
         Assertions.assertEquals(3, records.size());
-        Assertions.assertEquals(1, records.get(0).get("id"));
-        Assertions.assertEquals("test", records.get(0).get("name"));
-        Assertions.assertEquals(20, records.get(0).get("age"));
-        Assertions.assertEquals(2, records.get(1).get("id"));
-        Assertions.assertEquals("test2", records.get(1).get("name"));
-        Assertions.assertEquals(30, records.get(1).get("age"));
-        Assertions.assertEquals(3, records.get(2).get("id"));
-        Assertions.assertEquals("test3", records.get(2).get("name"));
-        Assertions.assertEquals(40, records.get(2).get("age"));
+        Assertions.assertEquals("1", records.get(0).get(0));
+        Assertions.assertEquals("INSERT_TEST1", records.get(0).get(1));
+        Assertions.assertEquals("20", records.get(0).get(2));
+        Assertions.assertEquals("2", records.get(1).get(0));
+        Assertions.assertEquals("INSERT_TEST2", records.get(1).get(1));
+        Assertions.assertEquals("30", records.get(1).get(2));
+        Assertions.assertEquals("3", records.get(2).get(0));
+        Assertions.assertEquals("INSERT_TEST3", records.get(2).get(1));
+        Assertions.assertEquals("40", records.get(2).get(2));
     }
 
     @TestTemplate
-    @Disabled(
-            "maxcompute-emulator does not support upload session for now, we 
need move to upsert session in MaxComputeWriter")
     public void testMaxComputeMultiTable(TestContainer container)
             throws OdpsException, IOException, InterruptedException {
+        if (container instanceof Spark3Container) {
+            log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+            return;
+        }
         Odps odps = getTestOdps();
         odps.tables().delete("mocked_mc", "test_table_sink", true);
         odps.tables().delete("mocked_mc", "test_table_2_sink", true);
+        createEmptyTable(odps, "test_table_sink");
+        createEmptyTable(odps, "test_table_2_sink");
         prepareContainer();
         Container.ExecResult execResult =
                 
container.executeJob("/maxcompute_to_maxcompute_multi_table.conf");
         prepareLocal();
         Assertions.assertEquals(0, execResult.getExitCode());
         Assertions.assertEquals(3, queryTable(odps, "test_table_sink").size());
-        Assertions.assertEquals(3, queryTable(odps, 
"test_table_2_sink").size());
+        Assertions.assertEquals(2, queryTable(odps, 
"test_table_2_sink").size());
     }
 
     @Test
@@ -241,6 +262,69 @@ public class MaxComputeIT extends TestSuiteBase implements 
TestResource {
                 new String[] {"ID", "NAME"}, 
table.getTableSchema().getFieldNames());
     }
 
+    @TestTemplate
+    public void testMaxComputeUpsert(TestContainer container)
+            throws IOException, InterruptedException, OdpsException {
+        if (container instanceof Spark3Container) {
+            log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+            return;
+        }
+        Odps odps = getTestOdps();
+        odps.tables().delete("mocked_mc", "test_table_sink", true);
+        createTableWithData(odps, "test_table_sink");
+        List<Record> records = queryTable(odps, "test_table_sink");
+        Assertions.assertEquals("1", records.get(0).get(0));
+        Assertions.assertEquals("TEST", records.get(0).get(1));
+        Assertions.assertEquals("20", records.get(0).get(2));
+
+        prepareContainer();
+        Container.ExecResult execResult = 
container.executeJob("/fake_maxcompute_upsert.conf");
+
+        Assertions.assertEquals(0, execResult.getExitCode());
+        prepareLocal();
+        records = queryTable(odps, "test_table_sink");
+        Assertions.assertEquals(3, records.size());
+        Assertions.assertEquals("1", records.get(0).get(0));
+        Assertions.assertEquals("UPSERT_TEST", records.get(0).get(1));
+        Assertions.assertEquals("100", records.get(0).get(2));
+        Assertions.assertEquals("2", records.get(1).get(0));
+        Assertions.assertEquals("TEST2", records.get(1).get(1));
+        Assertions.assertEquals("30", records.get(1).get(2));
+        Assertions.assertEquals("3", records.get(2).get(0));
+        Assertions.assertEquals("TEST3", records.get(2).get(1));
+        Assertions.assertEquals("40", records.get(2).get(2));
+    }
+
+    @TestTemplate
+    public void testMaxComputeDelete(TestContainer container)
+            throws IOException, InterruptedException, OdpsException {
+        if (container instanceof Spark3Container) {
+            log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+            return;
+        }
+        Odps odps = getTestOdps();
+        odps.tables().delete("mocked_mc", "test_table_sink", true);
+        createTableWithData(odps, "test_table_sink");
+        List<Record> records = queryTable(odps, "test_table_sink");
+        Assertions.assertEquals("1", records.get(0).get(0));
+        Assertions.assertEquals("TEST", records.get(0).get(1));
+        Assertions.assertEquals("20", records.get(0).get(2));
+
+        prepareContainer();
+        Container.ExecResult execResult = 
container.executeJob("/fake_maxcompute_delete.conf");
+
+        Assertions.assertEquals(0, execResult.getExitCode());
+        prepareLocal();
+        records = queryTable(odps, "test_table_sink");
+        Assertions.assertEquals(2, records.size());
+        Assertions.assertEquals("2", records.get(0).get(0));
+        Assertions.assertEquals("TEST2", records.get(0).get(1));
+        Assertions.assertEquals("30", records.get(0).get(2));
+        Assertions.assertEquals("3", records.get(1).get(0));
+        Assertions.assertEquals("TEST3", records.get(1).get(1));
+        Assertions.assertEquals("40", records.get(1).get(2));
+    }
+
     // here use java http client to send post, okhttp or other http client can 
also be used
     public static void sendPOST(String postUrl, String postData) throws 
IOException {
         URL url = new URL(postUrl);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
similarity index 70%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
index 7c2d49940b..226280dfbf 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
@@ -25,13 +25,29 @@ env {
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  Maxcompute {
-    accessId = "ak"
-    accesskey = "sk"
-    endpoint = "http://maxcompute:8080";
-    project = "mocked_mc"
-    table_name = "test_table"
+  FakeSource {
+    tables_configs = [
+      {
+        schema = {
+          table = "test_table_sink"
+          fields {
+            ID = int
+            NAME = string
+            AGE = int
+          }
+          primaryKey {
+            name = "ID"
+            columnNames = [ID]
+          }
+        }
+        rows = [
+          {
+            kind = DELETE
+            fields = [1, "TEST", 20]
+          }
+        ]
+      }
+    ]
   }
 }
 
@@ -43,6 +59,7 @@ sink {
     accessId = "ak"
     accesskey = "sk"
     endpoint = "http://maxcompute:8080";
+    tunnel_endpoint = "http://maxcompute:8080";
     project = "mocked_mc"
     table_name = "test_table_sink"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
similarity index 69%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
index 7c2d49940b..42724897e4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
@@ -25,13 +25,29 @@ env {
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  Maxcompute {
-    accessId = "ak"
-    accesskey = "sk"
-    endpoint = "http://maxcompute:8080";
-    project = "mocked_mc"
-    table_name = "test_table"
+  FakeSource {
+    tables_configs = [
+      {
+        schema = {
+          table = "test_table_sink"
+          fields {
+            ID = int
+            NAME = string
+            AGE = int
+          }
+          primaryKey {
+            name = "ID"
+            columnNames = [ID]
+          }
+        }
+        rows = [
+          {
+            kind = UPDATE_AFTER
+            fields = [1, "UPSERT_TEST", 100]
+          }
+        ]
+      }
+    ]
   }
 }
 
@@ -43,6 +59,7 @@ sink {
     accessId = "ak"
     accesskey = "sk"
     endpoint = "http://maxcompute:8080";
+    tunnel_endpoint = "http://maxcompute:8080";
     project = "mocked_mc"
     table_name = "test_table_sink"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
index 7c2d49940b..c995717b27 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
@@ -25,13 +25,37 @@ env {
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  Maxcompute {
-    accessId = "ak"
-    accesskey = "sk"
-    endpoint = "http://maxcompute:8080";
-    project = "mocked_mc"
-    table_name = "test_table"
+  FakeSource {
+    tables_configs = [
+      {
+        schema = {
+          table = "test_table_sink"
+          fields {
+            ID = int
+            NAME = string
+            AGE = int
+          }
+          primaryKey {
+            name = "ID"
+            columnNames = [ID]
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, "INSERT_TEST1", 20]
+          }
+          {
+            kind = INSERT
+            fields = [2, "INSERT_TEST2", 30]
+          }
+          {
+            kind = INSERT
+            fields = [3, "INSERT_TEST3", 40]
+          }
+        ]
+      }
+    ]
   }
 }
 
@@ -43,6 +67,7 @@ sink {
     accessId = "ak"
     accesskey = "sk"
     endpoint = "http://maxcompute:8080";
+    tunnel_endpoint = "http://maxcompute:8080";
     project = "mocked_mc"
     table_name = "test_table_sink"
   }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
index 68ad00f9e5..00b9d5e53c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
@@ -25,19 +25,60 @@ env {
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  Maxcompute {
-    accessId = "ak"
-    accesskey = "sk"
-    endpoint = "http://maxcompute:8080";
-    project = "mocked_mc"
-    table_list = [
+  FakeSource {
+    tables_configs = [
       {
-        table_name = "test_table"
-      },
+        schema = {
+          table = "test_table"
+          fields {
+            ID = int
+            NAME = string
+            AGE = int
+          }
+          primaryKey {
+            name = "ID"
+            columnNames = [ID]
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, "INSERT_TEST1", 20]
+          }
+          {
+            kind = INSERT
+            fields = [2, "INSERT_TEST2", 30]
+          }
+          {
+            kind = INSERT
+            fields = [3, "INSERT_TEST3", 40]
+          }
+        ]
+      }
       {
-        table_name = "test_table_2"
+        schema = {
+          table = "test_table_2"
+          fields {
+            ID = int
+            NAME = string
+            AGE = int
+          }
+          primaryKey {
+            name = "ID"
+            columnNames = [ID]
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, "INSERT_TEST1", 20]
+          }
+          {
+            kind = INSERT
+            fields = [2, "INSERT_TEST2", 30]
+          }
+        ]
       }
     ]
   }
 }
@@ -50,6 +91,7 @@ sink {
     accessId = "ak"
     accesskey = "sk"
     endpoint = "http://maxcompute:8080";
+    tunnel_endpoint = "http://maxcompute:8080";
     project = "mocked_mc"
     table_name = "${table_name}_sink"
   }


Reply via email to