This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8dcb522 [Enhancement] Add fe retry and load balancing strategy for
Doris Flight SQL Reader (#318)
8dcb522 is described below
commit 8dcb5225a86c3d82fceccb3d65e30720e54d06e4
Author: Lei <[email protected]>
AuthorDate: Thu May 29 19:03:41 2025 +0800
[Enhancement] Add fe retry and load balancing strategy for Doris Flight SQL
Reader (#318)
1. add option DORIS_READ_FLIGHT_SQL_PREFIX to specify the flight sql prefix
2. add fe retry and a random selection strategy when initializing the adbc
connection, to achieve load balancing across FE nodes
---
.../spark/client/read/DorisFlightSqlReader.java | 22 +++++++++++++---
.../apache/doris/spark/config/DorisOptions.java | 2 ++
.../apache/doris/spark/sql/DorisReaderITCase.scala | 29 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 4 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 779d622..55298ff 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -55,7 +56,6 @@ import java.util.stream.Collectors;
public class DorisFlightSqlReader extends DorisReader {
private static final Logger log =
LoggerFactory.getLogger(DorisFlightSqlReader.class);
- private static final String PREFIX = "/* ApplicationName=Spark
ArrowFlightSQL Query */";
private final AtomicBoolean endOfStream = new AtomicBoolean(false);
private final DorisFrontendClient frontendClient;
private final Schema schema;
@@ -66,15 +66,24 @@ public class DorisFlightSqlReader extends DorisReader {
public DorisFlightSqlReader(DorisReaderPartition partition) throws
Exception {
super(partition);
this.frontendClient = new DorisFrontendClient(partition.getConfig());
- List<Frontend> frontends = frontendClient.getFrontends();
+ List<Frontend> frontends = new
ArrayList<>(frontendClient.getFrontends());
+ Collections.shuffle(frontends);
+ Exception tx = null;
for (Frontend frontend : frontends) {
try {
this.connection = initializeConnection(frontend,
partition.getConfig());
+ tx = null;
break;
- } catch (OptionRequiredException | AdbcException e) {
+ } catch (OptionRequiredException e) {
throw new DorisException("init adbc connection failed", e);
+ } catch (AdbcException e) {
+ log.warn("init adbc connection failed with fe: " +
frontend.getHost(), e);
+ tx = new DorisException("init adbc connection failed", e);
}
}
+ if (tx != null) {
+ throw tx;
+ }
this.schema = processDorisSchema(partition);
this.arrowReader = executeQuery();
this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
@@ -150,7 +159,12 @@ public class DorisFlightSqlReader extends DorisReader {
String tablets = String.format("TABLET(%s)",
StringUtils.join(partition.getTablets(), ","));
String predicates = partition.getFilters().length == 0 ? "" : " WHERE
" + String.join(" AND ", partition.getFilters());
String limit = partition.getLimit() > 0 ? " LIMIT " +
partition.getLimit() : "";
- return PREFIX + String.format("SELECT %s FROM %s %s%s%s", columns,
fullTableName, tablets, predicates, limit);
+ return generateQueryPrefix() + String.format("SELECT %s FROM %s
%s%s%s", columns, fullTableName, tablets, predicates, limit);
+ }
+
+ private String generateQueryPrefix() throws OptionRequiredException {
+ String prefix =
config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PREFIX);
+ return String.format("/* %s */", prefix);
}
protected Schema processDorisSchema(DorisReaderPartition partition) throws
Exception {
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 01dad06..5c31fa1 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -118,6 +118,8 @@ public class DorisOptions {
public static final ConfigOption<String> READ_MODE =
ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription("");
+ public static final ConfigOption<String> DORIS_READ_FLIGHT_SQL_PREFIX =
ConfigOptions.name("doris.read.arrow-flight-sql.prefix").stringType().defaultValue("ApplicationName=Spark
ArrowFlightSQL Query").withDescription("");
+
public static final ConfigOption<Integer> DORIS_READ_FLIGHT_SQL_PORT =
ConfigOptions.name("doris.read.arrow-flight-sql.port").intType().withoutDefaultValue().withDescription("");
public static final ConfigOption<String> DORIS_SINK_LABEL_PREFIX =
ConfigOptions.name("doris.sink.label.prefix").stringType().defaultValue("spark-doris").withDescription("");
diff --git
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 47793b6..5fa9112 100644
---
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -531,4 +531,33 @@ class DorisReaderITCase(readMode: String, flightSqlPort:
Int) extends AbstractCo
session.stop()
}
}
+
+ def testReadWithPrefix(): Unit = {
+ val sourceInitSql: Array[String] =
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG,
sourceInitSql: _*)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ try {
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "doris.read.arrow-flight-sql.prefix" =
"test-read-prefix-${session.sparkContext.applicationId}",
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ val prefixTest = session.sql(
+ """
+ |select id from test_source where id <= 2
+ |""".stripMargin).collect()
+
+ assert("List([1], [2])".equals(prefixTest.toList.toString()))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]