This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eb9c8704b9 [Improve][Connector-V2] Support maxcompute sink writer
upsert/delete action with upsert session mode (#9462)
eb9c8704b9 is described below
commit eb9c8704b9a349e0163d49e40357cce79851d86e
Author: dy102 <[email protected]>
AuthorDate: Thu Jun 26 22:38:53 2025 +0900
[Improve][Connector-V2] Support maxcompute sink writer upsert/delete action
with upsert session mode (#9462)
---
docs/en/connector-v2/sink/Maxcompute.md | 18 +++
.../maxcompute/config/MaxcomputeSinkOptions.java | 7 +
.../maxcompute/sink/MaxcomputeSinkFactory.java | 1 +
.../maxcompute/sink/MaxcomputeWriter.java | 72 +++--------
.../maxcompute/util/MaxcomputeOutputFormat.java | 141 +++++++++++++++++++++
.../maxcompute/util/MaxcomputeTypeMapper.java | 7 +-
.../maxcompute/BasicTypeToOdpsTypeTest.java | 12 +-
.../e2e/connector/maxcompute/MaxComputeIT.java | 120 +++++++++++++++---
...maxcompute.conf => fake_maxcompute_delete.conf} | 31 ++++-
...maxcompute.conf => fake_maxcompute_upsert.conf} | 31 ++++-
.../test/resources/maxcompute_to_maxcompute.conf | 39 +++++-
.../maxcompute_to_maxcompute_multi_table.conf | 62 +++++++--
12 files changed, 433 insertions(+), 108 deletions(-)
diff --git a/docs/en/connector-v2/sink/Maxcompute.md
b/docs/en/connector-v2/sink/Maxcompute.md
index ed268102f4..559b30f3e5 100644
--- a/docs/en/connector-v2/sink/Maxcompute.md
+++ b/docs/en/connector-v2/sink/Maxcompute.md
@@ -132,6 +132,24 @@ Example values:
Default: `yyyy-MM-dd HH:mm:ss`
+### tunnel_endpoint[String]
+Specifies the custom endpoint URL for the MaxCompute Tunnel service.
+
+By default, the endpoint is automatically inferred from the configured region.
+
+Set this option to override that default and use a custom Tunnel endpoint; if it is not set, the connector uses the region-based default Tunnel endpoint.
+
+In general, you do **not** need to set `tunnel_endpoint`; it is only needed for custom networking, debugging, or local development.
+
+Example values:
+
+- `https://dt.cn-hangzhou.maxcompute.aliyun.com`
+- `https://dt.ap-southeast-1.maxcompute.aliyun.com`
+- `http://maxcompute:8080`
+
+Default: Not set (auto-inferred from region)
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details.
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
index 8a6da00f00..85b8d541e4 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeSinkOptions.java
@@ -61,4 +61,11 @@ public class MaxcomputeSinkOptions extends
MaxcomputeBaseOptions {
+ "' ;")
.withDescription(
"Create table statement template, used to create
MaxCompute table");
+
+ public static final Option<String> TUNNEL_ENDPOINT =
+ Options.key("tunnel_endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Tunnel endpoint, e.g.
https://dt.cn-hangzhou.maxcompute.aliyun.com");
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 1f1e7c0f1d..d58f9a4eba 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -54,6 +54,7 @@ public class MaxcomputeSinkFactory implements
TableSinkFactory {
MaxcomputeSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
MaxcomputeSinkOptions.CUSTOM_SQL,
FormatOptions.DATETIME_FORMAT,
+ MaxcomputeSinkOptions.TUNNEL_ENDPOINT,
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
index e800a504c8..dfc73d8f0d 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java
@@ -18,24 +18,16 @@
package org.apache.seatunnel.connectors.seatunnel.maxcompute.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.options.table.FormatOptions;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.FormatterContext;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeOutputFormat;
-import com.aliyun.odps.PartitionSpec;
-import com.aliyun.odps.Table;
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.data.RecordWriter;
-import com.aliyun.odps.tunnel.TableTunnel;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -43,38 +35,11 @@ import java.io.IOException;
@Slf4j
public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
- private RecordWriter recordWriter;
- private final TableTunnel.UploadSession session;
- private final TableSchema tableSchema;
- private final SeaTunnelRowType rowType;
- private final FormatterContext formatterContext;
+ private MaxcomputeOutputFormat writer;
public MaxcomputeWriter(ReadonlyConfig readonlyConfig, SeaTunnelRowType
rowType) {
try {
- this.rowType = rowType;
- Table table = MaxcomputeUtil.getTable(readonlyConfig);
- this.tableSchema = table.getSchema();
- TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
- if
(readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
- PartitionSpec partitionSpec =
- new
PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
- session =
- tunnel.createUploadSession(
-
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
-
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME),
- partitionSpec);
- } else {
- session =
- tunnel.createUploadSession(
-
readonlyConfig.get(MaxcomputeSinkOptions.PROJECT),
-
readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME));
- }
-
- this.formatterContext =
- new
FormatterContext(readonlyConfig.get(FormatOptions.DATETIME_FORMAT));
-
- this.recordWriter = session.openBufferedWriter();
- log.info("open record writer success");
+ writer = new MaxcomputeOutputFormat(rowType, readonlyConfig);
} catch (Exception e) {
throw new MaxcomputeConnectorException(
CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);
@@ -83,23 +48,24 @@ public class MaxcomputeWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void write(SeaTunnelRow seaTunnelRow) throws IOException {
- Record record =
- MaxcomputeTypeMapper.getMaxcomputeRowData(
- seaTunnelRow, this.tableSchema, this.rowType,
formatterContext);
- recordWriter.write(record);
+ try {
+ writer.write(seaTunnelRow);
+ } catch (IOException e1) {
+ throw e1;
+ } catch (Exception e2) {
+ throw CommonError.writeSeaTunnelRowFailed(
+ MaxcomputeBaseOptions.PLUGIN_NAME,
seaTunnelRow.toString(), e2);
+ }
}
@Override
public void close() throws IOException {
- if (recordWriter != null) {
- recordWriter.close();
- try {
- session.commit();
- } catch (Exception e) {
- throw new MaxcomputeConnectorException(
- CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);
- }
- recordWriter = null;
+ try {
+ writer.close();
+ } catch (IOException e1) {
+ throw e1;
+ } catch (Exception e2) {
+ throw CommonError.closeFailed(MaxcomputeBaseOptions.PLUGIN_NAME,
e2);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
new file mode 100644
index 0000000000..0cc37b0ad0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.table.FormatOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeSinkOptions;
+
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.tunnel.streams.UpsertStream;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * Writes SeaTunnel rows to a MaxCompute table through a Tunnel upsert session.
+ *
+ * <p>The upsert session and its stream are opened lazily on the first {@link #write} call and are
+ * flushed, committed and released by {@link #close}.
+ */
+@Slf4j
+public class MaxcomputeOutputFormat {
+    private final ReadonlyConfig readonlyConfig;
+
+    private final TableSchema tableSchema;
+    private final SeaTunnelRowType rowType;
+    private final FormatterContext formatterContext;
+
+    // Both stay null until the first write and are reset to null by close().
+    private UpsertStream upsertStream;
+    private TableTunnel.UpsertSession upsertSession;
+
+    /**
+     * Loads the target table schema; no Tunnel session is opened yet.
+     *
+     * @param rowType type of the SeaTunnel rows passed to {@link #write}
+     * @param readonlyConfig sink options (project, table, partition, endpoints, datetime format)
+     */
+    public MaxcomputeOutputFormat(SeaTunnelRowType rowType, ReadonlyConfig readonlyConfig) {
+        this.rowType = rowType;
+        this.readonlyConfig = readonlyConfig;
+        this.tableSchema = MaxcomputeUtil.getTable(readonlyConfig).getSchema();
+        this.formatterContext =
+                new FormatterContext(readonlyConfig.get(FormatOptions.DATETIME_FORMAT));
+    }
+
+    /**
+     * Sends one row to the upsert stream, opening the session/stream first if needed.
+     *
+     * <p>{@code INSERT} and {@code UPDATE_AFTER} rows are upserted; {@code DELETE} and
+     * {@code UPDATE_BEFORE} rows are deleted by key.
+     *
+     * @param seaTunnelRow row to write
+     * @throws IOException if the stream rejects the record
+     * @throws TunnelException if the upsert session or stream cannot be created
+     */
+    public void write(SeaTunnelRow seaTunnelRow) throws IOException, TunnelException {
+        ensureUpsertSessionAndWriter();
+        switch (seaTunnelRow.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                upsertStream.upsert(toRecord(seaTunnelRow));
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // UPDATE_BEFORE carries the old image of an updated row. Deleting it removes the
+                // stale row when the primary key changed; when the key is unchanged, the following
+                // UPDATE_AFTER upserts the same key again. Rejecting it would fail every changelog
+                // (e.g. CDC) job that emits updates.
+                upsertStream.delete(toRecord(seaTunnelRow));
+                break;
+            default:
+                throw CommonError.unsupportedDataType(
+                        MaxcomputeBaseOptions.PLUGIN_NAME,
+                        seaTunnelRow.getRowKind().toString(),
+                        seaTunnelRow.toString());
+        }
+    }
+
+    /**
+     * Flushes and closes the upsert stream, commits the session, and always closes the session.
+     *
+     * <p>If closing the stream fails, nothing is committed, but the session is still closed. A
+     * failure while closing the session is attached as a suppressed exception, so it does not hide
+     * the original error. Does nothing if no row was ever written.
+     *
+     * @throws IOException if the stream or session fails to close
+     * @throws TunnelException if the commit fails
+     */
+    public void close() throws IOException, TunnelException {
+        UpsertStream stream = upsertStream;
+        TableTunnel.UpsertSession session = upsertSession;
+        // Reset first so a failed close never leaves half-closed resources behind for reuse.
+        upsertStream = null;
+        upsertSession = null;
+
+        Throwable failure = null;
+        try {
+            if (stream != null) {
+                stream.close();
+            }
+            if (session != null) {
+                session.commit(true);
+            }
+        } catch (Throwable t) {
+            failure = t;
+            throw t;
+        } finally {
+            if (session != null) {
+                closeSession(session, failure);
+            }
+        }
+    }
+
+    /** Closes {@code session}, suppressing its failure onto {@code primary} when one is pending. */
+    private static void closeSession(TableTunnel.UpsertSession session, Throwable primary)
+            throws IOException, TunnelException {
+        if (primary == null) {
+            session.close();
+            return;
+        }
+        try {
+            session.close();
+        } catch (Exception closeFailure) {
+            primary.addSuppressed(closeFailure);
+        }
+    }
+
+    /** Converts a SeaTunnel row into a record created by the current upsert session. */
+    private Record toRecord(SeaTunnelRow seaTunnelRow) {
+        return MaxcomputeTypeMapper.getMaxcomputeRowData(
+                upsertSession.newRecord(), seaTunnelRow, tableSchema, rowType, formatterContext);
+    }
+
+    /** Opens the upsert session and builds its stream if either is not yet available. */
+    private void ensureUpsertSessionAndWriter() throws TunnelException, IOException {
+        if (upsertSession == null) {
+            initializeUpsertSession();
+        }
+        if (upsertStream == null) {
+            this.upsertStream = upsertSession.buildUpsertStream().build();
+            log.info("build upsert stream success");
+        }
+    }
+
+    /**
+     * Returns the table tunnel, overriding its endpoint when a non-blank one is configured.
+     *
+     * @param tunnelEndpoint optional custom Tunnel endpoint; surrounding whitespace is ignored
+     */
+    private TableTunnel getTableTunnel(String tunnelEndpoint) {
+        TableTunnel tableTunnel = MaxcomputeUtil.getTableTunnel(readonlyConfig);
+        if (tunnelEndpoint != null && !tunnelEndpoint.trim().isEmpty()) {
+            tableTunnel.setEndpoint(tunnelEndpoint.trim());
+        }
+        return tableTunnel;
+    }
+
+    /** Creates the upsert session for the configured table, scoped to the partition if one is set. */
+    private void initializeUpsertSession() throws TunnelException, IOException {
+        TableTunnel tunnel =
+                getTableTunnel(readonlyConfig.get(MaxcomputeSinkOptions.TUNNEL_ENDPOINT));
+        String project = readonlyConfig.get(MaxcomputeSinkOptions.PROJECT);
+        String tableName = readonlyConfig.get(MaxcomputeSinkOptions.TABLE_NAME);
+        if (readonlyConfig.getOptional(MaxcomputeSinkOptions.PARTITION_SPEC).isPresent()) {
+            PartitionSpec partitionSpec =
+                    new PartitionSpec(readonlyConfig.get(MaxcomputeSinkOptions.PARTITION_SPEC));
+            upsertSession =
+                    tunnel.buildUpsertSession(project, tableName)
+                            .setPartitionSpec(partitionSpec)
+                            .build();
+        } else {
+            upsertSession = tunnel.buildUpsertSession(project, tableName).build();
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index 6b50f50431..790287338c 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -30,7 +30,6 @@ import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.Maxcompute
import com.aliyun.odps.Column;
import com.aliyun.odps.Table;
import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Binary;
import com.aliyun.odps.data.Char;
import com.aliyun.odps.data.Record;
@@ -69,11 +68,11 @@ public class MaxcomputeTypeMapper implements Serializable {
}
public static Record getMaxcomputeRowData(
+ Record record,
SeaTunnelRow seaTunnelRow,
TableSchema tableSchema,
SeaTunnelRowType rowType,
FormatterContext formatterContext) {
- ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
String fieldName = rowType.getFieldName(i);
if (!tableSchema.containsColumn(fieldName)) {
@@ -85,12 +84,12 @@ public class MaxcomputeTypeMapper implements Serializable {
}
Column column = tableSchema.getColumn(fieldName);
- arrayRecord.set(
+ record.set(
tableSchema.getColumnIndex(fieldName),
resolveObject2Maxcompute(
seaTunnelRow.getField(i), column.getTypeInfo(),
formatterContext));
}
- return arrayRecord;
+ return record;
}
public static SeaTunnelRowType getSeaTunnelRowType(ReadonlyConfig config) {
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
index 15d971f15e..02398cb432 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/BasicTypeToOdpsTypeTest.java
@@ -65,7 +65,11 @@ public class BasicTypeToOdpsTypeTest {
SeaTunnelRow seaTunnelRow =
MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
Record tRecord =
MaxcomputeTypeMapper.getMaxcomputeRowData(
- seaTunnelRow, tableSchema, typeInfo,
defaultFormatterContext);
+ new ArrayRecord(tableSchema),
+ seaTunnelRow,
+ tableSchema,
+ typeInfo,
+ defaultFormatterContext);
for (int i = 0; i < tRecord.getColumns().length; i++) {
Assertions.assertEquals(record.get(i), tRecord.get(i));
@@ -166,7 +170,11 @@ public class BasicTypeToOdpsTypeTest {
Record finalOutputRecord =
MaxcomputeTypeMapper.getMaxcomputeRowData(
- seaTunnelRow, outputSchema, typeInfo,
formatterContext);
+ new ArrayRecord(outputSchema),
+ seaTunnelRow,
+ outputSchema,
+ typeInfo,
+ formatterContext);
Assertions.assertEquals(expectedObject,
finalOutputRecord.get(fieldName));
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
index 0c084f8641..5efe692954 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java
@@ -28,12 +28,12 @@ import
org.apache.seatunnel.connectors.seatunnel.maxcompute.source.MaxcomputeSou
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.spark.Spark3Container;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -145,7 +145,11 @@ public class MaxComputeIT extends TestSuiteBase implements
TestResource {
private static void createTableWithData(Odps odps, String tableName)
throws OdpsException {
Instance instance =
- SQLTask.run(odps, "create table " + tableName + " (id INT,
name STRING, age INT);");
+ SQLTask.run(
+ odps,
+ "create table "
+ + tableName
+ + " (id INT, name STRING, age INT, PRIMARY
KEY(id));");
instance.waitForSuccess();
Assertions.assertTrue(odps.tables().exists(tableName));
Instance insert =
@@ -158,6 +162,17 @@ public class MaxComputeIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(3, queryTable(odps, tableName).size());
}
+    /**
+     * Creates an empty {@code (id, name, age)} table with {@code id} as primary key, so the sink can
+     * write to it through an upsert session, and asserts that the table now exists.
+     */
+    private static void createEmptyTable(Odps odps, String tableName) throws OdpsException {
+        String ddl =
+                "create table " + tableName + " (id INT, name STRING, age INT, PRIMARY KEY(id));";
+        SQLTask.run(odps, ddl).waitForSuccess();
+        Assertions.assertTrue(odps.tables().exists(tableName));
+    }
+
private static List<Record> queryTable(Odps odps, String tableName) throws
OdpsException {
Instance instance = SQLTask.run(odps, "select * from " + tableName +
";");
instance.waitForSuccess();
@@ -179,45 +194,51 @@ public class MaxComputeIT extends TestSuiteBase
implements TestResource {
}
@TestTemplate
- @Disabled(
- "maxcompute-emulator does not support upload session for now, we
need move to upsert session in MaxComputeWriter")
public void testMaxCompute(TestContainer container)
throws IOException, InterruptedException, OdpsException {
+ if (container instanceof Spark3Container) {
+ log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+ return;
+ }
Odps odps = getTestOdps();
odps.tables().delete("mocked_mc", "test_table_sink", true);
+ createEmptyTable(odps, "test_table_sink");
prepareContainer();
Container.ExecResult execResult =
container.executeJob("/maxcompute_to_maxcompute.conf");
- prepareLocal();
Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(3,
odps.tables().get("test_table_sink").getRecordNum());
+ prepareLocal();
List<Record> records = queryTable(odps, "test_table_sink");
Assertions.assertEquals(3, records.size());
- Assertions.assertEquals(1, records.get(0).get("id"));
- Assertions.assertEquals("test", records.get(0).get("name"));
- Assertions.assertEquals(20, records.get(0).get("age"));
- Assertions.assertEquals(2, records.get(1).get("id"));
- Assertions.assertEquals("test2", records.get(1).get("name"));
- Assertions.assertEquals(30, records.get(1).get("age"));
- Assertions.assertEquals(3, records.get(2).get("id"));
- Assertions.assertEquals("test3", records.get(2).get("name"));
- Assertions.assertEquals(40, records.get(2).get("age"));
+ Assertions.assertEquals("1", records.get(0).get(0));
+ Assertions.assertEquals("INSERT_TEST1", records.get(0).get(1));
+ Assertions.assertEquals("20", records.get(0).get(2));
+ Assertions.assertEquals("2", records.get(1).get(0));
+ Assertions.assertEquals("INSERT_TEST2", records.get(1).get(1));
+ Assertions.assertEquals("30", records.get(1).get(2));
+ Assertions.assertEquals("3", records.get(2).get(0));
+ Assertions.assertEquals("INSERT_TEST3", records.get(2).get(1));
+ Assertions.assertEquals("40", records.get(2).get(2));
}
@TestTemplate
- @Disabled(
- "maxcompute-emulator does not support upload session for now, we
need move to upsert session in MaxComputeWriter")
public void testMaxComputeMultiTable(TestContainer container)
throws OdpsException, IOException, InterruptedException {
+ if (container instanceof Spark3Container) {
+ log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+ return;
+ }
Odps odps = getTestOdps();
odps.tables().delete("mocked_mc", "test_table_sink", true);
odps.tables().delete("mocked_mc", "test_table_2_sink", true);
+ createEmptyTable(odps, "test_table_sink");
+ createEmptyTable(odps, "test_table_2_sink");
prepareContainer();
Container.ExecResult execResult =
container.executeJob("/maxcompute_to_maxcompute_multi_table.conf");
prepareLocal();
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(3, queryTable(odps, "test_table_sink").size());
- Assertions.assertEquals(3, queryTable(odps,
"test_table_2_sink").size());
+ Assertions.assertEquals(2, queryTable(odps,
"test_table_2_sink").size());
}
@Test
@@ -241,6 +262,69 @@ public class MaxComputeIT extends TestSuiteBase implements
TestResource {
new String[] {"ID", "NAME"},
table.getTableSchema().getFieldNames());
}
+ /**
+ * Verifies that the UPDATE_AFTER row in fake_maxcompute_upsert.conf overwrites the existing row
+ * with id = 1 and leaves the other seeded rows (id = 2, 3) unchanged.
+ *
+ * <p>NOTE(review): rows are checked by position, which assumes the "select *" issued by
+ * queryTable (no ORDER BY) returns rows in id order - confirm this holds for the emulator.
+ */
+ @TestTemplate
+ public void testMaxComputeUpsert(TestContainer container)
+ throws IOException, InterruptedException, OdpsException {
+ if (container instanceof Spark3Container) {
+ log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+ return;
+ }
+ Odps odps = getTestOdps();
+ odps.tables().delete("mocked_mc", "test_table_sink", true);
+ // Seed the sink table with three rows; id = 1 is the row the job will overwrite.
+ createTableWithData(odps, "test_table_sink");
+ List<Record> records = queryTable(odps, "test_table_sink");
+ Assertions.assertEquals("1", records.get(0).get(0));
+ Assertions.assertEquals("TEST", records.get(0).get(1));
+ Assertions.assertEquals("20", records.get(0).get(2));
+
+ prepareContainer();
+ Container.ExecResult execResult =
container.executeJob("/fake_maxcompute_upsert.conf");
+
+ Assertions.assertEquals(0, execResult.getExitCode());
+ prepareLocal();
+ // Upsert must update in place: still three rows, only id = 1 changed.
+ records = queryTable(odps, "test_table_sink");
+ Assertions.assertEquals(3, records.size());
+ Assertions.assertEquals("1", records.get(0).get(0));
+ Assertions.assertEquals("UPSERT_TEST", records.get(0).get(1));
+ Assertions.assertEquals("100", records.get(0).get(2));
+ Assertions.assertEquals("2", records.get(1).get(0));
+ Assertions.assertEquals("TEST2", records.get(1).get(1));
+ Assertions.assertEquals("30", records.get(1).get(2));
+ Assertions.assertEquals("3", records.get(2).get(0));
+ Assertions.assertEquals("TEST3", records.get(2).get(1));
+ Assertions.assertEquals("40", records.get(2).get(2));
+ }
+
+ /**
+ * Verifies that the DELETE row in fake_maxcompute_delete.conf removes the seeded row with
+ * id = 1 and keeps the rows with id = 2 and 3.
+ *
+ * <p>NOTE(review): rows are checked by position, which assumes the "select *" issued by
+ * queryTable (no ORDER BY) returns rows in id order - confirm this holds for the emulator.
+ */
+ @TestTemplate
+ public void testMaxComputeDelete(TestContainer container)
+ throws IOException, InterruptedException, OdpsException {
+ if (container instanceof Spark3Container) {
+ log.info("Skip on Spark 3.3.0 due to Netty conflict.");
+ return;
+ }
+ Odps odps = getTestOdps();
+ odps.tables().delete("mocked_mc", "test_table_sink", true);
+ // Seed the sink table with three rows; id = 1 is the row the job will delete.
+ createTableWithData(odps, "test_table_sink");
+ List<Record> records = queryTable(odps, "test_table_sink");
+ Assertions.assertEquals("1", records.get(0).get(0));
+ Assertions.assertEquals("TEST", records.get(0).get(1));
+ Assertions.assertEquals("20", records.get(0).get(2));
+
+ prepareContainer();
+ Container.ExecResult execResult =
container.executeJob("/fake_maxcompute_delete.conf");
+
+ Assertions.assertEquals(0, execResult.getExitCode());
+ prepareLocal();
+ // Only the two untouched rows should remain.
+ records = queryTable(odps, "test_table_sink");
+ Assertions.assertEquals(2, records.size());
+ Assertions.assertEquals("2", records.get(0).get(0));
+ Assertions.assertEquals("TEST2", records.get(0).get(1));
+ Assertions.assertEquals("30", records.get(0).get(2));
+ Assertions.assertEquals("3", records.get(1).get(0));
+ Assertions.assertEquals("TEST3", records.get(1).get(1));
+ Assertions.assertEquals("40", records.get(1).get(2));
+ }
+
// here use java http client to send post, okhttp or other http client can
also be used
public static void sendPOST(String postUrl, String postData) throws
IOException {
URL url = new URL(postUrl);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
similarity index 70%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
index 7c2d49940b..226280dfbf 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_delete.conf
@@ -25,13 +25,29 @@ env {
}
source {
- # This is a example source plugin **only for test and demonstrate the
feature source plugin**
- Maxcompute {
- accessId = "ak"
- accesskey = "sk"
- endpoint = "http://maxcompute:8080"
- project = "mocked_mc"
- table_name = "test_table"
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "test_table_sink"
+ fields {
+ ID = int
+ NAME = string
+ AGE = int
+ }
+ primaryKey {
+ name = "ID"
+ columnNames = [ID]
+ }
+ }
+ rows = [
+ {
+ kind = DELETE
+ fields = [1, "TEST", 20]
+ }
+ ]
+ }
+ ]
}
}
@@ -43,6 +59,7 @@ sink {
accessId = "ak"
accesskey = "sk"
endpoint = "http://maxcompute:8080"
+ tunnel_endpoint = "http://maxcompute:8080"
project = "mocked_mc"
table_name = "test_table_sink"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
similarity index 69%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
index 7c2d49940b..42724897e4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/fake_maxcompute_upsert.conf
@@ -25,13 +25,29 @@ env {
}
source {
- # This is a example source plugin **only for test and demonstrate the
feature source plugin**
- Maxcompute {
- accessId = "ak"
- accesskey = "sk"
- endpoint = "http://maxcompute:8080"
- project = "mocked_mc"
- table_name = "test_table"
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "test_table_sink"
+ fields {
+ ID = int
+ NAME = string
+ AGE = int
+ }
+ primaryKey {
+ name = "ID"
+ columnNames = [ID]
+ }
+ }
+ rows = [
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "UPSERT_TEST", 100]
+ }
+ ]
+ }
+ ]
}
}
@@ -43,6 +59,7 @@ sink {
accessId = "ak"
accesskey = "sk"
endpoint = "http://maxcompute:8080"
+ tunnel_endpoint = "http://maxcompute:8080"
project = "mocked_mc"
table_name = "test_table_sink"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
index 7c2d49940b..c995717b27 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute.conf
@@ -25,13 +25,37 @@ env {
}
source {
- # This is a example source plugin **only for test and demonstrate the
feature source plugin**
- Maxcompute {
- accessId = "ak"
- accesskey = "sk"
- endpoint = "http://maxcompute:8080"
- project = "mocked_mc"
- table_name = "test_table"
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "test_table_sink"
+ fields {
+ ID = int
+ NAME = string
+ AGE = int
+ }
+ primaryKey {
+ name = "ID"
+ columnNames = [ID]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "INSERT_TEST1", 20]
+ }
+ {
+ kind = INSERT
+ fields = [2, "INSERT_TEST2", 30]
+ }
+ {
+ kind = INSERT
+ fields = [3, "INSERT_TEST3", 40]
+ }
+ ]
+ }
+ ]
}
}
@@ -43,6 +67,7 @@ sink {
accessId = "ak"
accesskey = "sk"
endpoint = "http://maxcompute:8080"
+ tunnel_endpoint = "http://maxcompute:8080"
project = "mocked_mc"
table_name = "test_table_sink"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
index 68ad00f9e5..00b9d5e53c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/resources/maxcompute_to_maxcompute_multi_table.conf
@@ -25,19 +25,60 @@ env {
}
source {
- # This is a example source plugin **only for test and demonstrate the
feature source plugin**
- Maxcompute {
- accessId = "ak"
- accesskey = "sk"
- endpoint = "http://maxcompute:8080"
- project = "mocked_mc"
- table_list = [
+ FakeSource {
+ tables_configs = [
{
- table_name = "test_table"
- },
+ schema = {
+ table = "test_table"
+ fields {
+ ID = int
+ NAME = string
+ AGE = int
+ }
+ primaryKey {
+ name = "ID"
+ columnNames = [ID]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "INSERT_TEST1", 20]
+ }
+ {
+ kind = INSERT
+ fields = [2, "INSERT_TEST2", 30]
+ }
+ {
+ kind = INSERT
+ fields = [3, "INSERT_TEST3", 40]
+ }
+ ]
+ }
{
- table_name = "test_table_2"
+ schema = {
+ table = "test_table_2"
+ fields {
+ ID = int
+ NAME = string
+ AGE = int
+ }
+ primaryKey {
+ name = "ID"
+ columnNames = [ID]
+ }
}
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "INSERT_TEST1", 20]
+ }
+ {
+ kind = INSERT
+ fields = [2, "INSERT_TEST2", 30]
+ }
+ ]
+ }
]
}
}
@@ -50,6 +91,7 @@ sink {
accessId = "ak"
accesskey = "sk"
endpoint = "http://maxcompute:8080"
+ tunnel_endpoint = "http://maxcompute:8080"
project = "mocked_mc"
table_name = "${table_name}_sink"
}