This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit d8ad1756cc22324bc7e5ec89b53c0127e4b6d271
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Tue Nov 2 16:35:29 2021 +0800

    [Feature] Spark connector supports specifying fields to write (#6973)

    By default, the Spark connector writes values for all fields to the `Doris` table.
    With this feature, users can write only a subset of the fields, and can also
    specify the order in which the fields are written.

    For example, given a table named `student` with three columns (name, gender, age),
    created with the following SQL:

    ```sql
    create table student (name varchar(255), gender varchar(10), age int)
    duplicate key (name)
    distributed by hash(name) buckets 2;
    ```

    To write values to only two of the columns, name and gender:

    ```scala
    val df = spark.createDataFrame(Seq(
      ("m", "zhangsan"),
      ("f", "lisi"),
      ("m", "wangwu")
    ))
    df.write
      .format("doris")
      .option("doris.fenodes", dorisFeNodes)
      .option("doris.table.identifier", dorisTable)
      .option("user", dorisUser)
      .option("password", dorisPwd)
      // specify your fields and their order
      .option("doris.write.fields", "gender,name")
      .save()
    ```
---
 .../org/apache/doris/spark/DorisStreamLoad.java    |   5 +
 .../doris/spark/cfg/ConfigurationOptions.java      |   2 +
 .../org/apache/doris/spark/rest/RestService.java   | 129 +++++++-------------
 .../doris/spark/sql/DataframeSinkDoris.scala       |  46 --------
 .../doris/spark/sql/SparkDorisConnector.scala      |  44 -------
 .../doris/spark/sql/TestSparkConnector.scala       | 109 +++++++++++++++++
 .../doris/spark/sql/TestStreamSinkDoris.scala      |  53 ---------
 7 files changed, 157 insertions(+), 231 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index ccf3a5e..117825c 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -62,6 +62,7 @@ public class DorisStreamLoad implements Serializable{
     private String db;
     private String tbl;
     private String authEncoding;
+    private String columns;

     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
@@ -83,6 +84,7 @@ public class DorisStreamLoad implements Serializable{
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
     }

     public String getLoadUrlStr() {
@@ -108,6 +110,9 @@ public class DorisStreamLoad implements Serializable{
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
+        if (columns != null && !columns.equals("")) {
+            conn.addRequestProperty("columns", columns);
+        }
         conn.setDoOutput(true);
         conn.setDoInput(true);
         return conn;
diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 1bb5dfc..93b16f9 100644
--- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -63,4 +63,6 @@ public interface ConfigurationOptions {

     String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
     int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+
+    String DORIS_WRITE_FIELDS = "doris.write.fields";
 }
diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java
index 0c9b5c4..dce540c 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -31,27 +31,19 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE
 import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Base64;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
 import java.util.stream.Collectors;

-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
@@ -60,17 +52,22 @@ import org.apache.doris.spark.exception.ConnectedFailedException;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
-import org.apache.doris.spark.rest.models.Backend;
-import org.apache.doris.spark.rest.models.BackendRow;
-import org.apache.doris.spark.rest.models.QueryPlan;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.rest.models.*;
 import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -86,8 +83,7 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
-    private static final String BACKENDS = "/rest/v1/system?path=//backends";
-
+    private static final String BACKENDS = "/api/show_proc?path=//backends";

     /**
      * send request to Doris FE and get response json string.
@@ -114,37 +110,36 @@ public class RestService implements Serializable {
                 .build();
         request.setConfig(requestConfig);
+
         String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
         String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
+
+        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY,
+                new UsernamePasswordCredentials(user, password));
+        HttpClientContext context = HttpClientContext.create();
+        context.setCredentialsProvider(credentialsProvider);
         logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
+
         IOException ex = null;
         int statusCode = -1;

         for (int attempt = 0; attempt < retries; attempt++) {
+            CloseableHttpClient httpClient = HttpClients.createDefault();
             logger.debug("Attempt {} to request {}.", attempt, request.getURI());
             try {
-                String response;
-                if (request instanceof HttpGet){
-                    response = getConnectionGet(request.getURI().toString(), user, password,logger);
-                } else {
-                    response = getConnectionPost(request,user, password,logger);
-                }
-                if (response == null) {
+                CloseableHttpResponse response = httpClient.execute(request, context);
+                statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode != HttpStatus.SC_OK) {
                     logger.warn("Failed to get response from Doris FE {}, http code is {}",
                             request.getURI(), statusCode);
                     continue;
                 }
+                String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                 logger.trace("Success get response from Doris FE: {}, response is: {}.",
-                        request.getURI(), response);
-                ObjectMapper mapper = new ObjectMapper();
-                Map map = mapper.readValue(response, Map.class);
-                //Handle the problem of inconsistent data format returned by http v1 and v2
-                if (map.containsKey("code") && map.containsKey("msg")) {
-                    Object data = map.get("data");
-                    return mapper.writeValueAsString(data);
-                } else {
-                    return response;
-                }
+                        request.getURI(), res);
+                return res;
             } catch (IOException e) {
                 ex = e;
                 logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -155,54 +150,6 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
     }

-    private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
-        URL realUrl = new URL(request);
-        // open connection
-        HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
-        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
-        connection.setRequestProperty("Authorization", "Basic " + authEncoding);
-
-        connection.connect();
-        return parseResponse(connection,logger);
-    }
-
-    private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
-        if (connection.getResponseCode() != HttpStatus.SC_OK) {
-            logger.warn("Failed to get response from Doris {}, http code is {}",
-                    connection.getURL(), connection.getResponseCode());
-            throw new IOException("Failed to get response from Doris");
-        }
-        String result = "";
-        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
-        String line;
-        while ((line = in.readLine()) != null) {
-            result += line;
-        }
-        if (in != null) {
-            in.close();
-        }
-        return result;
-    }
-
-    private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
-        URL url = new URL(request.getURI().toString());
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod(request.getMethod());
-        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-        InputStream content = ((HttpPost)request).getEntity().getContent();
-        String res = IOUtils.toString(content);
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        PrintWriter out = new PrintWriter(conn.getOutputStream());
-        // send request params
-        out.print(res);
-        // flush
-        out.flush();
-        // read response
-        return parseResponse(conn,logger);
-    }
-
     /**
      * parse table identifier to array.
      * @param tableIdentifier table identifier string
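For reference, the retry loop above now delegates authentication to Apache HttpClient instead of the removed hand-rolled `HttpURLConnection` helpers. A minimal sketch of the same request pattern in isolation, using the same HttpClient 4.x APIs the hunk imports; the FE host and credentials are placeholders:

```scala
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.{BasicCredentialsProvider, HttpClients}
import org.apache.http.util.EntityUtils

// Placeholder FE endpoint and credentials.
val credentialsProvider = new BasicCredentialsProvider()
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("root", ""))
val context = HttpClientContext.create()
context.setCredentialsProvider(credentialsProvider)

val httpClient = HttpClients.createDefault()
try {
  // Same GET-with-context call shape as the new RestService code.
  val response = httpClient.execute(
    new HttpGet("http://your_fe_host:8030/api/show_proc?path=//backends"), context)
  println(EntityUtils.toString(response.getEntity))
} finally {
  httpClient.close()
}
```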
@@ -502,9 +449,9 @@ public class RestService implements Serializable {
     static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
         com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
-        Backend backend;
+        List<List<String>> backend;
         try {
-            backend = mapper.readValue(response, Backend.class);
+            backend = mapper.readValue(response, List.class);
         } catch (com.fasterxml.jackson.core.JsonParseException e) {
             String errMsg = "Doris BE's response is not a json. res: " + response;
             logger.error(errMsg, e);
@@ -523,7 +470,13 @@ public class RestService implements Serializable {
             logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
-        List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
+        List<BackendRow> backendRows = backend.stream().map(array -> {
+            BackendRow backendRow = new BackendRow();
+            backendRow.setIP(array.get(2));
+            backendRow.setHttpPort(array.get(6));
+            backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
+            return backendRow;
+        }).filter(v -> v.getAlive()).collect(Collectors.toList());
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
@@ -541,7 +494,7 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
-            String opaquedQueryPlan, String database, String table, Logger logger)
+                                                           String opaquedQueryPlan, String database, String table, Logger logger)
             throws IllegalArgumentException {
         int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
         List<PartitionDefinition> partitions = new ArrayList<>();
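With the switch to the `show_proc` endpoint, `parseBackend` now treats the payload as a JSON array of per-backend string arrays and picks fields by position. A sketch of that index mapping; the sample row below is illustrative only, and real responses carry these plus other columns:

```scala
// Illustrative backend row, shaped like one element of the show_proc array.
val row: Vector[String] = Vector(
  "10002", "default_cluster", "192.168.0.10", "be-host-1", "9050",
  "9060", "8040", "8060", "2021-11-02 16:00:00", "2021-11-02 16:30:00", "true")

val ip = row(2)               // index 2: backend IP
val httpPort = row(6).toInt   // index 6: HTTP port
val alive = row(10).toBoolean // index 10: alive flag
println(s"$ip:$httpPort alive=$alive")
```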
diff --git a/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala b/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala
deleted file mode 100644
index b0df051..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.spark.sql
-
-import org.apache.spark.sql.SparkSession
-
-object DataframeSinkDoris {
-  def main(args: Array[String]): Unit = {
-    val spark = SparkSession.builder().master("local").getOrCreate()
-
-    import spark.implicits._
-
-    val mockDataDF = List(
-      (3, "440403001005", "21.cn"),
-      (1, "4404030013005", "22.cn"),
-      (33, null, "23.cn")
-    ).toDF("id", "mi_code", "mi_name")
-    mockDataDF.show(5)
-
-    mockDataDF.write.format("doris")
-      .option("feHostPort", "10.211.55.9:8030")
-      .option("dbName", "example_db")
-      .option("tbName", "test_insert_into")
-      .option("maxRowCount", "1000")
-      .option("user", "root")
-      .option("password", "")
-      .save()
-
-
-  }
-
-}
diff --git a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
deleted file mode 100644
index 8e18284..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.sql
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-
-
-object SparkDorisConnector {
-
-  def main(args: Array[String]): Unit = {
-    val sparkConf: SparkConf = new SparkConf().setAppName("SparkDorisConnector").setMaster("local[*]")
-    val sc = new SparkContext(sparkConf)
-    sc.setLogLevel("DEBUG")
-    import org.apache.doris.spark._
-    val dorisSparkRDD = sc.dorisRDD(
-      tableIdentifier = Some("db.table1"),
-      cfg = Some(Map(
-        "doris.fenodes" -> "feip:8030",
-        "doris.request.auth.user" -> "root",
-        "doris.request.auth.password" -> ""
-      ))
-    )
-
-    dorisSparkRDD.map(println(_)).count()
-    sc.stop()
-  }
-
-}
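One point of context before the new test file: on the write path, `doris.write.fields` ultimately surfaces as the Stream Load `columns` header (see the `DorisStreamLoad` hunk above). A minimal sketch of the equivalent raw Stream Load request follows; the host, database, table, label, and credentials are placeholders, and FE-to-BE redirect handling and error checking are omitted:

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

// Placeholders: FE host, database `test`, table `student`, user root with empty password.
val loadUrl = "http://your_fe_host:8030/api/test/student/_stream_load"
val auth = Base64.getEncoder.encodeToString("root:".getBytes(StandardCharsets.UTF_8))

val conn = new URL(loadUrl).openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("PUT")
conn.addRequestProperty("Authorization", "Basic " + auth)
conn.addRequestProperty("Expect", "100-continue")
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8")
conn.addRequestProperty("label", "doris_write_fields_demo") // hypothetical label
// The header DorisStreamLoad now derives from doris.write.fields:
conn.addRequestProperty("columns", "gender,name")
conn.setDoOutput(true)
// Default Stream Load text format: tab-separated columns, newline-separated rows.
conn.getOutputStream.write("m\tzhangsan\nf\tlisi\nm\twangwu".getBytes(StandardCharsets.UTF_8))
println(s"HTTP ${conn.getResponseCode} ${conn.getResponseMessage}")
```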
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
new file mode 100644
index 0000000..e0d39af
--- /dev/null
+++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Test
+
+class TestSparkConnector {
+  val dorisFeNodes = "your_fe_host:8030"
+  val dorisUser = "root"
+  val dorisPwd = ""
+  val dorisTable = "test.test_tbl"
+
+  val kafkaServers = ""
+  val kafkaTopics = ""
+
+  @Test
+  def rddReadTest(): Unit = {
+    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
+    val sc = new SparkContext(sparkConf)
+    import org.apache.doris.spark._
+    val dorisSparkRDD = sc.dorisRDD(
+      tableIdentifier = Some(dorisTable),
+      cfg = Some(Map(
+        "doris.fenodes" -> dorisFeNodes,
+        "doris.request.auth.user" -> dorisUser,
+        "doris.request.auth.password" -> dorisPwd
+      ))
+    )
+    dorisSparkRDD.map(println(_)).count()
+    sc.stop()
+  }
+
+  @Test
+  def dataframeWriteTest(): Unit = {
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("zhangsan", "m"),
+      ("lisi", "f"),
+      ("wangwu", "m")
+    ))
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      // specify your fields and their order
+      .option("doris.write.fields", "name,gender")
+      .save()
+    session.stop()
+  }
+
+  @Test
+  def dataframeReadTest(): Unit = {
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val dorisSparkDF = session.read
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .load()
+
+    dorisSparkDF.show()
+    session.stop()
+  }
+
+
+  @Test
+  def structuredStreamingWriteTest(): Unit = {
+    val spark = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+    val df = spark.readStream
+      .option("kafka.bootstrap.servers", kafkaServers)
+      .option("startingOffsets", "latest")
+      .option("subscribe", kafkaTopics)
+      .format("kafka")
+      .option("failOnDataLoss", false)
+      .load()
+
+    df.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
+      .writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/test")
+      .option("doris.table.identifier", dorisTable)
+      .option("doris.fenodes", dorisFeNodes)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .start().awaitTermination()
+  }
+}
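A possible follow-up check for the write test above, sketched as one more case for the same class; it assumes `dataframeWriteTest` has already run and that columns omitted from `doris.write.fields` are nullable or have defaults in the target table:

```scala
@Test
def writeThenReadBackTest(): Unit = {
  val session = SparkSession.builder().master("local[*]").getOrCreate()
  // Assumes dataframeWriteTest (doris.write.fields = "name,gender") already ran.
  val readBack = session.read
    .format("doris")
    .option("doris.fenodes", dorisFeNodes)
    .option("doris.table.identifier", dorisTable)
    .option("user", dorisUser)
    .option("password", dorisPwd)
    .load()
  // Columns omitted from doris.write.fields should hold their defaults (or NULL).
  readBack.show()
  session.stop()
}
```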
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala b/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
deleted file mode 100644
index c62c9fb..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.sql
-
-import org.apache.spark.sql.SparkSession
-
-object TestStreamSinkDoris {
-  val kafkaServers = ""
-  val kafkaTopics = ""
-  val dorisFeNodes = "your_doris_host_port"
-  val dorisUser = "root"
-  val dorisPwd = ""
-  val dorisTable = "test.test_tbl"
-
-  def main(args: Array[String]): Unit = {
-    val sparkSession = SparkSession.builder()
-      .master("local")
-      .getOrCreate()
-
-    val dataFrame = sparkSession.readStream
-      .option("kafka.bootstrap.servers", kafkaServers)
-      .option("startingOffsets", "latest")
-      .option("subscribe", kafkaTopics)
-      .format("kafka")
-      .option("failOnDataLoss", false)
-      .load()
-
-    dataFrame.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
-      .writeStream
-      .format("doris")
-      .option("checkpointLocation", "/tmp/test")
-      .option("doris.table.identifier", dorisTable)
-      .option("doris.fenodes", dorisFeNodes)
-      .option("user", dorisUser)
-      .option("password", dorisPwd)
-      .start().awaitTermination()
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org