This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit 27b72a734db7f7aa78a6fea091ca3c72b1f18198 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri Nov 5 09:43:06 2021 +0800 [HTTP][API] Add backends info API for spark/flink connector (#6984) Doris should provide an HTTP API that returns the list of backends, so that connectors can choose a backend to submit stream loads to. The API performs no privilege check, so ordinary (non-admin) users can call it. --- build.sh | 20 ++++++- pom_3.0.xml | 6 ++ .../org/apache/doris/spark/DorisStreamLoad.java | 2 +- .../org/apache/doris/spark/rest/RestService.java | 68 ++++++++++++++++++++-- .../apache/doris/spark/rest/models/Backend.java | 1 + .../apache/doris/spark/rest/models/BackendRow.java | 3 + .../rest/models/{Backend.java => BackendV2.java} | 47 ++++++++++++--- .../apache/doris/spark/rest/TestRestService.java | 13 ++++- .../doris/spark/sql/TestSparkConnector.scala | 5 ++ 9 files changed, 148 insertions(+), 17 deletions(-) diff --git a/build.sh b/build.sh index b4ea042..d6ba435 100755 --- a/build.sh +++ b/build.sh @@ -39,7 +39,6 @@ fi # check maven MVN_CMD=mvn - if [[ ! -z ${CUSTOM_MVN} ]]; then MVN_CMD=${CUSTOM_MVN} fi @@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then exit 1 fi export MVN_CMD -if [ $1 == 3 ] + +usage() { + echo " + Eg.
+ $0 2 build with spark 2.x + $0 3 build with spark 3.x + " + exit 1 +} + +if [ $# == 0 ]; then + usage +fi + + +if [ "$1"x == "3x" ] then ${MVN_CMD} clean package -f pom_3.0.xml fi -if [ $1 == 2 ] +if [ "$1"x == "2x" ] then ${MVN_CMD} clean package fi diff --git a/pom_3.0.xml b/pom_3.0.xml index 6c8eee5..d208ad0 100644 --- a/pom_3.0.xml +++ b/pom_3.0.xml @@ -161,6 +161,12 @@ <version>4.1.27.Final</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> </dependencies> diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 117825c..4411fbc 100644 --- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{ } public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException { - String hostPort = RestService.randomBackend(settings, LOG); + String hostPort = RestService.randomBackendV2(settings, LOG); this.hostPort = hostPort; String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); this.db = dbTable[0]; diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index bb91538..e8a2956 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -62,6 +62,7 @@ import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.exception.ShouldNeverHappenException; import org.apache.doris.spark.rest.models.Backend; import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.QueryPlan; import 
org.apache.doris.spark.rest.models.Schema; import org.apache.doris.spark.rest.models.Tablet; @@ -86,8 +87,9 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; + @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; - + private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; /** * send request to Doris FE and get response json string. @@ -478,14 +480,17 @@ public class RestService implements Serializable { * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal + * Deprecated, use randomBackendV2 instead */ + @Deprecated + @VisibleForTesting public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException { String feNodes = sparkSettings.getProperty(DORIS_FENODES); String feNode = randomEndpoint(feNodes, logger); - String beUrl = String.format("http://%s" + BACKENDS,feNode); + String beUrl = String.format("http://%s" + BACKENDS, feNode); HttpGet httpGet = new HttpGet(beUrl); - String response = send(sparkSettings,httpGet, logger); - logger.info("Backend Info:{}",response); + String response = send(sparkSettings, httpGet, logger); + logger.info("Backend Info:{}", response); List<BackendRow> backends = parseBackend(response, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { @@ -497,7 +502,6 @@ public class RestService implements Serializable { return backend.getIP() + ":" + backend.getHttpPort(); } - /** * translate Doris FE response to inner {@link BackendRow} struct. 
* @param response Doris FE response @@ -505,6 +509,7 @@ public class RestService implements Serializable { * @return inner {@link List<BackendRow>} struct * @throws DorisException,IOException throw when translate failed * */ + @Deprecated @VisibleForTesting static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); @@ -535,6 +540,59 @@ public class RestService implements Serializable { } /** + * choice a Doris BE node to request. + * @param logger slf4j logger + * @return the chosen one Doris BE node + * @throws IllegalArgumentException BE nodes is illegal + */ + @VisibleForTesting + public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException { + String feNodes = sparkSettings.getProperty(DORIS_FENODES); + String feNode = randomEndpoint(feNodes, logger); + String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); + HttpGet httpGet = new HttpGet(beUrl); + String response = send(sparkSettings, httpGet, logger); + logger.info("Backend Info:{}", response); + List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger); + logger.trace("Parse beNodes '{}'.", backends); + if (backends == null || backends.isEmpty()) { + logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); + throw new IllegalArgumentException("beNodes", String.valueOf(backends)); + } + Collections.shuffle(backends); + BackendV2.BackendRowV2 backend = backends.get(0); + return backend.getIp() + ":" + backend.getHttpPort(); + } + + static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException { + com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + BackendV2 backend; + try { + backend = mapper.readValue(response, BackendV2.class); + } catch (com.fasterxml.jackson.core.JsonParseException e) { + 
String errMsg = "Doris BE's response is not a json. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (com.fasterxml.jackson.databind.JsonMappingException e) { + String errMsg = "Doris BE's response cannot map to schema. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (IOException e) { + String errMsg = "Parse Doris BE's response to json failed. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } + + if (backend == null) { + logger.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + List<BackendV2.BackendRowV2> backendRows = backend.getBackends(); + logger.debug("Parsing schema result is '{}'.", backendRows); + return backendRows; + } + + /** * translate BE tablets map to Doris RDD partition. * @param cfg configuration of request * @param be2Tablets BE to tablets {@link Map} diff --git a/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/src/main/java/org/apache/doris/spark/rest/models/Backend.java index 122e71c..322202d 100644 --- a/src/main/java/org/apache/doris/spark/rest/models/Backend.java +++ b/src/main/java/org/apache/doris/spark/rest/models/Backend.java @@ -23,6 +23,7 @@ import java.util.List; /** * Be response model **/ +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class Backend { diff --git a/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java b/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java index 0e2b385..a84ad2c 100644 --- a/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java +++ b/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. 
package org.apache.doris.spark.rest.models; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; + +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class BackendRow { diff --git a/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java similarity index 52% copy from src/main/java/org/apache/doris/spark/rest/models/Backend.java copy to src/main/java/org/apache/doris/spark/rest/models/BackendV2.java index 122e71c..75a2514 100644 --- a/src/main/java/org/apache/doris/spark/rest/models/Backend.java +++ b/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java @@ -24,16 +24,49 @@ import java.util.List; * Be response model **/ @JsonIgnoreProperties(ignoreUnknown = true) -public class Backend { +public class BackendV2 { - @JsonProperty(value = "rows") - private List<BackendRow> rows; + @JsonProperty(value = "backends") + private List<BackendRowV2> backends; - public List<BackendRow> getRows() { - return rows; + public List<BackendRowV2> getBackends() { + return backends; } - public void setRows(List<BackendRow> rows) { - this.rows = rows; + public void setRows(List<BackendRowV2> rows) { + this.backends = rows; + } + + public static class BackendRowV2 { + @JsonProperty("ip") + public String ip; + @JsonProperty("http_port") + public int httpPort; + @JsonProperty("is_alive") + public boolean isAlive; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public boolean isAlive() { + return isAlive; + } + + public void setAlive(boolean alive) { + isAlive = alive; + } } } diff --git a/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/src/test/java/org/apache/doris/spark/rest/TestRestService.java index 484be45..22d542a 100644 --- 
a/src/test/java/org/apache/doris/spark/rest/TestRestService.java +++ b/src/test/java/org/apache/doris/spark/rest/TestRestService.java @@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.QueryPlan; import org.apache.doris.spark.rest.models.Schema; @@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import jdk.nashorn.internal.ir.annotations.Ignore; + public class TestRestService { private static Logger logger = LoggerFactory.getLogger(TestRestService.class); @@ -295,7 +298,8 @@ public class TestRestService { Assert.assertEquals(expected, actual); } - @Test + @Deprecated + @Ignore public void testParseBackend() throws Exception { String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," + "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," + @@ -313,4 +317,11 @@ public class TestRestService { List<BackendRow> backendRows = RestService.parseBackend(response, logger); Assert.assertTrue(backendRows != null && !backendRows.isEmpty()); } + + @Test + public void testParseBackendV2() throws Exception { + String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}"; + List<BackendV2.BackendRowV2> backendRows = RestService.parseBackendV2(response, logger); + Assert.assertEquals(2, backendRows.size()); + } } diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala index e0d39af..be54aa9 100644 --- 
a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala +++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala @@ -19,8 +19,12 @@ package org.apache.doris.spark.sql import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} +import org.junit.Ignore; import org.junit.Test +// This test need real connect info to run. +// Set the connect info before comment out this @Ignore +@Ignore class TestSparkConnector { val dorisFeNodes = "your_fe_host:8030" val dorisUser = "root" @@ -107,3 +111,4 @@ class TestSparkConnector { .start().awaitTermination() } } + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org