This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit 27b72a734db7f7aa78a6fea091ca3c72b1f18198
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri Nov 5 09:43:06 2021 +0800

    [HTTP][API] Add backends info API for spark/flink connector (#6984)
    
    Doris should provide an HTTP API that returns the list of backends so that
    connectors can submit stream loads. The API performs no privilege checking,
    so ordinary users can call it as well.
---
 build.sh                                           | 20 ++++++-
 pom_3.0.xml                                        |  6 ++
 .../org/apache/doris/spark/DorisStreamLoad.java    |  2 +-
 .../org/apache/doris/spark/rest/RestService.java   | 68 ++++++++++++++++++++--
 .../apache/doris/spark/rest/models/Backend.java    |  1 +
 .../apache/doris/spark/rest/models/BackendRow.java |  3 +
 .../rest/models/{Backend.java => BackendV2.java}   | 47 ++++++++++++---
 .../apache/doris/spark/rest/TestRestService.java   | 13 ++++-
 .../doris/spark/sql/TestSparkConnector.scala       |  5 ++
 9 files changed, 148 insertions(+), 17 deletions(-)

diff --git a/build.sh b/build.sh
index b4ea042..d6ba435 100755
--- a/build.sh
+++ b/build.sh
@@ -39,7 +39,6 @@ fi
 # check maven
 MVN_CMD=mvn
 
-
 if [[ ! -z ${CUSTOM_MVN} ]]; then
     MVN_CMD=${CUSTOM_MVN}
 fi
@@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then
     exit 1
 fi
 export MVN_CMD
-if [ $1 == 3 ]
+
+usage() {
+  echo "
+  Eg.
+    $0 2            build with spark 2.x
+    $0 3            build with spark 3.x
+  "
+  exit 1
+}
+
+if [ $# == 0 ]; then
+    usage
+fi
+
+
+if [ "$1"x == "3x" ]
 then
    ${MVN_CMD} clean package -f pom_3.0.xml
 fi
-if [ $1 == 2 ]
+if [ "$1"x == "2x" ]
 then
    ${MVN_CMD} clean package
 fi
diff --git a/pom_3.0.xml b/pom_3.0.xml
index 6c8eee5..d208ad0 100644
--- a/pom_3.0.xml
+++ b/pom_3.0.xml
@@ -161,6 +161,12 @@
             <version>4.1.27.Final</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java 
b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 117825c..4411fbc 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{
     }
 
     public DorisStreamLoad(SparkSettings settings) throws IOException, 
DorisException {
-        String hostPort = RestService.randomBackend(settings, LOG);
+        String hostPort = RestService.randomBackendV2(settings, LOG);
         this.hostPort = hostPort;
         String[] dbTable = 
settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
         this.db = dbTable[0];
diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java 
b/src/main/java/org/apache/doris/spark/rest/RestService.java
index bb91538..e8a2956 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -62,6 +62,7 @@ import 
org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
 import org.apache.doris.spark.rest.models.Backend;
 import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
@@ -86,8 +87,9 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
+    @Deprecated
     private static final String BACKENDS = "/rest/v1/system?path=//backends";
-
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
 
     /**
      * send request to Doris FE and get response json string.
@@ -478,14 +480,17 @@ public class RestService implements Serializable {
      * @param logger slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
+     * Deprecated, use randomBackendV2 instead
      */
+    @Deprecated
+    @VisibleForTesting
     public static String randomBackend(SparkSettings sparkSettings , Logger 
logger) throws DorisException, IOException {
         String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
-        String beUrl =   String.format("http://%s" + BACKENDS,feNode);
+        String beUrl =   String.format("http://%s" + BACKENDS, feNode);
         HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(sparkSettings,httpGet, logger);
-        logger.info("Backend Info:{}",response);
+        String response = send(sparkSettings, httpGet, logger);
+        logger.info("Backend Info:{}", response);
         List<BackendRow> backends = parseBackend(response, logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
@@ -497,7 +502,6 @@ public class RestService implements Serializable {
         return backend.getIP() + ":" + backend.getHttpPort();
     }
 
-
     /**
      * translate Doris FE response to inner {@link BackendRow} struct.
      * @param response Doris FE response
@@ -505,6 +509,7 @@ public class RestService implements Serializable {
      * @return inner {@link List<BackendRow>} struct
      * @throws DorisException,IOException throw when translate failed
      * */
+    @Deprecated
     @VisibleForTesting
     static List<BackendRow> parseBackend(String response, Logger logger) 
throws DorisException, IOException {
         com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
@@ -535,6 +540,59 @@ public class RestService implements Serializable {
     }
 
     /**
+     * Choose a Doris BE node to request.
+     * @param logger slf4j logger
+     * @return the chosen one Doris BE node
+     * @throws IllegalArgumentException BE nodes is illegal
+     */
+    @VisibleForTesting
+    public static String randomBackendV2(SparkSettings sparkSettings, Logger 
logger) throws DorisException {
+        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
+        String feNode = randomEndpoint(feNodes, logger);
+        String beUrl =   String.format("http://%s" + BACKENDS_V2, feNode);
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(sparkSettings, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        List<BackendV2.BackendRowV2> backends = parseBackendV2(response, 
logger);
+        logger.trace("Parse beNodes '{}'.", backends);
+        if (backends == null || backends.isEmpty()) {
+            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
+            throw new IllegalArgumentException("beNodes", 
String.valueOf(backends));
+        }
+        Collections.shuffle(backends);
+        BackendV2.BackendRowV2 backend = backends.get(0);
+        return backend.getIp() + ":" + backend.getHttpPort();
+    }
+
+    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger) throws DorisException {
+        com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
+        BackendV2 backend;
+        try {
+            backend = mapper.readValue(response, BackendV2.class);
+        } catch (com.fasterxml.jackson.core.JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + 
response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " 
+ response;
+            logger.error(errMsg, e);
+            throw new DorisException(errMsg, e);
+        }
+
+        if (backend == null) {
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new ShouldNeverHappenException();
+        }
+        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        logger.debug("Parsing schema result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    /**
      * translate BE tablets map to Doris RDD partition.
      * @param cfg configuration of request
      * @param be2Tablets BE to tablets {@link Map}
diff --git a/src/main/java/org/apache/doris/spark/rest/models/Backend.java 
b/src/main/java/org/apache/doris/spark/rest/models/Backend.java
index 122e71c..322202d 100644
--- a/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/src/main/java/org/apache/doris/spark/rest/models/Backend.java
@@ -23,6 +23,7 @@ import java.util.List;
 /**
  * Be response model
  **/
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Backend {
 
diff --git a/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java 
b/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
index 0e2b385..a84ad2c 100644
--- a/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
+++ b/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 package org.apache.doris.spark.rest.models;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Deprecated
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BackendRow {
 
diff --git a/src/main/java/org/apache/doris/spark/rest/models/Backend.java 
b/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
similarity index 52%
copy from src/main/java/org/apache/doris/spark/rest/models/Backend.java
copy to src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
index 122e71c..75a2514 100644
--- a/src/main/java/org/apache/doris/spark/rest/models/Backend.java
+++ b/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java
@@ -24,16 +24,49 @@ import java.util.List;
  * Be response model
  **/
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
+public class BackendV2 {
 
-    @JsonProperty(value = "rows")
-    private List<BackendRow> rows;
+    @JsonProperty(value = "backends")
+    private List<BackendRowV2> backends;
 
-    public List<BackendRow> getRows() {
-        return rows;
+    public List<BackendRowV2> getBackends() {
+        return backends;
     }
 
-    public void setRows(List<BackendRow> rows) {
-        this.rows = rows;
+    public void setRows(List<BackendRowV2> rows) {
+        this.backends = rows;
+    }
+
+    public static class BackendRowV2 {
+        @JsonProperty("ip")
+        public String ip;
+        @JsonProperty("http_port")
+        public int httpPort;
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
     }
 }
diff --git a/src/test/java/org/apache/doris/spark/rest/TestRestService.java 
b/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index 484be45..22d542a 100644
--- a/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.rest.models.BackendRow;
+import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.Field;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
@@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jdk.nashorn.internal.ir.annotations.Ignore;
+
 public class TestRestService {
     private static Logger logger = 
LoggerFactory.getLogger(TestRestService.class);
 
@@ -295,7 +298,8 @@ public class TestRestService {
         Assert.assertEquals(expected, actual);
     }
 
-    @Test
+    @Deprecated
+    @Ignore
     public void testParseBackend() throws Exception {
         String response = 
"{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
                 
"\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\","
 +
@@ -313,4 +317,11 @@ public class TestRestService {
         List<BackendRow> backendRows = RestService.parseBackend(response, 
logger);
         Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
     }
+
+    @Test
+    public void testParseBackendV2() throws Exception {
+        String response = 
"{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, 
{\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
+        List<BackendV2.BackendRowV2> backendRows = 
RestService.parseBackendV2(response, logger);
+        Assert.assertEquals(2, backendRows.size());
+    }
 }
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala 
b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index e0d39af..be54aa9 100644
--- a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -19,8 +19,12 @@ package org.apache.doris.spark.sql
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Ignore;
 import org.junit.Test
 
+// This test needs real connection info to run.
+// Set the connection info before commenting out this @Ignore.
+@Ignore
 class TestSparkConnector {
   val dorisFeNodes = "your_fe_host:8030"
   val dorisUser = "root"
@@ -107,3 +111,4 @@ class TestSparkConnector {
       .start().awaitTermination()
   }
 }
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to