This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new b7549be  Introduce `httpclient` for sending requests in DorisStreamLoad (#79)
b7549be is described below

commit b7549be2fb0a146ea496bdf760ecba9271bb630d
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Mar 29 09:50:38 2023 +0800

    Introduce `httpclient` for sending requests in DorisStreamLoad (#79)

    * refactor DorisStreamLoad to use httpclient
    * fix return empty response message
---
 .../org/apache/doris/spark/DorisStreamLoad.java | 112 ++++++++++-----------
 1 file changed, 51 insertions(+), 61 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 2a89858..522e791 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
@@ -31,12 +31,21 @@ import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.BackendV2;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.apache.doris.spark.util.ListUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.*;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -130,38 +139,33 @@ public class DorisStreamLoad implements Serializable {
         }
         return loadUrlStr;
     }
+    private CloseableHttpClient getHttpClient() {
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create()
+                .disableRedirectHandling();
+        return httpClientBuilder.build();
+    }

-    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
-        URL url = new URL(urlStr);
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod("PUT");
-        conn.setRequestProperty("Authorization", "Basic " + authEncoded);
-        conn.addRequestProperty("Expect", "100-continue");
-        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
-        conn.addRequestProperty("label", label);
-        if (columns != null && !columns.equals("")) {
-            conn.addRequestProperty("columns", columns);
+    private HttpPut getHttpPut(String label, String loadUrlStr) {
+        HttpPut httpPut = new HttpPut(loadUrlStr);
+        httpPut.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
+        httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
+        httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
+        httpPut.setHeader("label", label);
+        if (StringUtils.isNotBlank(columns)) {
+            httpPut.setHeader("columns", columns);
         }
-
-        if (maxFilterRatio != null && !maxFilterRatio.equals("")) {
-            conn.addRequestProperty("max_filter_ratio", maxFilterRatio);
+        if (StringUtils.isNotBlank(maxFilterRatio)) {
+            httpPut.setHeader("max_filter_ratio", maxFilterRatio);
         }
-
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        if (streamLoadProp != null) {
-            streamLoadProp.forEach((k, v) -> {
-                if ("read_json_by_line".equals(k)) {
-                    return;
-                }
-                conn.addRequestProperty(k, v);
-            });
+        if (MapUtils.isNotEmpty(streamLoadProp)) {
+            streamLoadProp.entrySet().stream()
+                    .filter(entry -> !"read_json_by_line".equals(entry.getKey()))
+                    .forEach(entry -> httpPut.setHeader(entry.getKey(), entry.getValue()));
         }
         if (fileType.equals("json")) {
-            conn.addRequestProperty("strip_outer_array", "true");
+            httpPut.setHeader("strip_outer_array", "true");
         }
-        return conn;
+        return httpPut;
     }

     public static class LoadResponse {
@@ -222,7 +226,7 @@ public class DorisStreamLoad implements Serializable {

     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
-        if (loadResponse.status != 200) {
+        if (loadResponse.status != HttpStatus.SC_OK) {
             LOG.info("Streamload Response HTTP Status Error:{}", loadResponse);
             throw new StreamLoadException("stream load error: " + loadResponse.respContent);
         } else {
@@ -247,39 +251,25 @@ public class DorisStreamLoad implements Serializable {
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));

-        String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
-        LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
-        //only to record the BE node in case of an exception
-        this.loadUrlStr = loadUrlStr;
-
-        HttpURLConnection beConn = null;
-        int status = -1;
-        try {
-            // build request and send to new be location
-            beConn = getConnection(loadUrlStr, label);
-            // send data to be
-            try (OutputStream beConnOutputStream = new BufferedOutputStream(beConn.getOutputStream())) {
-                IOUtils.write(value, beConnOutputStream, StandardCharsets.UTF_8);
-            }
-
-            // get respond
-            status = beConn.getResponseCode();
-            String respMsg = beConn.getResponseMessage();
-            String response;
-            try (InputStream beConnInputStream = beConn.getInputStream()) {
-                response = IOUtils.toString(beConnInputStream, StandardCharsets.UTF_8);
-            }
-            return new LoadResponse(status, respMsg, response);
-
-        } catch (Exception e) {
+        int responseHttpStatus = -1;
+        try (CloseableHttpClient httpClient = getHttpClient()) {
+            String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
+            LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
+            //only to record the BE node in case of an exception
+            this.loadUrlStr = loadUrlStr;
+
+            HttpPut httpPut = getHttpPut(label, loadUrlStr);
+            httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8));
+            HttpResponse httpResponse = httpClient.execute(httpPut);
+            responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
+            String respMsg = httpResponse.getStatusLine().getReasonPhrase();
+            String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
+            return new LoadResponse(responseHttpStatus, respMsg, response);
+        } catch (IOException e) {
             e.printStackTrace();
             String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark streamload with label: " + label;
             LOG.warn(err, e);
-            return new LoadResponse(status, e.getMessage(), err);
-        } finally {
-            if (beConn != null) {
-                beConn.disconnect();
-            }
+            return new LoadResponse(responseHttpStatus, e.getMessage(), err);
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org