This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit d8ad1756cc22324bc7e5ec89b53c0127e4b6d271
Author: wei zhao <zhaowei_3...@163.com>
AuthorDate: Tue Nov 2 16:35:29 2021 +0800

    [Feature] Spark connector supports to specify fields to write (#6973)
    
    By default, the Spark connector writes values for all fields to the `Doris` table.
    With this feature, the user can write only a subset of the fields, and can even
    specify the order in which the fields are written.
    
    e.g.:
    I have a table named `student` with three columns (name, gender, age),
    created with the following SQL:
    ```sql
    create table student (name varchar(255), gender varchar(10), age int) duplicate key (name) distributed by hash(name) buckets 2;
    ```
    Now, I just want to write values to two columns: name and gender.
    The code is as follows:
    ```scala
        val df = spark.createDataFrame(Seq(
          ("m", "zhangsan"),
          ("f", "lisi"),
          ("m", "wangwu")
        ))
        df.write
          .format("doris")
          .option("doris.fenodes", dorisFeNodes)
          .option("doris.table.identifier", dorisTable)
          .option("user", dorisUser)
          .option("password", dorisPwd)
          // specify the fields to write and their order
          .option("doris.write.field", "gender,name")
          .save()
    ```
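
    Under the hood (see the `DorisStreamLoad` change below), the connector passes
    this option to Doris as the `columns` property of the Stream Load request.
    To verify a partial-column write, the table can be read back through the
    connector's DataFrame reader, mirroring the `dataframeReadTest` added in this
    commit; a minimal sketch, reusing the placeholder settings from the snippet above:
    ```scala
        val readBack = spark.read
          .format("doris")
          .option("doris.fenodes", dorisFeNodes)
          .option("doris.table.identifier", dorisTable)
          .option("user", dorisUser)
          .option("password", dorisPwd)
          .load()
        // inspect all three columns after the partial write
        readBack.show()
    ```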
---
 .../org/apache/doris/spark/DorisStreamLoad.java    |   5 +
 .../doris/spark/cfg/ConfigurationOptions.java      |   2 +
 .../org/apache/doris/spark/rest/RestService.java   | 129 +++++++--------------
 .../doris/spark/sql/DataframeSinkDoris.scala       |  46 --------
 .../doris/spark/sql/SparkDorisConnector.scala      |  44 -------
 .../doris/spark/sql/TestSparkConnector.scala       | 109 +++++++++++++++++
 .../doris/spark/sql/TestStreamSinkDoris.scala      |  53 ---------
 7 files changed, 157 insertions(+), 231 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index ccf3a5e..117825c 100644
--- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -62,6 +62,7 @@ public class DorisStreamLoad implements Serializable{
     private String db;
     private String tbl;
     private String authEncoding;
+    private String columns;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
@@ -83,6 +84,7 @@ public class DorisStreamLoad implements Serializable{
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
     }
 
     public String getLoadUrlStr() {
@@ -108,6 +110,9 @@ public class DorisStreamLoad implements Serializable{
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
+        if (columns != null && !columns.equals("")) {
+            conn.addRequestProperty("columns", columns);
+        }
         conn.setDoOutput(true);
         conn.setDoInput(true);
         return conn;
diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 1bb5dfc..93b16f9 100644
--- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -63,4 +63,6 @@ public interface ConfigurationOptions {
 
     String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
     int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+
+    String DORIS_WRITE_FIELDS = "doris.write.fields";
 }
diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java
index 0c9b5c4..dce540c 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -31,27 +31,19 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE
 import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Base64;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
 import java.util.stream.Collectors;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.Settings;
@@ -60,17 +52,22 @@ import org.apache.doris.spark.exception.ConnectedFailedException;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.IllegalArgumentException;
 import org.apache.doris.spark.exception.ShouldNeverHappenException;
-import org.apache.doris.spark.rest.models.Backend;
-import org.apache.doris.spark.rest.models.BackendRow;
-import org.apache.doris.spark.rest.models.QueryPlan;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.rest.models.*;
 import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -86,8 +83,7 @@ public class RestService implements Serializable {
     private static final String API_PREFIX = "/api";
     private static final String SCHEMA = "_schema";
     private static final String QUERY_PLAN = "_query_plan";
-    private static final String BACKENDS = "/rest/v1/system?path=//backends";
-
+    private static final String BACKENDS = "/api/show_proc?path=//backends";
 
     /**
      * send request to Doris FE and get response json string.
@@ -114,37 +110,36 @@ public class RestService implements Serializable {
                 .build();
 
         request.setConfig(requestConfig);
+
         String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
         String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
+
+        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY,
+                new UsernamePasswordCredentials(user, password));
+        HttpClientContext context = HttpClientContext.create();
+        context.setCredentialsProvider(credentialsProvider);
         logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
+
         IOException ex = null;
         int statusCode = -1;
 
         for (int attempt = 0; attempt < retries; attempt++) {
+            CloseableHttpClient httpClient = HttpClients.createDefault();
             logger.debug("Attempt {} to request {}.", attempt, 
request.getURI());
             try {
-                String response;
-                if (request instanceof HttpGet){
-                    response = getConnectionGet(request.getURI().toString(), 
user, password,logger);
-                } else {
-                    response = getConnectionPost(request,user, 
password,logger);
-                }
-                if (response == null) {
+                CloseableHttpResponse response = httpClient.execute(request, 
context);
+                statusCode = response.getStatusLine().getStatusCode();
+                if (statusCode != HttpStatus.SC_OK) {
                     logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
                             request.getURI(), statusCode);
                     continue;
                 }
+                String res = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
                 logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
-                        request.getURI(), response);
-                ObjectMapper mapper = new ObjectMapper();
-                Map map = mapper.readValue(response, Map.class);
-                //Handle the problem of inconsistent data format returned by 
http v1 and v2
-                if (map.containsKey("code") && map.containsKey("msg")) {
-                    Object data = map.get("data");
-                    return mapper.writeValueAsString(data);
-                } else {
-                    return response;
-                }
+                        request.getURI(), res);
+                return res;
             } catch (IOException e) {
                 ex = e;
                 logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -155,54 +150,6 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
     }
 
-    private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
-        URL realUrl = new URL(request);
-        // open connection
-        HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
-        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
-        connection.setRequestProperty("Authorization", "Basic " + authEncoding);
-
-        connection.connect();
-        return parseResponse(connection,logger);
-    }
-
-    private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
-        if (connection.getResponseCode() != HttpStatus.SC_OK) {
-            logger.warn("Failed to get response from Doris  {}, http code is {}",
-                    connection.getURL(), connection.getResponseCode());
-            throw new IOException("Failed to get response from Doris");
-        }
-        String result = "";
-        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
-        String line;
-        while ((line = in.readLine()) != null) {
-            result += line;
-        }
-        if (in != null) {
-            in.close();
-        }
-        return result;
-    }
-
-    private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
-        URL url = new URL(request.getURI().toString());
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod(request.getMethod());
-        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-        InputStream content = ((HttpPost)request).getEntity().getContent();
-        String res = IOUtils.toString(content);
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        PrintWriter out = new PrintWriter(conn.getOutputStream());
-        // send request params
-        out.print(res);
-        // flush
-        out.flush();
-        // read response
-        return parseResponse(conn,logger);
-    }
     /**
      * parse table identifier to array.
      * @param tableIdentifier table identifier string
@@ -502,9 +449,9 @@ public class RestService implements Serializable {
 
     static List<BackendRow> parseBackend(String response, Logger logger) 
throws DorisException, IOException {
         com.fasterxml.jackson.databind.ObjectMapper mapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
-        Backend backend;
+        List<List<String>> backend;
         try {
-            backend = mapper.readValue(response, Backend.class);
+            backend = mapper.readValue(response, List.class);
         } catch (com.fasterxml.jackson.core.JsonParseException e) {
             String errMsg = "Doris BE's response is not a json. res: " + response;
             logger.error(errMsg, e);
@@ -523,7 +470,13 @@ public class RestService implements Serializable {
             logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
             throw new ShouldNeverHappenException();
         }
-        List<BackendRow> backendRows = backend.getRows().stream().filter(v -> 
v.getAlive()).collect(Collectors.toList());
+        List<BackendRow> backendRows = backend.stream().map(array -> {
+            BackendRow backendRow = new BackendRow();
+            backendRow.setIP(array.get(2));
+            backendRow.setHttpPort(array.get(6));
+            backendRow.setAlive(Boolean.parseBoolean(array.get(10)));
+            return backendRow;
+        }).filter(v -> v.getAlive()).collect(Collectors.toList());
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
@@ -541,7 +494,7 @@ public class RestService implements Serializable {
      */
     @VisibleForTesting
     static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, 
Map<String, List<Long>> be2Tablets,
-            String opaquedQueryPlan, String database, String table, Logger 
logger)
+                                                           String 
opaquedQueryPlan, String database, String table, Logger logger)
             throws IllegalArgumentException {
         int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
         List<PartitionDefinition> partitions = new ArrayList<>();
diff --git a/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala b/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala
deleted file mode 100644
index b0df051..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/DataframeSinkDoris.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.spark.sql
-
-import org.apache.spark.sql.SparkSession
-
-object DataframeSinkDoris {
-  def main(args: Array[String]): Unit = {
-    val spark = SparkSession.builder().master("local").getOrCreate()
-
-    import spark.implicits._
-
-    val mockDataDF = List(
-      (3, "440403001005", "21.cn"),
-      (1, "4404030013005", "22.cn"),
-      (33, null, "23.cn")
-    ).toDF("id", "mi_code", "mi_name")
-    mockDataDF.show(5)
-
-    mockDataDF.write.format("doris")
-      .option("feHostPort", "10.211.55.9:8030")
-      .option("dbName", "example_db")
-      .option("tbName", "test_insert_into")
-      .option("maxRowCount", "1000")
-      .option("user", "root")
-      .option("password", "")
-      .save()
-
-
-  }
-
-}
diff --git a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala b/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
deleted file mode 100644
index 8e18284..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.sql
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-
-
-object SparkDorisConnector {
-
-    def main(args: Array[String]): Unit = {
-        val sparkConf: SparkConf = new SparkConf().setAppName("SparkDorisConnector").setMaster("local[*]")
-        val sc = new SparkContext(sparkConf)
-        sc.setLogLevel("DEBUG")
-        import org.apache.doris.spark._
-        val dorisSparkRDD = sc.dorisRDD(
-            tableIdentifier = Some("db.table1"),
-            cfg = Some(Map(
-                "doris.fenodes" -> "feip:8030",
-                "doris.request.auth.user" -> "root",
-                "doris.request.auth.password" -> ""
-            ))
-        )
-
-        dorisSparkRDD.map(println(_)).count()
-        sc.stop()
-    }
-
-}
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
new file mode 100644
index 0000000..e0d39af
--- /dev/null
+++ b/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Test
+
+class TestSparkConnector {
+  val dorisFeNodes = "your_fe_host:8030"
+  val dorisUser = "root"
+  val dorisPwd = ""
+  val dorisTable = "test.test_tbl"
+
+  val kafkaServers = ""
+  val kafkaTopics = ""
+
+  @Test
+  def rddReadTest(): Unit = {
+    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
+    val sc = new SparkContext(sparkConf)
+    import org.apache.doris.spark._
+    val dorisSparkRDD = sc.dorisRDD(
+      tableIdentifier = Some(dorisTable),
+      cfg = Some(Map(
+        "doris.fenodes" -> dorisFeNodes,
+        "doris.request.auth.user" -> dorisUser,
+        "doris.request.auth.password" -> dorisPwd
+      ))
+    )
+    dorisSparkRDD.map(println(_)).count()
+    sc.stop()
+  }
+
+  @Test
+  def dataframeWriteTest(): Unit = {
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = session.createDataFrame(Seq(
+      ("zhangsan", "m"),
+      ("lisi", "f"),
+      ("wangwu", "m")
+    ))
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      //specify your field
+      .option("doris.write.field", "name,gender")
+      .save()
+    session.stop()
+  }
+
+  @Test
+  def dataframeReadTest(): Unit = {
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+    val dorisSparkDF = session.read
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .load()
+
+    dorisSparkDF.show()
+    session.stop()
+  }
+
+
+  @Test
+  def structuredStreamingWriteTest(): Unit = {
+    val spark = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+    val df = spark.readStream
+      .option("kafka.bootstrap.servers", kafkaServers)
+      .option("startingOffsets", "latest")
+      .option("subscribe", kafkaTopics)
+      .format("kafka")
+      .option("failOnDataLoss", false)
+      .load()
+
+    df.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
+      .writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/test")
+      .option("doris.table.identifier", dorisTable)
+      .option("doris.fenodes", dorisFeNodes)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .start().awaitTermination()
+  }
+}
diff --git a/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala b/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
deleted file mode 100644
index c62c9fb..0000000
--- a/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.sql
-
-import org.apache.spark.sql.SparkSession
-
-object TestStreamSinkDoris {
-  val kafkaServers = ""
-  val kafkaTopics = ""
-  val dorisFeNodes = "your_doris_host_port"
-  val dorisUser = "root"
-  val dorisPwd = ""
-  val dorisTable = "test.test_tbl"
-
-  def main(args: Array[String]): Unit = {
-    val sparkSession = SparkSession.builder()
-      .master("local")
-      .getOrCreate()
-
-    val dataFrame = sparkSession.readStream
-      .option("kafka.bootstrap.servers", kafkaServers)
-      .option("startingOffsets", "latest")
-      .option("subscribe", kafkaTopics)
-      .format("kafka")
-      .option("failOnDataLoss", false)
-      .load()
-
-    dataFrame.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
-      .writeStream
-      .format("doris")
-      .option("checkpointLocation", "/tmp/test")
-      .option("doris.table.identifier", dorisTable)
-      .option("doris.fenodes", dorisFeNodes)
-      .option("user", dorisUser)
-      .option("password", dorisPwd)
-      .start().awaitTermination()
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
