This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5f07e8f [improve] add doris sink itcase (#297)
5f07e8f is described below
commit 5f07e8fca0598e9fc38720c2da4d0816440c0e3c
Author: wudi <[email protected]>
AuthorDate: Mon Mar 31 09:57:26 2025 +0800
[improve] add doris sink itcase (#297)
---
.github/workflows/run-e2ecase.yml | 52 +++++
.../client/write/AbstractStreamLoadProcessor.java | 40 ++--
.../spark/client/write/StreamLoadProcessor.java | 10 +-
.../apache/doris/spark/config/DorisOptions.java | 2 +-
.../org/apache/doris/spark/util/EscapeHandler.java | 41 ++++
.../org/apache/doris/spark/util/HttpUtils.scala | 9 +-
.../apache/doris/spark/util/RowConvertors.scala | 3 +-
.../spark/container/AbstractContainerTestBase.java | 63 ++++++
.../spark/container/instance/ContainerService.java | 2 +
.../spark/container/instance/DorisContainer.java | 5 +
.../container/instance/DorisCustomerContainer.java | 5 +
.../doris/spark/sql/Doris2DorisE2ECase.scala | 110 ++++++++++
.../doris/spark/sql/DorisCatalogITCase.scala | 137 +++++++++++++
.../apache/doris/spark/sql/DorisReaderITCase.scala | 5 +-
.../spark/sql/DorisWriterFailoverITCase.scala | 220 ++++++++++++++++++++
.../apache/doris/spark/sql/DorisWriterITCase.scala | 222 +++++++++++++++++++--
.../resources/container/ddl/write_all_type.sql | 31 +++
.../{log4j.properties => log4j2-test.properties} | 11 +-
.../apache/doris/spark/write/DorisDataWriter.scala | 5 +-
.../org/apache/doris/spark/write/DorisWrite.scala | 2 -
20 files changed, 918 insertions(+), 57 deletions(-)
diff --git a/.github/workflows/run-e2ecase.yml b/.github/workflows/run-e2ecase.yml
new file mode 100644
index 0000000..2f2949b
--- /dev/null
+++ b/.github/workflows/run-e2ecase.yml
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run E2ECases
+on:
+ pull_request:
+ push:
+
+jobs:
+ build-extension:
+ name: "Run E2ECases"
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ shell: bash
+ steps:
+ - name: Checkout
+ uses: actions/checkout@master
+
+ - name: Setup java
+ uses: actions/setup-java@v2
+ with:
+ distribution: adopt
+ java-version: '8'
+
+ - name: Run E2ECases for spark 2
+ run: |
+ cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+ - name: Run E2ECases for spark 3.1
+ run: |
+ cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
+ - name: Run E2ECases for spark 3.3
+ run: |
+ cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*E2ECase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
+
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 2a10ffa..37d3a48 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,8 +17,6 @@
package org.apache.doris.spark.client.write;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.doris.spark.client.DorisBackendHttpClient;
import org.apache.doris.spark.client.DorisFrontendClient;
import org.apache.doris.spark.client.entity.Backend;
@@ -27,8 +25,12 @@ import org.apache.doris.spark.config.DorisConfig;
import org.apache.doris.spark.config.DorisOptions;
import org.apache.doris.spark.exception.OptionRequiredException;
import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.util.EscapeHandler;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -110,6 +112,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
private transient ExecutorService executor;
private Future<CloseableHttpResponse> requestFuture = null;
+ private volatile String currentLabel;
public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
@@ -129,7 +132,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
if (properties.containsKey(GROUP_COMMIT)) {
String message = "";
- if (!isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit";
+ if (isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit";
if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
message = "group commit does not support partial column updates";
if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
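The corrected condition above flags group commit when two-phase commit is enabled; the old check was inverted and only complained when 2PC was off. A minimal, hypothetical sketch of a write this validation is now expected to reject, using the option keys exercised by the new ITCases (table identifier and FE address are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class GroupCommitConflictSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
            Dataset<Row> df = spark.sql("SELECT 'doris' AS name, 1 AS age");
            // group commit and two-phase commit conflict; with the corrected
            // check above this write should fail fast instead of being accepted
            df.write().format("doris")
                    .option("doris.table.identifier", "db.tbl")  // placeholder table
                    .option("doris.fenodes", "127.0.0.1:8030")   // placeholder FE address
                    .option("user", "root")
                    .option("password", "")
                    .option("sink.properties.group_commit", "sync_mode")
                    .option("doris.sink.enable-2pc", "true")
                    .mode(SaveMode.Append)
                    .save();
        }
    }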
@@ -166,6 +169,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
output.write(toArrowFormat(rs));
}
output.close();
+ logger.info("stream load stopped with {}", currentLabel != null ? currentLabel : "group commit");
CloseableHttpResponse res = requestFuture.get();
if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode()
@@ -194,13 +198,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
} catch (OptionRequiredException e) {
throw new RuntimeException("stream load handle commit props failed", e);
}
- try {
- CloseableHttpResponse response = httpClient.execute(httpPut);
+ try(CloseableHttpResponse response = httpClient.execute(httpPut)){
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new RuntimeException("commit transaction failed, transaction: " + msg
+ ", status: " + response.getStatusLine().getStatusCode()
+ ", reason: " + response.getStatusLine().getReasonPhrase());
}
+ logger.info("commit response: {}", EntityUtils.toString(response.getEntity()));
} catch (IOException e) {
throw new RuntimeException("commit transaction failed, transaction: " + msg, e);
}
@@ -221,13 +225,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
} catch (OptionRequiredException e) {
throw new RuntimeException("stream load handle abort props failed", e);
}
- try {
- CloseableHttpResponse response = httpClient.execute(httpPut);
+ try(CloseableHttpResponse response = httpClient.execute(httpPut)){
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new RuntimeException("abort transaction failed, transaction: " + msg
+ ", status: " + response.getStatusLine().getStatusCode()
+ ", reason: " + response.getStatusLine().getReasonPhrase());
}
+ logger.info("abort response: {}", EntityUtils.toString(response.getEntity()));
} catch (IOException e) {
throw new RuntimeException("abort transaction failed, transaction: " + msg, e);
}
@@ -274,8 +278,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
private void handleStreamLoadProperties(HttpPut httpPut) throws OptionRequiredException {
addCommonHeaders(httpPut);
if (groupCommit == null || groupCommit.equals("off_mode")) {
- String label = generateStreamLoadLabel();
- httpPut.setHeader("label", label);
+ currentLabel = generateStreamLoadLabel();
+ httpPut.setHeader("label", currentLabel);
}
String writeFields = getWriteFields();
httpPut.setHeader("columns", writeFields);
@@ -286,20 +290,12 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
switch (format.toLowerCase()) {
case "csv":
- if (!properties.containsKey("column_separator")) {
- properties.put("column_separator", "\t");
- }
- columnSeparator = properties.get("column_separator");
- if (!properties.containsKey("line_delimiter")) {
- properties.put("line_delimiter", "\n");
- }
- lineDelimiter = properties.get("line_delimiter");
+ // Handling hidden delimiters
+ columnSeparator = EscapeHandler.escapeString(properties.getOrDefault("column_separator", "\t"));
+ lineDelimiter = EscapeHandler.escapeString(properties.getOrDefault("line_delimiter", "\n"));
break;
case "json":
- if (!properties.containsKey("line_delimiter")) {
- properties.put("line_delimiter", "\n");
- }
- lineDelimiter = properties.get("line_delimiter");
+ lineDelimiter = properties.getOrDefault("line_delimiter",
"\n");
properties.put("read_json_by_line", "true");
break;
}
@@ -346,6 +342,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
entity = new GzipCompressingEntity(entity);
}
httpPut.setEntity(entity);
+
+ logger.info("table {}.{} stream load started for {} on host {}:{}",
database, table, currentLabel != null ? currentLabel : "group commit", host,
port);
return getExecutors().submit(() -> client.execute(httpPut));
}
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
index 2f787a5..97ef1c0 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
@@ -16,15 +16,17 @@
// under the License.
package org.apache.doris.spark.client.write;
+import org.apache.doris.spark.config.DorisConfig;
+import org.apache.doris.spark.config.DorisOptions;
+import org.apache.doris.spark.exception.OptionRequiredException;
+import org.apache.doris.spark.util.RowConvertors;
+
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.doris.spark.config.DorisConfig;
-import org.apache.doris.spark.config.DorisOptions;
-import org.apache.doris.spark.exception.OptionRequiredException;
-import org.apache.doris.spark.util.RowConvertors;
+import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 5b666f4..4319688 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -81,7 +81,7 @@ public class DorisOptions {
*/
public static final ConfigOption<Boolean> DORIS_SINK_TASK_USE_REPARTITION = ConfigOptions.name("doris.sink.task.use.repartition").booleanType().defaultValue(false).withDescription("");
- public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(50).withDescription("");
+ public static final ConfigOption<Integer> DORIS_SINK_BATCH_INTERVAL_MS = ConfigOptions.name("doris.sink.batch.interval.ms").intType().defaultValue(0).withDescription("");
public static final ConfigOption<Boolean> DORIS_SINK_ENABLE_2PC = ConfigOptions.name("doris.sink.enable-2pc").booleanType().defaultValue(false).withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
new file mode 100644
index 0000000..436658d
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Handler for escape in properties. */
+public class EscapeHandler {
+ public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
+ public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})");
+
+ public static String escapeString(String source) {
+ if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
+ Matcher m = ESCAPE_PATTERN.matcher(source);
+ StringBuffer buf = new StringBuffer();
+ while (m.find()) {
+ m.appendReplacement(
+ buf, String.format("%s", (char)
Integer.parseInt(m.group(1), 16)));
+ }
+ m.appendTail(buf);
+ return buf.toString();
+ }
+ return source;
+ }
+}
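For reference, the helper turns `\xNN` escape sequences into their literal characters so that invisible delimiters survive the trip through string options; values without the `\x` flag pass through untouched. A small usage sketch (hypothetical class name, standard Java only):

    import org.apache.doris.spark.util.EscapeHandler;

    public class EscapeHandlerExample {
        public static void main(String[] args) {
            // the option value \x01 arrives as the 4-character string "\x01"
            String sep = EscapeHandler.escapeString("\\x01");
            System.out.println((int) sep.charAt(0)); // prints 1: the SOH control character

            // plain separators are returned unchanged
            System.out.println(EscapeHandler.escapeString(","));
        }
    }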
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
index 0a11c2e..90031f2 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/HttpUtils.scala
@@ -22,6 +22,7 @@ import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpRequestBase
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
import org.apache.http.impl.client.{CloseableHttpClient, DefaultRedirectStrategy, HttpClients}
+import org.apache.http.protocol.HttpRequestExecutor
import org.apache.http.ssl.SSLContexts
import java.io.{File, FileInputStream}
@@ -33,9 +34,11 @@ import scala.util.{Failure, Success, Try}
object HttpUtils {
def getHttpClient(config: DorisConfig): CloseableHttpClient = {
- val builder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy {
- override def isRedirectable(method: String): Boolean = true
- })
+ val builder = HttpClients.custom()
+ .setRequestExecutor(new HttpRequestExecutor(60000))
+ .setRedirectStrategy(new DefaultRedirectStrategy {
+ override def isRedirectable(method: String): Boolean = true
+ })
val enableHttps = config.getValue(DorisOptions.DORIS_ENABLE_HTTPS)
if (enableHttps) {
require(config.contains(DorisOptions.DORIS_HTTPS_KEY_STORE_PATH))
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index b75d1ce..1ae0996 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -40,7 +40,8 @@ object RowConvertors {
def convertToCsv(row: InternalRow, schema: StructType, sep: String): String = {
(0 until schema.length).map(i => {
- asScalaValue(row, schema.fields(i).dataType, i)
+ val value = asScalaValue(row, schema.fields(i).dataType, i)
+ if (value == null) NULL_VALUE else value
}).mkString(sep)
}
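The change above stops null columns from being serialized as the literal text "null" in CSV stream loads; they are written as the connector's NULL_VALUE marker instead. A self-contained sketch of the same idea, assuming NULL_VALUE is the stream-load null literal `\N` (an assumption; the actual constant lives in RowConvertors):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CsvNullSketch {
        // assumed to mirror the connector's NULL_VALUE constant
        private static final String NULL_VALUE = "\\N";

        public static void main(String[] args) {
            List<Object> row = Arrays.asList("doris", null, 18);
            // null fields become \N so Doris stores NULL, not the text "null"
            String csv = row.stream()
                    .map(v -> v == null ? NULL_VALUE : v.toString())
                    .collect(Collectors.joining(","));
            System.out.println(csv); // doris,\N,18
        }
    }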
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
index 97e7e26..c9b9768 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
@@ -21,6 +21,13 @@ import org.apache.doris.spark.container.instance.ContainerService;
import org.apache.doris.spark.container.instance.DorisContainer;
import org.apache.doris.spark.container.instance.DorisCustomerContainer;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,6 +35,8 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.List;
import java.util.Objects;
@@ -79,6 +88,10 @@ public abstract class AbstractContainerTestBase {
return dorisContainerService.getPassword();
}
+ protected int getQueryPort() {
+ return dorisContainerService.getQueryPort();
+ }
+
protected String getDorisQueryUrl() {
return dorisContainerService.getJdbcUrl();
}
@@ -115,4 +128,54 @@ public abstract class AbstractContainerTestBase {
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0]));
}
+
+ protected void faultInjectionOpen() throws IOException {
+ String pointName = "FlushToken.submit_flush_error";
+ String apiUrl =
+ String.format(
+ "http://%s/api/debug_point/add/%s",
+ dorisContainerService.getBenodes(), pointName);
+ HttpPost httpPost = new HttpPost(apiUrl);
+ httpPost.addHeader(
+ HttpHeaders.AUTHORIZATION,
+ auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+ try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+ try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String reason = response.getStatusLine().toString();
+ if (statusCode == 200 && response.getEntity() != null) {
+ LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+ } else {
+ LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+ }
+ }
+ }
+ }
+
+ protected void faultInjectionClear() throws IOException {
+ String apiUrl =
+ String.format(
+ "http://%s/api/debug_point/clear", dorisContainerService.getBenodes());
+ HttpPost httpPost = new HttpPost(apiUrl);
+ httpPost.addHeader(
+ HttpHeaders.AUTHORIZATION,
+ auth(dorisContainerService.getUsername(), dorisContainerService.getPassword()));
+ try (CloseableHttpClient httpClient = HttpClients.custom().build()) {
+ try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String reason = response.getStatusLine().toString();
+ if (statusCode == 200 && response.getEntity() != null) {
+ LOG.info("Debug point response {}", EntityUtils.toString(response.getEntity()));
+ } else {
+ LOG.info("Debug point failed, statusCode: {}, reason: {}", statusCode, reason);
+ }
+ }
+ }
+ }
+
+ protected String auth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ return "Basic " + new String(encoded);
+ }
}
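The two helpers drive the BE debug-point HTTP API (`/api/debug_point/add/...` and `/api/debug_point/clear`) with the basic-auth header built by auth(). A sketch of how a test might wrap a load with an injected flush error, assuming the helpers above and a running Doris container (hypothetical test class):

    import org.apache.doris.spark.container.AbstractContainerTestBase;
    import org.junit.Test;

    public class FaultInjectionUsageSketch extends AbstractContainerTestBase {
        @Test
        public void loadWithInjectedFlushError() throws Exception {
            faultInjectionOpen();      // BE memtable flushes now fail at FlushToken.submit_flush_error
            try {
                // run a stream load here and assert the writer retries or aborts as expected
            } finally {
                faultInjectionClear(); // always drop the debug point so later tests are unaffected
            }
        }
    }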
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
index 3ec7ee5..f8ec293 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
@@ -49,4 +49,6 @@ public interface ContainerService {
String getBenodes();
void close();
+
+ int getQueryPort();
}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
index 7c9297e..5220876 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
@@ -193,6 +193,11 @@ public class DorisContainer implements ContainerService {
LOG.info("Doris container closed successfully.");
}
+ @Override
+ public int getQueryPort() {
+ return 9030;
+ }
+
private void initializeJDBCDriver() throws MalformedURLException {
URLClassLoader urlClassLoader =
new URLClassLoader(
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
index 4ba4e74..4f64754 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
@@ -135,4 +135,9 @@ public class DorisCustomerContainer implements ContainerService {
@Override
public void close() {}
+
+ @Override
+ public int getQueryPort() {
+ return Integer.valueOf(System.getProperty("doris_query_port"));
+ }
}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala
new file mode 100644
index 0000000..032195d
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/Doris2DorisE2ECase.scala
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.spark.sql.SparkSession
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.slf4j.LoggerFactory
+
+import java.util
+
+object Doris2DorisE2ECase {
+ @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}")
+ def parameters(): java.util.Collection[Array[AnyRef]] = {
+ import java.util.Arrays
+ Arrays.asList(
+ Array("thrift": java.lang.String, -1: java.lang.Integer),
+ Array("arrow": java.lang.String, 9611: java.lang.Integer)
+ )
+ }
+}
+
+/**
+ * Read Doris to Write Doris.
+ */
+@RunWith(classOf[Parameterized])
+class Doris2DorisE2ECase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase{
+
+ private val LOG = LoggerFactory.getLogger(classOf[Doris2DorisE2ECase])
+ val DATABASE = "test_doris_e2e"
+ val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
+ val TABLE_WRITE_TBL_ALL_TYPES = "tbl_write_tbl_all_types"
+
+ @Before
+ def setUp(): Unit = {
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+ }
+
+ @Test
+ def testAllTypeE2ESQL(): Unit = {
+ val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+
+ val targetInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/write_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, targetInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_sink
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}"
+ |)
+ |""".stripMargin)
+
+ session.sql(
+ """
+ |insert into test_sink select * from test_source
+ |""".stripMargin)
+ session.stop()
+
+ val excepted =
+ util.Arrays.asList(
+ "1,true,127,32767,2147483647,9223372036854775807,170141183460469231731687303715884105727,3.14,2.71828,12345.6789,2025-03-11,2025-03-11T12:34:56,A,Hello, Doris!,This is a string,[\"Alice\", \"Bob\"],{\"key1\":\"value1\", \"key2\":\"value2\"},{\"name\": \"Tom\", \"age\": 30},{\"key\":\"value\"},{\"data\":123,\"type\":\"variant\"}",
+ "2,false,-128,-32768,-2147483648,-9223372036854775808,-170141183460469231731687303715884105728,-1.23,1.0E-4,-9999.9999,2024-12-25,2024-12-25T23:59:59,B,Doris Test,Another string!,[\"Charlie\", \"David\"],{\"k1\":\"v1\", \"k2\":\"v2\"},{\"name\": \"Jerry\", \"age\": 25},{\"status\":\"ok\"},{\"data\":[1,2,3]}",
+ "3,true,0,0,0,0,0,0.0,0.0,0.0000,2023-06-15,2023-06-15T08:00,C,Test Doris,Sample text,[\"Eve\", \"Frank\"],{\"alpha\":\"beta\"},{\"name\": \"Alice\", \"age\": 40},{\"nested\":{\"key\":\"value\"}},{\"variant\":\"test\"}",
+ "4,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null");
+
+ val query = String.format("select * from %s order by id", TABLE_WRITE_TBL_ALL_TYPES)
+ ContainerUtils.checkResult(getDorisQueryConnection(DATABASE), LOG, excepted, query, 20, false)
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala
new file mode 100644
index 0000000..39df05a
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisCatalogITCase.scala
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+import org.slf4j.LoggerFactory
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * it case for doris catalog.
+ */
+class DorisCatalogITCase extends AbstractContainerTestBase {
+
+ private val LOG = LoggerFactory.getLogger(classOf[DorisCatalogITCase])
+ private val DATABASE = "test_catalog"
+ private val TBL_CATALOG = "tbl_catalog"
+
+ @Test
+ @throws[Exception]
+ def testSparkCatalog(): Unit = {
+
+ val conf = new SparkConf()
+ conf.set("spark.sql.catalog.doris_catalog",
"org.apache.doris.spark.catalog.DorisTableCatalog")
+ conf.set("spark.sql.catalog.doris_catalog.doris.fenodes", getFenodes)
+ conf.set("spark.sql.catalog.doris_catalog.doris.query.port",
getQueryPort.toString)
+ conf.set("spark.sql.catalog.doris_catalog.doris.user", getDorisUsername)
+ conf.set("spark.sql.catalog.doris_catalog.doris.password",
getDorisPassword)
+ val session =
SparkSession.builder().config(conf).master("local[*]").getOrCreate()
+
+ // session.sessionState.catalogManager.setCurrentCatalog("doris_catalog")
+ // spark 2 no catalogManager property, used reflect
+ try {
+ val stateObj = session.sessionState
+ val catalogManagerObj = stateObj.getClass.getMethod("catalogManager").invoke(stateObj)
+ val setCurrentCatalogMethod = catalogManagerObj.getClass.getMethod("setCurrentCatalog", classOf[String])
+ setCurrentCatalogMethod.invoke(catalogManagerObj, "doris_catalog")
+ } catch {
+ case e: Exception =>
+ // if Spark 2,will throw NoSuchMethodException
+ println("Catalog API not available, skipping catalog operations")
+ e.printStackTrace()
+ return
+ }
+
+ // show databases
+ val showDatabaseActual = new util.ArrayList[String](session.sql("show
databases").collect().map(_.getAs[String]("namespace")).toList.asJava)
+ showDatabaseActual.add("information_schema")
+ val showDatabaseExcept = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("show databases"),
+ 1)
+ checkResultInAnyOrder("testSparkCatalog", showDatabaseExcept.toArray,
showDatabaseActual.toArray)
+
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+
+ // mock data
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TBL_CATALOG),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int\n"
+ + ") "
+ + " DUPLICATE KEY(`name`) "
+ + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\")", DATABASE, TBL_CATALOG),
+ String.format("insert into %s.%s values ('doris',18)", DATABASE,
TBL_CATALOG),
+ String.format("insert into %s.%s values ('spark',10)", DATABASE,
TBL_CATALOG)
+ )
+
+ // show tables
+ session.sql("USE " + DATABASE);
+ val showTablesActual = session.sql("show
tables").collect().map(_.getAs[String]("tableName")).toList.asJava
+ val showTablesExcept = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(DATABASE),
+ LOG,
+ String.format("show tables"),
+ 1)
+ checkResultInAnyOrder("testSparkCatalog", showTablesExcept.toArray,
showTablesActual.toArray)
+
+ val query = String.format("select * from %s.%s", DATABASE, TBL_CATALOG)
+ // select tables
+ val selectActual = session.sql(query).collect().map(i=> i.getAs[String]("name") + "," + i.getAs[Int]("age")).toList.asJava
+ val selectExcept = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(DATABASE),
+ LOG,
+ query,
+ 2)
+ checkResultInAnyOrder("testSparkCatalog", selectExcept.toArray, selectActual.toArray)
+
+ session.sql(String.format("desc %s",TBL_CATALOG)).show(true);
+ // insert tables
+ // todo: insert into values('') schema does not match
+ session.sql(String.format("insert overwrite %s.%s select 'insert-data' as
name, 99 as age", DATABASE, TBL_CATALOG))
+ val selectNewExcept = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(DATABASE),
+ LOG,
+ query,
+ 2)
+ checkResultInAnyOrder("testSparkCatalog", selectNewExcept.toArray,
util.Arrays.asList("insert-data,99").toArray)
+ }
+
+
+ private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
+ LOG.info("Checking DorisCatalogITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+ assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+ }
+
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 67a0688..07c998b 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -42,6 +42,9 @@ object DorisReaderITCase {
}
}
+/**
+ * it case for doris reader.
+ */
@RunWith(classOf[Parameterized])
class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase {
@@ -121,7 +124,7 @@ class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractCo
| "user"="${getDorisUsername}",
| "password"="${getDorisPassword}",
| "doris.read.mode"="${readMode}",
- | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ | "doris.fe.auto.fetch"="true"
|)
|""".stripMargin)
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
new file mode 100644
index 0000000..bbaf7bd
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.sql.SparkSession
+import org.junit.{Before, Test}
+import org.slf4j.LoggerFactory
+
+import java.util
+import java.util.UUID
+import java.util.concurrent.{Executors, TimeUnit}
+import scala.util.control.Breaks._
+import scala.collection.JavaConverters._
+
+/**
+ * Test DorisWriter failover.
+ */
+class DorisWriterFailoverITCase extends AbstractContainerTestBase {
+
+ private val LOG = LoggerFactory.getLogger(classOf[DorisWriterFailoverITCase])
+ val DATABASE = "test_doris_failover"
+ val TABLE_WRITE_TBL_RETRY = "tbl_write_tbl_retry"
+ val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry"
+ val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail"
+ val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail"
+
+ @Before
+ def setUp(): Unit = {
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+ }
+
+ @Test
+ def testFailoverForRetry(): Unit = {
+ initializeTable(TABLE_WRITE_TBL_RETRY, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[1]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris", "1234"),
+ ("spark", "123456"),
+ ("catalog", "12345678")
+ )).toDF("name", "address")
+ df.createTempView("mock_source")
+
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_sink
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_RETRY}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.sink.batch.interval.ms"="1000",
+ | "doris.sink.batch.size"="1",
+ | "doris.sink.max-retries"="100",
+ | "doris.sink.enable-2pc"="false"
+ |)
+ |""".stripMargin)
+
+ val service = Executors.newSingleThreadExecutor()
+ val future = service.submit(new Runnable {
+ override def run(): Unit = {
+ session.sql("INSERT INTO test_sink SELECT * FROM mock_source")
+ }
+ })
+
+ val query = String.format("SELECT * FROM %s.%s", DATABASE,
TABLE_WRITE_TBL_RETRY)
+ var result: util.List[String] = null
+ val connection = getDorisQueryConnection(DATABASE)
+ breakable {
+ while (true) {
+ try {
+ // query may be failed
+ result = ContainerUtils.executeSQLStatement(connection, LOG, query, 2)
+ } catch {
+ case ex: Exception =>
+ LOG.error("Failed to query result, cause " + ex.getMessage)
+ }
+
+ // until insert 1 rows
+ if (result.size >= 1){
+ Thread.sleep(5000)
+ ContainerUtils.executeSQLStatement(
+ connection,
+ LOG,
+ String.format("ALTER TABLE %s.%s MODIFY COLUMN address
varchar(256)", DATABASE, TABLE_WRITE_TBL_RETRY))
+ break
+ }
+ }
+ }
+
+ future.get(60, TimeUnit.SECONDS)
+ session.stop()
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_WRITE_TBL_RETRY),
+ 2)
+ val expected = util.Arrays.asList("doris,1234", "spark,123456",
"catalog,12345678");
+ checkResultInAnyOrder("testFailoverForRetry", expected.toArray,
actual.toArray)
+ }
+
+
+ /**
+ * Test failover for task retry and sink.max-retries=0
+ */
+ @Test
+ def testFailoverForTaskRetry(): Unit = {
+ initializeTable(TABLE_WRITE_TBL_TASK_RETRY, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[1,100]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris", "cn"),
+ ("spark", "us"),
+ ("catalog", "uk")
+ )).toDF("name", "address")
+ df.createTempView("mock_source")
+
+ var uuid = UUID.randomUUID().toString
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_sink
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_WRITE_TBL_TASK_RETRY}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.sink.batch.size"="1",
+ | "doris.sink.batch.interval.ms"="1000",
+ | "doris.sink.max-retries"="0",
+ | "doris.sink.enable-2pc"="true",
+ | "doris.sink.label.prefix"='${uuid}'
+ |)
+ |""".stripMargin)
+
+ val service = Executors.newSingleThreadExecutor()
+ val future = service.submit(new Runnable {
+ override def run(): Unit = {
+ session.sql("INSERT INTO test_sink SELECT * FROM mock_source")
+ }
+ })
+
+ val query = "show transaction from " + DATABASE + " where label like '" +
uuid + "%'"
+ var result: List[String] = null
+ val connection = getDorisQueryConnection(DATABASE)
+ breakable {
+ while (true) {
+ try {
+ // query may be failed
+ result = ContainerUtils.executeSQLStatement(connection, LOG, query, 15).asScala.toList
+ } catch {
+ case ex: Exception =>
+ LOG.error("Failed to query result, cause " + ex.getMessage)
+ }
+
+ // until insert 1 rows
+ if (result.size >= 1 && result.forall(s => s.contains("PRECOMMITTED"))){
+ faultInjectionOpen()
+ Thread.sleep(3000)
+ faultInjectionClear()
+ break
+ }
+ }
+ }
+
+ future.get(60, TimeUnit.SECONDS)
+ session.stop()
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE,
TABLE_WRITE_TBL_TASK_RETRY),
+ 2)
+ val expected = util.Arrays.asList("doris,cn", "spark,us", "catalog,uk");
+ checkResultInAnyOrder("testFailoverForTaskRetry", expected.toArray,
actual.toArray)
+ }
+
+
+ private def initializeTable(table: String, dataModel: DataModel): Unit = {
+ val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+ val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else
",\"enable_unique_key_merge_on_write\" = \"false\""
+ val model = if (dataModel == DataModel.UNIQUE_MOR)
DataModel.UNIQUE.toString else dataModel.toString
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(32),\n"
+ + "`address` varchar(4) %s\n"
+ + ") "
+ + " %s KEY(`name`) "
+ + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table,
max, model))
+ }
+
+ private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
+ LOG.info("Checking DorisWriterFailoverITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+ assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 7f1e393..51201e4 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -19,26 +19,36 @@ package org.apache.doris.spark.sql
import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.Test
import org.slf4j.LoggerFactory
import java.util
import scala.collection.JavaConverters._
+
+/**
+ * it case for doris writer.
+ */
class DorisWriterITCase extends AbstractContainerTestBase {
- private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase])
+ private val LOG = LoggerFactory.getLogger(classOf[DorisWriterITCase])
- val DATABASE: String = "test"
+ val DATABASE: String = "test_doris_write"
val TABLE_CSV: String = "tbl_csv"
+ val TABLE_CSV_HIDE_SEP: String = "tbl_csv_hide_sep"
+ val TABLE_GROUP_COMMIT: String = "tbl_group_commit"
val TABLE_JSON: String = "tbl_json"
+ val TABLE_JSON_EMPTY_PARTITION: String = "tbl_json_empty_partition"
val TABLE_JSON_TBL: String = "tbl_json_tbl"
+ val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
+ val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
@Test
@throws[Exception]
def testSinkCsvFormat(): Unit = {
- initializeTable(TABLE_CSV)
- val session = SparkSession.builder().master("local[*]").getOrCreate()
+ initializeTable(TABLE_CSV, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[1]").getOrCreate()
val df = session.createDataFrame(Seq(
("doris_csv", 1),
("spark_csv", 2)
@@ -46,17 +56,20 @@ class DorisWriterITCase extends AbstractContainerTestBase {
df.write
.format("doris")
.option("doris.fenodes", getFenodes)
+ .option("doris.sink.auto-redirect", false)
.option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
.option("user", getDorisUsername)
.option("password", getDorisPassword)
.option("sink.properties.column_separator", ",")
.option("sink.properties.line_delimiter", "\n")
.option("sink.properties.format", "csv")
+ .option("doris.sink.batch.interval.ms", "5000")
+ .option("doris.sink.batch.size", "1")
.mode(SaveMode.Append)
.save()
session.stop()
- Thread.sleep(10000)
+ Thread.sleep(15000)
val actual = ContainerUtils.executeSQLStatement(
getDorisQueryConnection,
LOG,
@@ -66,10 +79,136 @@ class DorisWriterITCase extends AbstractContainerTestBase {
checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(),
actual.toArray)
}
+ @Test
+ @throws[Exception]
+ def testSinkCsvFormatHideSep(): Unit = {
+ initializeTable(TABLE_CSV_HIDE_SEP, DataModel.AGGREGATE)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris_csv", 1),
+ ("spark_csv", 2)
+ )).toDF("name", "age")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes + "," + getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + TABLE_CSV_HIDE_SEP)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("sink.properties.column_separator", "\\x01")
+ .option("sink.properties.line_delimiter", "\\x02")
+ .option("sink.properties.format", "csv")
+ .mode(SaveMode.Append)
+ .save()
+ session.stop()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_CSV_HIDE_SEP),
+ 2)
+ val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+ checkResultInAnyOrder("testSinkCsvFormatHideSep", expected.toArray(),
actual.toArray)
+ }
+
+ @Test
+ @throws[Exception]
+ def testSinkGroupCommit(): Unit = {
+ initializeTable(TABLE_GROUP_COMMIT, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris_csv", 1),
+ ("spark_csv", 2)
+ )).toDF("name", "age")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + TABLE_GROUP_COMMIT)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("sink.properties.group_commit", "sync_mode")
+ .mode(SaveMode.Append)
+ .save()
+ session.stop()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_GROUP_COMMIT),
+ 2)
+ val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+ checkResultInAnyOrder("testSinkGroupCommit", expected.toArray(),
actual.toArray)
+ }
+
+ @Test
+ @throws[Exception]
+ def testSinkEmptyPartition(): Unit = {
+ initializeTable(TABLE_JSON_EMPTY_PARTITION, DataModel.AGGREGATE)
+ val session = SparkSession.builder().master("local[2]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris_json", 1)
+ )).toDF("name", "age")
+ df.repartition(2).write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." +
TABLE_JSON_EMPTY_PARTITION)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("sink.properties.read_json_by_line", "true")
+ .option("sink.properties.format", "json")
+ .option("doris.sink.auto-redirect", "false")
+ .option("doris.sink.enable-2pc", "true")
+ .mode(SaveMode.Append)
+ .save()
+ session.stop()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE,
TABLE_JSON_EMPTY_PARTITION),
+ 2)
+ val expected = util.Arrays.asList("doris_json,1");
+ checkResultInAnyOrder("testSinkEmptyPartition", expected.toArray,
actual.toArray)
+ }
+
+ @Test
+ @throws[Exception]
+ def testSinkArrowFormat(): Unit = {
+ initializeTable(TABLE_JSON_TBL_ARROW, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris_json", 1),
+ ("spark_json", 2)
+ )).toDF("name", "age")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", getFenodes)
+ .option("doris.table.identifier", DATABASE + "." + TABLE_JSON_TBL_ARROW)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("sink.properties.format", "arrow")
+ .option("doris.sink.batch.size", "1")
+ .option("doris.sink.enable-2pc", "true")
+ .mode(SaveMode.Append)
+ .save()
+ session.stop()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_ARROW),
+ 2)
+ val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+ checkResultInAnyOrder("testSinkArrowFormat", expected.toArray,
actual.toArray)
+ }
+
@Test
@throws[Exception]
def testSinkJsonFormat(): Unit = {
- initializeTable(TABLE_JSON)
+ initializeTable(TABLE_JSON, DataModel.UNIQUE)
val session = SparkSession.builder().master("local[*]").getOrCreate()
val df = session.createDataFrame(Seq(
("doris_json", 1),
@@ -101,7 +240,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
@Test
@throws[Exception]
def testSQLSinkFormat(): Unit = {
- initializeTable(TABLE_JSON_TBL)
+ initializeTable(TABLE_JSON_TBL, DataModel.UNIQUE_MOR)
val session = SparkSession.builder().master("local[*]").getOrCreate()
val df = session.createDataFrame(Seq(
("doris_tbl", 1),
@@ -135,25 +274,74 @@ class DorisWriterITCase extends AbstractContainerTestBase {
checkResultInAnyOrder("testSQLSinkFormat", expected.toArray, actual.toArray)
}
-
+ @Test
@throws[Exception]
- private def initializeTable(table: String): Unit = {
+ def testSQLSinkOverwrite(): Unit = {
+ initializeTable(TABLE_JSON_TBL_OVERWRITE, DataModel.DUPLICATE)
+ // init history data
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("insert into %s.%s values ('history-doris',1118)",
DATABASE, TABLE_JSON_TBL_OVERWRITE),
+ String.format("insert into %s.%s values ('history-spark',1110)",
DATABASE, TABLE_JSON_TBL_OVERWRITE))
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = session.createDataFrame(Seq(
+ ("doris_tbl", 1),
+ ("spark_tbl", 2)
+ )).toDF("name", "age")
+ df.createTempView("mock_source")
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_sink
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL_OVERWRITE}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.query.port"="${getQueryPort}",
+ | "doris.sink.label.prefix"="doris-label-customer",
+ | "doris.sink.enable-2pc"="true"
+ |)
+ |""".stripMargin)
+ session.sql(
+ """
+ |insert overwrite table test_sink select name,age from mock_source
+ |""".stripMargin)
+ session.stop()
+
+ Thread.sleep(10000)
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL_OVERWRITE),
+ 2)
+ val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
+ checkResultInAnyOrder("testSQLSinkOverwrite", expected.toArray,
actual.toArray)
+ }
+
+ private def initializeTable(table: String, dataModel: DataModel): Unit = {
+ val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+ val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else
",\"enable_unique_key_merge_on_write\" = \"false\""
+ val model = if (dataModel == DataModel.UNIQUE_MOR)
DataModel.UNIQUE.toString else dataModel.toString
ContainerUtils.executeSQLStatement(
getDorisQueryConnection,
LOG,
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
- String.format(
- "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" +
") " +
- "DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
- "PROPERTIES (\n" +
- "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table)
- )
+ String.format("CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int %s\n"
+ + ") "
+ + " %s KEY(`name`) "
+ + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table,
max, model))
}
private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
- LOG.info("Checking DorisSourceITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+ LOG.info("Checking DorisWriterFailoverITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
}
-
}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql
new file mode 100644
index 0000000..967bdf5
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/write_all_type.sql
@@ -0,0 +1,31 @@
+DROP TABLE IF EXISTS tbl_write_tbl_all_types;
+
+CREATE TABLE tbl_write_tbl_all_types (
+`id` int,
+`c1` boolean,
+`c2` tinyint,
+`c3` smallint,
+`c4` int,
+`c5` bigint,
+`c6` largeint,
+`c7` float,
+`c8` double,
+`c9` decimal(12,4),
+`c10` date,
+`c11` datetime,
+`c12` char(1),
+`c13` varchar(256),
+`c14` string,
+`c15` Array<String>,
+`c16` Map<String, String>,
+`c17` Struct<name: String, age: int>,
+`c18` JSON,
+`c19` VARIANT
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 2
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
similarity index 76%
rename from spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
rename to spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
index ecb73d3..de6bfd5 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j2-test.properties
@@ -16,8 +16,11 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO, console
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c [%t] %x - %m%n
\ No newline at end of file
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p [%t] %c{1}: %m%n%ex
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index f4ff49f..6628e9a 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -17,7 +17,6 @@
package org.apache.doris.spark.write
-import org.apache.commons.lang3.StringUtils
-import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, DorisWriter, StreamLoadProcessor}
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.doris.spark.util.Retry
@@ -60,7 +59,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
if (txnId.isDefined) {
committedMessages += txnId.get
} else {
- throw new Exception("Failed to commit batch")
+ log.warn("No txn {} to commit batch", txnId)
}
}
DorisWriterCommitMessage(partitionId, taskId, epochId, committedMessages.toArray)
@@ -106,7 +105,7 @@ class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int,
recordBuffer.clear()
}
writer.resetBatchCount()
- LockSupport.parkNanos(batchIntervalMs.toLong)
+ LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs.toLong).toNanos)
}
writer.load(record)
} {
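The one-line fix above matters because LockSupport.parkNanos takes nanoseconds: passing batchIntervalMs directly parked the writer for roughly 50 nanoseconds, i.e. no pause at all, which is plausibly also why the option's default could drop to 0 in DorisOptions without changing observed behavior. A standalone sketch of the difference:

    import java.time.Duration;
    import java.util.concurrent.locks.LockSupport;

    public class ParkNanosExample {
        public static void main(String[] args) {
            long batchIntervalMs = 50;

            long t0 = System.nanoTime();
            LockSupport.parkNanos(batchIntervalMs); // parks ~50ns: returns almost immediately
            long t1 = System.nanoTime();
            LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs).toNanos()); // parks ~50ms
            long t2 = System.nanoTime();

            System.out.printf("raw ms value: %d us, converted: %d us%n",
                    (t1 - t0) / 1_000, (t2 - t1) / 1_000);
        }
    }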
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
index cf2914f..e5fddaf 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala
@@ -53,9 +53,7 @@ class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite wit
// for batch write
override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = {
LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
- writerCommitMessages.foreach(x => println(x))
if (writerCommitMessages.exists(_ != null) && writerCommitMessages.nonEmpty) {
- writerCommitMessages.foreach(x => println(x))
writerCommitMessages.filter(_ != null)
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]