This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e0c11e6c78 [Test][Spark] Make sure the value in spark not be reused.
(#5767)
e0c11e6c78 is described below
commit e0c11e6c7884880b1227d5ee7a64aa223e9a7fe0
Author: Jia Fan <[email protected]>
AuthorDate: Fri Nov 3 10:27:06 2023 +0800
[Test][Spark] Make sure the value in spark not be reused. (#5767)
---
pom.xml | 8 +-
seatunnel-dist/release-docs/LICENSE | 12 +-
.../spark/sink/SeaTunnelSinkWithBuffer.java | 144 ++++++++++
.../spark/sink/SeaTunnelSinkWithBufferWriter.java | 252 +++++++++++++++++
.../translation/spark/sink/SparkSinkTest.java | 298 +++++++++++++++++++++
tools/dependencies/known-dependencies.txt | 12 +-
6 files changed, 710 insertions(+), 16 deletions(-)
diff --git a/pom.xml b/pom.xml
index e744b342dd..41afca5217 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,8 +59,8 @@
<seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
- <scala.version>2.11.12</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.12.15</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
@@ -84,7 +84,7 @@
<jersey.version>1.19</jersey.version>
<javax.servlet.jap.version>2.1</javax.servlet.jap.version>
<hadoop.binary.version>2.7</hadoop.binary.version>
- <jackson.version>2.12.6</jackson.version>
+ <jackson.version>2.13.3</jackson.version>
<lombok.version>1.18.24</lombok.version>
<commons-compress.version>1.20</commons-compress.version>
<skip.pmd.check>false</skip.pmd.check>
@@ -101,7 +101,7 @@
<elasticsearch6.client.version>6.3.1</elasticsearch6.client.version>
<elasticsearch7.client.version>7.5.1</elasticsearch7.client.version>
<flink-shaded-hadoop-2.version>2.7.5-7.0</flink-shaded-hadoop-2.version>
- <commons-lang3.version>3.4</commons-lang3.version>
+ <commons-lang3.version>3.5</commons-lang3.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
diff --git a/seatunnel-dist/release-docs/LICENSE
b/seatunnel-dist/release-docs/LICENSE
index 5d19a35a87..08f41da2a5 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -225,7 +225,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons Compress
(org.apache.commons:commons-compress:1.20 -
https://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Commons Lang
(commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
(Apache License, Version 2.0) Apache Commons IO
(commons-io:commons-io:2.11.0 - http://commons.apache.org/proper/commons-io/)
- (Apache License, Version 2.0) Apache Commons Lang
(org.apache.commons:commons-lang3:3.4 -
http://commons.apache.org/proper/commons-lang/)
+ (Apache License, Version 2.0) Apache Commons Lang
(org.apache.commons:commons-lang3:3.5 -
http://commons.apache.org/proper/commons-lang/)
(The Apache Software License, Version 2.0) Commons Pool
(commons-pool:commons-pool:1.6 - http://commons.apache.org/pool/)
(Apache License, Version 2.0) config (com.typesafe:config:1.3.3 -
https://github.com/lightbend/config)
(The Apache Software License, Version 2.0) Flink : Formats : Avro
(org.apache.flink:flink-avro:1.13.6 -
https://flink.apache.org/flink-formats/flink-avro)
@@ -236,11 +236,11 @@ The text of each license is the standard Apache 2.0
license.
(Apache License, Version 2.0) Flink : Tools : Force Shading
(org.apache.flink:force-shading:1.13.6 - https://www.apache.org/force-shading/)
(The Apache Software License, Version 2.0) Guava: Google Core Libraries
for Java (com.google.guava:guava:27.0-jre -
https://github.com/google/guava/guava)
(Apache License, Version 2.0) Hive Storage API
(org.apache.hive:hive-storage-api:2.6.0 -
https://www.apache.org/hive-storage-api/)
- (The Apache Software License, Version 2.0) Jackson-annotations
(com.fasterxml.jackson.core:jackson-annotations:2.12.6 -
http://github.com/FasterXML/jackson)
- (The Apache Software License, Version 2.0) Jackson-core
(com.fasterxml.jackson.core:jackson-core:2.12.6 -
https://github.com/FasterXML/jackson-core)
+ (The Apache Software License, Version 2.0) Jackson-annotations
(com.fasterxml.jackson.core:jackson-annotations:2.13.3 -
http://github.com/FasterXML/jackson)
+ (The Apache Software License, Version 2.0) Jackson-core
(com.fasterxml.jackson.core:jackson-core:2.13.3 -
https://github.com/FasterXML/jackson-core)
(The Apache Software License, Version 2.0) Jackson
(org.codehaus.jackson:jackson-core-asl:1.9.13 - http://jackson.codehaus.org)
- (The Apache Software License, Version 2.0) jackson-databind
(com.fasterxml.jackson.core:jackson-databind:2.12.6 -
http://github.com/FasterXML/jackson)
- (The Apache Software License, Version 2.0) Jackson-dataformat-properties
(com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.12.6 -
https://github.com/FasterXML/jackson-dataformats-text)
+ (The Apache Software License, Version 2.0) jackson-databind
(com.fasterxml.jackson.core:jackson-databind:2.13.3 -
http://github.com/FasterXML/jackson)
+ (The Apache Software License, Version 2.0) Jackson-dataformat-properties
(com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.13.3 -
https://github.com/FasterXML/jackson-dataformats-text)
(The Apache Software License, Version 2.0) Data Mapper for Jackson
(org.codehaus.jackson:jackson-mapper-asl:1.9.13 - http://jackson.codehaus.org)
(Apache License, Version 2.0) jcommander (com.beust:jcommander:1.81 -
https://jcommander.org)
(The Apache Software License, Version 2.0) FindBugs-jsr305
(com.google.code.findbugs:jsr305:1.3.9 - http://findbugs.sourceforge.net/)
@@ -292,7 +292,7 @@ The following components are provided under a BSD license.
See project link for
The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API
(com.google.protobuf:protobuf-java:2.5.0 - http://code.google.com/p/protobuf)
- (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.11.12 -
http://www.scala-lang.org/)
+ (BSD 3-Clause) Scala Library (org.scala-lang:scala-library:2.12.15 -
http://www.scala-lang.org/)
(BSD 3-Clause) Scala Library (org.ow2.asm:asm:9.1 -
https://mvnrepository.com/artifact/org.ow2.asm/asm/)
========================================================================
CDDL License
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
new file mode 100644
index 0000000000..e16a29466b
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.io.IOException;
+
+/**
+ * Test-only {@link SeaTunnelSink} that SparkSinkTest writes into through SparkSinkInjector.
+ *
+ * <p>Its writer buffers the field arrays it receives, so the test can detect whether the Spark
+ * translation reuses row values between write calls.
+ */
+public class SeaTunnelSinkWithBuffer implements SeaTunnelSink<SeaTunnelRow,
Void, Void, Void> {
+
+ /** Returns the plugin name of this test sink. */
+ @Override
+ public String getPluginName() {
+ return "SeaTunnelSinkWithBuffer";
+ }
+
+ /**
+ * Returns the consumed row type: 21 scalar, date/time, array and map columns, followed by a
+ * nested "row" column that repeats those 21 columns. The field order matches the Spark schema
+ * built in SparkSinkTest. SeaTunnelSinkWithBufferWriter checks the received values by position.
+ */
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return new SeaTunnelRowType(
+ new String[] {
+ "int",
+ "string",
+ "boolean",
+ "float",
+ "double",
+ "byte",
+ "short",
+ "long",
+ "decimal",
+ "date",
+ "timestamp",
+ "null",
+ "array_string",
+ "array_boolean",
+ "array_byte",
+ "array_short",
+ "array_int",
+ "array_long",
+ "array_float",
+ "array_double",
+ "map",
+ "row"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.LONG_TYPE,
+ new DecimalType(10, 2),
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.VOID_TYPE,
+ ArrayType.STRING_ARRAY_TYPE,
+ ArrayType.BOOLEAN_ARRAY_TYPE,
+ ArrayType.BYTE_ARRAY_TYPE,
+ ArrayType.SHORT_ARRAY_TYPE,
+ ArrayType.INT_ARRAY_TYPE,
+ ArrayType.LONG_ARRAY_TYPE,
+ ArrayType.FLOAT_ARRAY_TYPE,
+ ArrayType.DOUBLE_ARRAY_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE),
+ // Type of the nested "row" column: the same 21 fields as above, with no further nesting.
+ new SeaTunnelRowType(
+ new String[] {
+ "int",
+ "string",
+ "boolean",
+ "float",
+ "double",
+ "byte",
+ "short",
+ "long",
+ "decimal",
+ "date",
+ "timestamp",
+ "null",
+ "array_string",
+ "array_boolean",
+ "array_byte",
+ "array_short",
+ "array_int",
+ "array_long",
+ "array_float",
+ "array_double",
+ "map"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.LONG_TYPE,
+ new DecimalType(10, 2),
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.VOID_TYPE,
+ ArrayType.STRING_ARRAY_TYPE,
+ ArrayType.BOOLEAN_ARRAY_TYPE,
+ ArrayType.BYTE_ARRAY_TYPE,
+ ArrayType.SHORT_ARRAY_TYPE,
+ ArrayType.INT_ARRAY_TYPE,
+ ArrayType.LONG_ARRAY_TYPE,
+ ArrayType.FLOAT_ARRAY_TYPE,
+ ArrayType.DOUBLE_ARRAY_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE)
+ })
+ });
+ }
+
+ /** Creates a new buffering writer on every call; the context is not used. */
+ @Override
+ public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context)
+ throws IOException {
+ return new SeaTunnelSinkWithBufferWriter();
+ }
+}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
new file mode 100644
index 0000000000..ca9aa86b66
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBufferWriter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Test-only {@link SinkWriter} that keeps a reference to the field array of every row it
+ * receives, without copying it. When the third row arrives, it checks all buffered values.
+ *
+ * <p>Suppose the caller reused one field array, or overwrote its contents, for successive rows.
+ * The entries buffered earlier would then change, and the comparison against the expected
+ * values would fail. The expected values correspond, in order, to the rows built in
+ * SparkSinkTest.
+ */
+public class SeaTunnelSinkWithBufferWriter implements SinkWriter<SeaTunnelRow,
Void, Void> {
+
+ // Field arrays exactly as received. They are deliberately not copied, so that any reuse by the
+ // caller shows up in the assertions in write().
+ private final List<Object[]> valueBuffer;
+
+ public SeaTunnelSinkWithBufferWriter() {
+ this.valueBuffer = new ArrayList<>();
+ }
+
+ /**
+ * Buffers the row's field array. When exactly three rows have been buffered, it asserts each
+ * one position by position against the expected values.
+ *
+ * <p>NOTE(review): the assertions run only when the buffer size reaches exactly 3.
+ * If this writer instance receives fewer rows (for example, if the data were split across
+ * several writers), nothing is asserted and the test passes silently. Rows after the third
+ * are never checked. Only positions present in the expected arrays are compared, so extra
+ * actual fields would go unnoticed.
+ */
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ valueBuffer.add(element.getFields());
+ if (valueBuffer.size() == 3) {
+ // Expected fields of the three rows, indexed in arrival order. Array columns are expected
+ // as Object[], and the nested "row" column is expected as a SeaTunnelRow.
+ List<Object[]> expected =
+ Arrays.asList(
+ new Object[] {
+ 42,
+ "string1",
+ true,
+ 1.1f,
+ 33.33,
+ (byte) 1,
+ (short) 2,
+ Long.MAX_VALUE,
+ new BigDecimal("55.55"),
+ LocalDate.parse("2021-01-01"),
+ LocalDateTime.parse("2021-01-01T00:00:00"),
+ null,
+ new Object[] {"string1", "string2", "string3"},
+ new Object[] {true, false, true},
+ new Object[] {(byte) 1, (byte) 2, (byte) 3},
+ new Object[] {(short) 1, (short) 2, (short) 3},
+ new Object[] {1, 2, 3},
+ new Object[] {1L, 2L, 3L},
+ new Object[] {1.1f, 2.2f, 3.3f},
+ new Object[] {11.11, 22.22, 33.33},
+ new HashMap<String, String>() {
+ {
+ put("key1", "value1");
+ put("key2", "value2");
+ put("key3", "value3");
+ }
+ },
+ new SeaTunnelRow(
+ new Object[] {
+ 42,
+ "string1",
+ true,
+ 1.1f,
+ 33.33,
+ (byte) 1,
+ (short) 2,
+ Long.MAX_VALUE,
+ new BigDecimal("55.55"),
+ LocalDate.parse("2021-01-01"),
+
LocalDateTime.parse("2021-01-01T00:00:00"),
+ null,
+ new Object[] {"string1",
"string2", "string3"},
+ new Object[] {true, false, true},
+ new Object[] {(byte) 1, (byte) 2,
(byte) 3},
+ new Object[] {(short) 1, (short)
2, (short) 3},
+ new Object[] {1, 2, 3},
+ new Object[] {1L, 2L, 3L},
+ new Object[] {1.1f, 2.2f, 3.3f},
+ new Object[] {11.11, 22.22, 33.33},
+ new HashMap<String, String>() {
+ {
+ put("key1", "value1");
+ put("key2", "value2");
+ put("key3", "value3");
+ }
+ }
+ })
+ },
+ new Object[] {
+ 12,
+ "string2",
+ false,
+ 2.2f,
+ 43.33,
+ (byte) 5,
+ (short) 42,
+ Long.MAX_VALUE - 1,
+ new BigDecimal("25.55"),
+ LocalDate.parse("2011-01-01"),
+ LocalDateTime.parse("2020-01-01T00:00:00"),
+ null,
+ new Object[] {"string3", "string2", "string1"},
+ new Object[] {true, false, false},
+ new Object[] {(byte) 3, (byte) 4, (byte) 5},
+ new Object[] {(short) 2, (short) 6, (short) 8},
+ new Object[] {2, 4, 6},
+ new Object[] {643634L, 421412L, 543543L},
+ new Object[] {1.24f, 21.2f, 32.3f},
+ new Object[] {421.11, 5322.22, 323.33},
+ new HashMap<String, String>() {
+ {
+ put("key2", "value534");
+ put("key3", "value3");
+ put("key4", "value43");
+ }
+ },
+ new SeaTunnelRow(
+ new Object[] {
+ 12,
+ "string2",
+ false,
+ 2.2f,
+ 43.33,
+ (byte) 5,
+ (short) 42,
+ Long.MAX_VALUE - 1,
+ new BigDecimal("25.55"),
+ LocalDate.parse("2011-01-01"),
+
LocalDateTime.parse("2020-01-01T00:00:00"),
+ null,
+ new Object[] {"string3",
"string2", "string1"},
+ new Object[] {true, false, false},
+ new Object[] {(byte) 3, (byte) 4,
(byte) 5},
+ new Object[] {(short) 2, (short)
6, (short) 8},
+ new Object[] {2, 4, 6},
+ new Object[] {643634L, 421412L,
543543L},
+ new Object[] {1.24f, 21.2f, 32.3f},
+ new Object[] {421.11, 5322.22,
323.33},
+ new HashMap<String, String>() {
+ {
+ put("key2", "value534");
+ put("key3", "value3");
+ put("key4", "value43");
+ }
+ }
+ })
+ },
+ new Object[] {
+ 233,
+ "string3",
+ true,
+ 231.1f,
+ 3533.33,
+ (byte) 7,
+ (short) 2,
+ Long.MAX_VALUE - 2,
+ new BigDecimal("65.55"),
+ LocalDate.parse("2001-01-01"),
+ LocalDateTime.parse("2031-01-01T00:00:00"),
+ null,
+ new Object[] {"string1fsa", "stringdsa2",
"strfdsaing3"},
+ new Object[] {false, true, true},
+ new Object[] {(byte) 6, (byte) 2, (byte) 1},
+ new Object[] {(short) 7, (short) 8, (short) 9},
+ new Object[] {3, 77, 22},
+ new Object[] {143L, 642L, 533L},
+ new Object[] {24.1f, 54.2f, 1.3f},
+ new Object[] {431.11, 2422.22, 3243.33},
+ new HashMap<String, String>() {
+ {
+ put("keyfs1", "valfdsue1");
+ put("kedfasy2", "vafdslue2");
+ put("kefdsay3", "vfdasalue3");
+ }
+ },
+ new SeaTunnelRow(
+ new Object[] {
+ 233,
+ "string3",
+ true,
+ 231.1f,
+ 3533.33,
+ (byte) 7,
+ (short) 2,
+ Long.MAX_VALUE - 2,
+ new BigDecimal("65.55"),
+ LocalDate.parse("2001-01-01"),
+
LocalDateTime.parse("2031-01-01T00:00:00"),
+ null,
+ new Object[] {
+ "string1fsa", "stringdsa2",
"strfdsaing3"
+ },
+ new Object[] {false, true, true},
+ new Object[] {(byte) 6, (byte) 2,
(byte) 1},
+ new Object[] {(short) 7, (short)
8, (short) 9},
+ new Object[] {3, 77, 22},
+ new Object[] {143L, 642L, 533L},
+ new Object[] {24.1f, 54.2f, 1.3f},
+ new Object[] {431.11, 2422.22,
3243.33},
+ new HashMap<String, String>() {
+ {
+ put("keyfs1", "valfdsue1");
+ put("kedfasy2",
"vafdslue2");
+ put("kefdsay3",
"vfdasalue3");
+ }
+ }
+ })
+ });
+ // Compare positionally. Object[] fields are compared element by element. Everything else,
+ // including the map and the nested SeaTunnelRow, is compared through its equals() method.
+ for (int i = 0; i < expected.size(); i++) {
+ Object[] values = expected.get(i);
+ Object[] actual = valueBuffer.get(i);
+ for (int v = 0; v < values.length; v++) {
+ if (values[v] instanceof Object[]) {
+ Assertions.assertArrayEquals((Object[]) values[v],
(Object[]) actual[v]);
+ } else {
+ Assertions.assertEquals(values[v], actual[v]);
+ }
+ }
+ }
+ }
+ }
+
+ /** Returns an empty Optional: this test writer produces no commit information. */
+ @Override
+ public Optional<Void> prepareCommit() throws IOException {
+ return Optional.empty();
+ }
+
+ /** No-op: there is no prepared commit to abort. */
+ @Override
+ public void abortPrepare() {}
+
+ /** No-op: the writer holds only the in-memory buffer. */
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
new file mode 100644
index 0000000000..d22789e1be
--- /dev/null
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SparkSinkTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DateType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.NullType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+
+/**
+ * Writes a DataFrame through SparkSinkInjector into SeaTunnelSinkWithBuffer. That sink's writer
+ * checks that every buffered row still holds its own values after later rows were written.
+ */
+public class SparkSinkTest {
+
+ /**
+ * Builds three rows covering scalar, date/time, array and map columns. Each row carries a
+ * nested row with the same values in a "row" column. The rows are then saved through the
+ * SeaTunnel sink.
+ *
+ * <p>The assertions live in the sink's writer, so this method passes as long as save() does
+ * not throw.
+ *
+ * <p>NOTE(review): this presumably relies on master("local") producing a single partition, so
+ * that one writer instance sees all three rows. Otherwise the writer's size() == 3 check never
+ * fires. Confirm this.
+ */
+ @Test
+ @DisabledOnJre(
+ value = JRE.JAVA_11,
+ disabledReason =
+ "We should update apache common lang3 version to 3.8 to
avoid NPE, "
+ + "see
https://github.com/apache/commons-lang/commit/50ce8c44e1601acffa39f5568f0fc140aade0564")
+ public void testSparkSinkWriteDataWithCopy() {
+ // We should make sure that the data is written to the sink with copy.
+ SparkSession spark =
+ SparkSession.builder()
+ .master("local")
+ .appName("testSparkSinkWriteDataWithCopy")
+ .getOrCreate();
+ StructType structType =
+ new StructType()
+ .add("int", IntegerType)
+ .add("string", StringType)
+ .add("boolean", BooleanType)
+ .add("float", FloatType)
+ .add("double", DoubleType)
+ .add("byte", ByteType)
+ .add("short", ShortType)
+ .add("long", LongType)
+ .add("decimal", new DecimalType(10, 2))
+ .add("date", DateType)
+ // .add("time", TimeType) unsupported time type in
Spark 3.3.0. Please trace
+ // https://issues.apache.org/jira/browse/SPARK-41549
+ .add("timestamp", TimestampType)
+ .add("null", NullType)
+ .add("array_string", new ArrayType(StringType, true))
+ .add("array_boolean", new ArrayType(BooleanType, true))
+ .add("array_byte", new ArrayType(ByteType, true))
+ .add("array_short", new ArrayType(ShortType, true))
+ .add("array_int", new ArrayType(IntegerType, true))
+ .add("array_long", new ArrayType(LongType, true))
+ .add("array_float", new ArrayType(FloatType, true))
+ .add("array_double", new ArrayType(DoubleType, true))
+ .add("map", new MapType(StringType, StringType, true));
+
+ // rowN matches structType (21 fields). Each rowN is also nested as the last column of the
+ // matching rowNWithRow below.
+ GenericRow row1 =
+ new GenericRow(
+ new Object[] {
+ 42,
+ "string1",
+ true,
+ 1.1f,
+ 33.33,
+ (byte) 1,
+ (short) 2,
+ Long.MAX_VALUE,
+ new BigDecimal("55.55"),
+ LocalDate.parse("2021-01-01"),
+ Timestamp.valueOf("2021-01-01 00:00:00"),
+ null,
+ Arrays.asList("string1", "string2", "string3"),
+ Arrays.asList(true, false, true),
+ Arrays.asList((byte) 1, (byte) 2, (byte) 3),
+ Arrays.asList((short) 1, (short) 2, (short) 3),
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(1L, 2L, 3L),
+ Arrays.asList(1.1f, 2.2f, 3.3f),
+ Arrays.asList(11.11, 22.22, 33.33),
+ new HashMap<String, String>() {
+ {
+ put("key1", "value1");
+ put("key2", "value2");
+ put("key3", "value3");
+ }
+ }
+ });
+
+ // Same 21 values as row1, followed by row1 itself as the nested "row" column.
+ GenericRow row1WithRow =
+ new GenericRow(
+ new Object[] {
+ 42,
+ "string1",
+ true,
+ 1.1f,
+ 33.33,
+ (byte) 1,
+ (short) 2,
+ Long.MAX_VALUE,
+ new BigDecimal("55.55"),
+ LocalDate.parse("2021-01-01"),
+ Timestamp.valueOf("2021-01-01 00:00:00"),
+ null,
+ Arrays.asList("string1", "string2", "string3"),
+ Arrays.asList(true, false, true),
+ Arrays.asList((byte) 1, (byte) 2, (byte) 3),
+ Arrays.asList((short) 1, (short) 2, (short) 3),
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(1L, 2L, 3L),
+ Arrays.asList(1.1f, 2.2f, 3.3f),
+ Arrays.asList(11.11, 22.22, 33.33),
+ new HashMap<String, String>() {
+ {
+ put("key1", "value1");
+ put("key2", "value2");
+ put("key3", "value3");
+ }
+ },
+ row1
+ });
+
+ GenericRow row2 =
+ new GenericRow(
+ new Object[] {
+ 12,
+ "string2",
+ false,
+ 2.2f,
+ 43.33,
+ (byte) 5,
+ (short) 42,
+ Long.MAX_VALUE - 1,
+ new BigDecimal("25.55"),
+ LocalDate.parse("2011-01-01"),
+ Timestamp.valueOf("2020-01-01 00:00:00"),
+ null,
+ Arrays.asList("string3", "string2", "string1"),
+ Arrays.asList(true, false, false),
+ Arrays.asList((byte) 3, (byte) 4, (byte) 5),
+ Arrays.asList((short) 2, (short) 6, (short) 8),
+ Arrays.asList(2, 4, 6),
+ Arrays.asList(643634L, 421412L, 543543L),
+ Arrays.asList(1.24f, 21.2f, 32.3f),
+ Arrays.asList(421.11, 5322.22, 323.33),
+ new HashMap<String, String>() {
+ {
+ put("key2", "value534");
+ put("key3", "value3");
+ put("key4", "value43");
+ }
+ }
+ });
+
+ GenericRow row2WithRow =
+ new GenericRow(
+ new Object[] {
+ 12,
+ "string2",
+ false,
+ 2.2f,
+ 43.33,
+ (byte) 5,
+ (short) 42,
+ Long.MAX_VALUE - 1,
+ new BigDecimal("25.55"),
+ LocalDate.parse("2011-01-01"),
+ Timestamp.valueOf("2020-01-01 00:00:00"),
+ null,
+ Arrays.asList("string3", "string2", "string1"),
+ Arrays.asList(true, false, false),
+ Arrays.asList((byte) 3, (byte) 4, (byte) 5),
+ Arrays.asList((short) 2, (short) 6, (short) 8),
+ Arrays.asList(2, 4, 6),
+ Arrays.asList(643634L, 421412L, 543543L),
+ Arrays.asList(1.24f, 21.2f, 32.3f),
+ Arrays.asList(421.11, 5322.22, 323.33),
+ new HashMap<String, String>() {
+ {
+ put("key2", "value534");
+ put("key3", "value3");
+ put("key4", "value43");
+ }
+ },
+ row2
+ });
+
+ GenericRow row3 =
+ new GenericRow(
+ new Object[] {
+ 233,
+ "string3",
+ true,
+ 231.1f,
+ 3533.33,
+ (byte) 7,
+ (short) 2,
+ Long.MAX_VALUE - 2,
+ new BigDecimal("65.55"),
+ LocalDate.parse("2001-01-01"),
+ Timestamp.valueOf("2031-01-01 00:00:00"),
+ null,
+ Arrays.asList("string1fsa", "stringdsa2",
"strfdsaing3"),
+ Arrays.asList(false, true, true),
+ Arrays.asList((byte) 6, (byte) 2, (byte) 1),
+ Arrays.asList((short) 7, (short) 8, (short) 9),
+ Arrays.asList(3, 77, 22),
+ Arrays.asList(143L, 642L, 533L),
+ Arrays.asList(24.1f, 54.2f, 1.3f),
+ Arrays.asList(431.11, 2422.22, 3243.33),
+ new HashMap<String, String>() {
+ {
+ put("keyfs1", "valfdsue1");
+ put("kedfasy2", "vafdslue2");
+ put("kefdsay3", "vfdasalue3");
+ }
+ }
+ });
+
+ GenericRow row3WithRow =
+ new GenericRow(
+ new Object[] {
+ 233,
+ "string3",
+ true,
+ 231.1f,
+ 3533.33,
+ (byte) 7,
+ (short) 2,
+ Long.MAX_VALUE - 2,
+ new BigDecimal("65.55"),
+ LocalDate.parse("2001-01-01"),
+ Timestamp.valueOf("2031-01-01 00:00:00"),
+ null,
+ Arrays.asList("string1fsa", "stringdsa2",
"strfdsaing3"),
+ Arrays.asList(false, true, true),
+ Arrays.asList((byte) 6, (byte) 2, (byte) 1),
+ Arrays.asList((short) 7, (short) 8, (short) 9),
+ Arrays.asList(3, 77, 22),
+ Arrays.asList(143L, 642L, 533L),
+ Arrays.asList(24.1f, 54.2f, 1.3f),
+ Arrays.asList(431.11, 2422.22, 3243.33),
+ new HashMap<String, String>() {
+ {
+ put("keyfs1", "valfdsue1");
+ put("kedfasy2", "vafdslue2");
+ put("kefdsay3", "vfdasalue3");
+ }
+ },
+ row3
+ });
+
+ // StructType.add returns a new StructType. The nested "row" column therefore uses the
+ // original 21-field schema, and the top-level schema has 22 fields.
+ Dataset<Row> dataset =
+ spark.createDataFrame(
+ Arrays.asList(row1WithRow, row2WithRow, row3WithRow),
+ structType.add("row", structType));
+ SparkSinkInjector.inject(dataset.write(), new
SeaTunnelSinkWithBuffer())
+ .option("checkpointLocation", "/tmp")
+ .mode(SaveMode.Append)
+ .save();
+ // NOTE(review): this is not in a finally block, so the session stays open if save() throws.
+ spark.close();
+ }
+}
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index 024961241b..60aea7fa79 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -2,15 +2,15 @@ commons-codec-1.13.jar
commons-collections4-4.4.jar
commons-compress-1.20.jar
commons-io-2.11.0.jar
-commons-lang3-3.4.jar
+commons-lang3-3.5.jar
config-1.3.3.jar
disruptor-3.4.4.jar
guava-27.0-jre.jar
hazelcast-5.1.jar
-jackson-annotations-2.12.6.jar
-jackson-core-2.12.6.jar
-jackson-databind-2.12.6.jar
-jackson-dataformat-properties-2.12.6.jar
+jackson-annotations-2.13.3.jar
+jackson-core-2.13.3.jar
+jackson-databind-2.13.3.jar
+jackson-dataformat-properties-2.13.3.jar
jcl-over-slf4j-1.7.25.jar
jcommander-1.81.jar
log4j-api-2.17.1.jar
@@ -21,7 +21,7 @@ protostuff-api-1.8.0.jar
protostuff-collectionschema-1.8.0.jar
protostuff-core-1.8.0.jar
protostuff-runtime-1.8.0.jar
-scala-library-2.11.12.jar
+scala-library-2.12.15.jar
seatunnel-jackson-2.3.4-SNAPSHOT-optional.jar
seatunnel-guava-2.3.4-SNAPSHOT-optional.jar
slf4j-api-1.7.25.jar