This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b7549be  Introduce `httpclient` for sending requests in DorisStreamLoad (#79)
b7549be is described below

commit b7549be2fb0a146ea496bdf760ecba9271bb630d
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Mar 29 09:50:38 2023 +0800

    Introduce `httpclient` for sending requests in DorisStreamLoad (#79)
    
    * refactor DorisStreamLoad to use httpclient
    * fix return empty response message
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 112 ++++++++++-----------
 1 file changed, 51 insertions(+), 61 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 2a89858..522e791 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
@@ -31,12 +31,21 @@ import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.apache.doris.spark.util.ListUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -130,38 +139,33 @@ public class DorisStreamLoad implements Serializable {
         }
         return loadUrlStr;
     }
+    private CloseableHttpClient getHttpClient() {
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create()
+                .disableRedirectHandling();
+        return httpClientBuilder.build();
+    }
 
-    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
-        URL url = new URL(urlStr);
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod("PUT");
-        conn.setRequestProperty("Authorization", "Basic " + authEncoded);
-        conn.addRequestProperty("Expect", "100-continue");
-        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
-        conn.addRequestProperty("label", label);
-        if (columns != null && !columns.equals("")) {
-            conn.addRequestProperty("columns", columns);
+    private HttpPut getHttpPut(String label, String loadUrlStr) {
+        HttpPut httpPut = new HttpPut(loadUrlStr);
+        httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
+        httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
+        httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
+        httpPut.setHeader("label", label);
+        if (StringUtils.isNotBlank(columns)) {
+            httpPut.setHeader("columns", columns);
         }
-
-        if (maxFilterRatio != null && !maxFilterRatio.equals("")) {
-            conn.addRequestProperty("max_filter_ratio", maxFilterRatio);
+        if (StringUtils.isNotBlank(maxFilterRatio)) {
+            httpPut.setHeader("max_filter_ratio", maxFilterRatio);
         }
-
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        if (streamLoadProp != null) {
-            streamLoadProp.forEach((k, v) -> {
-                if ("read_json_by_line".equals(k)) {
-                    return;
-                }
-                conn.addRequestProperty(k, v);
-            });
+        if (MapUtils.isNotEmpty(streamLoadProp)) {
+            streamLoadProp.entrySet().stream()
+                    .filter(entry -> !"read_json_by_line".equals(entry.getKey()))
+                    .forEach(entry -> httpPut.setHeader(entry.getKey(), entry.getValue()));
         }
         if (fileType.equals("json")) {
-            conn.addRequestProperty("strip_outer_array", "true");
+            httpPut.setHeader("strip_outer_array", "true");
         }
-        return conn;
+        return httpPut;
     }
 
     public static class LoadResponse {
@@ -222,7 +226,7 @@ public class DorisStreamLoad implements Serializable {
 
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
-        if (loadResponse.status != 200) {
+        if (loadResponse.status != HttpStatus.SC_OK) {
             LOG.info("Streamload Response HTTP Status Error:{}", loadResponse);
             throw new StreamLoadException("stream load error: " + loadResponse.respContent);
         } else {
@@ -247,39 +251,25 @@ public class DorisStreamLoad implements Serializable {
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));
 
-        String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
-        LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
-        //only to record the BE node in case of an exception
-        this.loadUrlStr = loadUrlStr;
-
-        HttpURLConnection beConn = null;
-        int status = -1;
-        try {
-            // build request and send to new be location
-            beConn = getConnection(loadUrlStr, label);
-            // send data to be
-            try (OutputStream beConnOutputStream = new BufferedOutputStream(beConn.getOutputStream())) {
-                IOUtils.write(value, beConnOutputStream, StandardCharsets.UTF_8);
-            }
-
-            // get respond
-            status = beConn.getResponseCode();
-            String respMsg = beConn.getResponseMessage();
-            String response;
-            try (InputStream beConnInputStream = beConn.getInputStream()) {
-                response = IOUtils.toString(beConnInputStream, StandardCharsets.UTF_8);
-            }
-            return new LoadResponse(status, respMsg, response);
-
-        } catch (Exception e) {
+        int responseHttpStatus = -1;
+        try (CloseableHttpClient httpClient = getHttpClient()) {
+            String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
+            LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
+            //only to record the BE node in case of an exception
+            this.loadUrlStr = loadUrlStr;
+
+            HttpPut httpPut = getHttpPut(label, loadUrlStr);
+            httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8));
+            HttpResponse httpResponse = httpClient.execute(httpPut);
+            responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
+            String respMsg = httpResponse.getStatusLine().getReasonPhrase();
+            String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
+            return new LoadResponse(responseHttpStatus, respMsg, response);
+        } catch (IOException e) {
             e.printStackTrace();
             String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label;
             LOG.warn(err, e);
-            return new LoadResponse(status, e.getMessage(), err);
-        } finally {
-            if (beConn != null) {
-                beConn.disconnect();
-            }
+            return new LoadResponse(responseHttpStatus, e.getMessage(), err);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to