This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dcb522  [Enhancement] Add fe retry and load balancing strategy for 
Doris Flight SQL Reader (#318)
8dcb522 is described below

commit 8dcb5225a86c3d82fceccb3d65e30720e54d06e4
Author: Lei <[email protected]>
AuthorDate: Thu May 29 19:03:41 2025 +0800

    [Enhancement] Add fe retry and load balancing strategy for Doris Flight SQL 
Reader (#318)
    
    1. add option DORIS_READ_FLIGHT_SQL_PREFIX to specify the flight sql prefix
    2. add fe retry and random strategy when initializing the adbc connection to 
achieve load balancing for FE
---
 .../spark/client/read/DorisFlightSqlReader.java    | 22 +++++++++++++---
 .../apache/doris/spark/config/DorisOptions.java    |  2 ++
 .../apache/doris/spark/sql/DorisReaderITCase.scala | 29 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 779d622..55298ff 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -55,7 +56,6 @@ import java.util.stream.Collectors;
 public class DorisFlightSqlReader extends DorisReader {
 
     private static final Logger log = 
LoggerFactory.getLogger(DorisFlightSqlReader.class);
-    private static final String PREFIX = "/* ApplicationName=Spark 
ArrowFlightSQL Query */";
     private final AtomicBoolean endOfStream = new AtomicBoolean(false);
     private final DorisFrontendClient frontendClient;
     private final Schema schema;
@@ -66,15 +66,24 @@ public class DorisFlightSqlReader extends DorisReader {
     public DorisFlightSqlReader(DorisReaderPartition partition) throws 
Exception {
         super(partition);
         this.frontendClient = new DorisFrontendClient(partition.getConfig());
-        List<Frontend> frontends = frontendClient.getFrontends();
+        List<Frontend> frontends = new 
ArrayList<>(frontendClient.getFrontends());
+        Collections.shuffle(frontends);
+        Exception tx = null;
         for (Frontend frontend : frontends) {
             try {
                 this.connection = initializeConnection(frontend, 
partition.getConfig());
+                tx = null;
                 break;
-            } catch (OptionRequiredException | AdbcException e) {
+            } catch (OptionRequiredException e) {
                 throw new DorisException("init adbc connection failed", e);
+            } catch (AdbcException e) {
+                log.warn("init adbc connection failed with fe: " + 
frontend.getHost(), e);
+                tx = new DorisException("init adbc connection failed", e);
             }
         }
+        if (tx != null) {
+            throw tx;
+        }
         this.schema = processDorisSchema(partition);
         this.arrowReader = executeQuery();
         this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
@@ -150,7 +159,12 @@ public class DorisFlightSqlReader extends DorisReader {
         String tablets = String.format("TABLET(%s)", 
StringUtils.join(partition.getTablets(), ","));
         String predicates = partition.getFilters().length == 0 ? "" : " WHERE 
" + String.join(" AND ", partition.getFilters());
         String limit = partition.getLimit() > 0 ? " LIMIT " + 
partition.getLimit() : "";
-        return PREFIX + String.format("SELECT %s FROM %s %s%s%s", columns, 
fullTableName, tablets, predicates, limit);
+        return generateQueryPrefix() + String.format("SELECT %s FROM %s 
%s%s%s", columns, fullTableName, tablets, predicates, limit);
+    }
+
+    private String generateQueryPrefix() throws OptionRequiredException {
+        String prefix = 
config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PREFIX);
+        return String.format("/* %s */", prefix);
     }
 
     protected Schema processDorisSchema(DorisReaderPartition partition) throws 
Exception {
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 01dad06..5c31fa1 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -118,6 +118,8 @@ public class DorisOptions {
 
     public static final ConfigOption<String> READ_MODE = 
ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription("");
 
+    public static final ConfigOption<String> DORIS_READ_FLIGHT_SQL_PREFIX = 
ConfigOptions.name("doris.read.arrow-flight-sql.prefix").stringType().defaultValue("ApplicationName=Spark
 ArrowFlightSQL Query").withDescription("");
+
     public static final ConfigOption<Integer> DORIS_READ_FLIGHT_SQL_PORT = 
ConfigOptions.name("doris.read.arrow-flight-sql.port").intType().withoutDefaultValue().withDescription("");
 
     public static final ConfigOption<String> DORIS_SINK_LABEL_PREFIX = 
ConfigOptions.name("doris.sink.label.prefix").stringType().defaultValue("spark-doris").withDescription("");
diff --git 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 47793b6..5fa9112 100644
--- 
a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ 
b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -531,4 +531,33 @@ class DorisReaderITCase(readMode: String, flightSqlPort: 
Int) extends AbstractCo
       session.stop()
     }
   }
+
+  def testReadWithPrefix(): Unit = {
+    val sourceInitSql: Array[String] = 
ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, 
sourceInitSql: _*)
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      session.sql(
+        s"""
+           |CREATE TEMPORARY VIEW test_source
+           |USING doris
+           |OPTIONS(
+           | "doris.read.arrow-flight-sql.prefix" = 
"test-read-prefix-${session.sparkContext.applicationId}",
+           | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+           | "fenodes"="${getFenodes}",
+           | "user"="${getDorisUsername}",
+           | "password"="${getDorisPassword}",
+           | "doris.read.mode"="${readMode}",
+           | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+           |)
+           |""".stripMargin)
+
+      val prefixTest = session.sql(
+        """
+          |select id from test_source where id <= 2
+          |""".stripMargin).collect()
+
+      assert("List([1], [2])".equals(prefixTest.toList.toString()))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to