This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit d121e12309cf42c59d47628d350b54b1088eca28 Author: jiafeng.zhang <zhang...@gmail.com> AuthorDate: Wed May 19 09:28:21 2021 +0800 [Bug] Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request (#5788) Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request --- .../org/apache/doris/spark/rest/RestService.java | 80 ++++++++++++++-------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index 3c8249c..ec9cfec 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -31,8 +31,10 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.io.Serializable; import java.net.HttpURLConnection; @@ -115,39 +117,36 @@ public class RestService implements Serializable { .build(); request.setConfig(requestConfig); - String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); - logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); - IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { - HttpURLConnection conn = getConnection(request, user, password); - statusCode = conn.getResponseCode(); - if (statusCode != HttpStatus.SC_OK) { + String response; + if (request instanceof HttpGet){ + response = getConnectionGet(request.getURI().toString(), user, password,logger); + } else { + response = getConnectionPost(request,user, password,logger); + } + if (response == null) { logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), statusCode); continue; } - InputStream stream = (InputStream) conn.getContent(); - String res = IOUtils.toString(stream); logger.trace("Success get response from Doris FE: {}, response is: {}.", - request.getURI(), res); - + request.getURI(), response); ObjectMapper mapper = new ObjectMapper(); - - Map map = mapper.readValue(res, Map.class); + Map map = mapper.readValue(response, Map.class); //Handle the problem of inconsistent data format returned by http v1 and v2 - if(map.containsKey("code") && map.containsKey("msg")) { + if (map.containsKey("code") && map.containsKey("msg")) { Object data = map.get("data"); return mapper.writeValueAsString(data); } else { - return res; + return response; } } catch (IOException e) { ex = e; @@ -159,32 +158,53 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } + private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException { + URL realUrl = new URL(request); + // open connection + HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection(); + String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + authEncoding); - /** - * Get http connection - * @param request - * @param user - * @param passwd - * @return - * @throws IOException - */ - private static HttpURLConnection getConnection(HttpRequestBase request, String user, String passwd) throws IOException { + connection.connect(); + return parseResponse(connection,logger); + } + + private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException { + if (connection.getResponseCode() != HttpStatus.SC_OK) { + logger.warn("Failed to get response from Doris {}, http code is {}", + connection.getURL(), connection.getResponseCode()); + throw new IOException("Failed to get response from Doris"); + } + String result = ""; + BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); + String line; + while ((line = in.readLine()) != null) { + result += line; + } + if (in != null) { + in.close(); + } + return result; + } + + private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { URL url = new URL(request.getURI().toString()); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(false); - conn.setRequestMethod("POST"); + conn.setRequestMethod(request.getMethod()); String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); conn.setRequestProperty("Authorization", "Basic " + authEncoding); - - InputStream content = ((HttpPost) request).getEntity().getContent(); - String s = IOUtils.toString(content); - + InputStream content = ((HttpPost)request).getEntity().getContent(); + String res = IOUtils.toString(content); conn.setDoOutput(true); conn.setDoInput(true); PrintWriter out = new PrintWriter(conn.getOutputStream()); - out.print(s); + // send request params + out.print(res); + // flush out.flush(); - return conn; + // read response + return parseResponse(conn,logger); } /** * parse table identifier to array. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org