This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit d121e12309cf42c59d47628d350b54b1088eca28
Author: jiafeng.zhang <zhang...@gmail.com>
AuthorDate: Wed May 19 09:28:21 2021 +0800

    [Bug] Modify spark, flink doris connector to send request to FE, fix the 
problem of POST method, it should be the same as the method when sending the 
request (#5788)
    
    Modify spark, flink doris connector to send request to FE, fix the problem 
of POST method,
    it should be the same as the method when sending the request
---
 .../org/apache/doris/spark/rest/RestService.java   | 80 ++++++++++++++--------
 1 file changed, 50 insertions(+), 30 deletions(-)

diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java 
b/src/main/java/org/apache/doris/spark/rest/RestService.java
index 3c8249c..ec9cfec 100644
--- a/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -31,8 +31,10 @@ import static 
org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE
 import static 
org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
 import static 
org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.net.HttpURLConnection;
@@ -115,39 +117,36 @@ public class RestService implements Serializable {
                 .build();
 
         request.setConfig(requestConfig);
-
         String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
         String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
-
         logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), user);
-
         IOException ex = null;
         int statusCode = -1;
 
         for (int attempt = 0; attempt < retries; attempt++) {
             logger.debug("Attempt {} to request {}.", attempt, 
request.getURI());
             try {
-                HttpURLConnection conn = getConnection(request, user, 
password);
-                statusCode = conn.getResponseCode();
-                if (statusCode != HttpStatus.SC_OK) {
+                String response;
+                if (request instanceof HttpGet){
+                    response = getConnectionGet(request.getURI().toString(), 
user, password,logger);
+                } else {
+                    response = getConnectionPost(request,user, 
password,logger);
+                }
+                if (response == null) {
                     logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
                             request.getURI(), statusCode);
                     continue;
                 }
-                InputStream stream = (InputStream) conn.getContent();
-                String res = IOUtils.toString(stream);
                 logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
-                        request.getURI(), res);
-
+                        request.getURI(), response);
                 ObjectMapper mapper = new ObjectMapper();
-
-                Map map = mapper.readValue(res, Map.class);
+                Map map = mapper.readValue(response, Map.class);
                 //Handle the problem of inconsistent data format returned by 
http v1 and v2
-                if(map.containsKey("code") && map.containsKey("msg")) {
+                if (map.containsKey("code") && map.containsKey("msg")) {
                     Object data = map.get("data");
                     return mapper.writeValueAsString(data);
                 } else {
-                    return res;
+                    return response;
                 }
             } catch (IOException e) {
                 ex = e;
@@ -159,32 +158,53 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), 
statusCode, ex);
     }
 
+    private static String getConnectionGet(String request,String user, String 
passwd,Logger logger) throws IOException {
+        URL realUrl = new URL(request);
+        // open connection
+        HttpURLConnection connection = 
(HttpURLConnection)realUrl.openConnection();
+        String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
+        connection.setRequestProperty("Authorization", "Basic " + 
authEncoding);
 
-    /**
-     * Get http connection
-     * @param request
-     * @param user
-     * @param passwd
-     * @return
-     * @throws IOException
-     */
-    private static HttpURLConnection getConnection(HttpRequestBase request, 
String user, String passwd) throws IOException {
+        connection.connect();
+        return parseResponse(connection,logger);
+    }
+
+    private static String parseResponse(HttpURLConnection connection,Logger 
logger) throws IOException {
+        if (connection.getResponseCode() != HttpStatus.SC_OK) {
+            logger.warn("Failed to get response from Doris  {}, http code is 
{}",
+                    connection.getURL(), connection.getResponseCode());
+            throw new IOException("Failed to get response from Doris");
+        }
+        String result = "";
+        BufferedReader in = new BufferedReader(new 
InputStreamReader(connection.getInputStream(), "utf-8"));
+        String line;
+        while ((line = in.readLine()) != null) {
+            result += line;
+        }
+        if (in != null) {
+            in.close();
+        }
+        return result;
+    }
+
+    private static String getConnectionPost(HttpRequestBase request,String 
user, String passwd,Logger logger) throws IOException {
         URL url = new URL(request.getURI().toString());
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod("POST");
+        conn.setRequestMethod(request.getMethod());
         String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-
-        InputStream content = ((HttpPost) request).getEntity().getContent();
-        String s = IOUtils.toString(content);
-
+        InputStream content = ((HttpPost)request).getEntity().getContent();
+        String res = IOUtils.toString(content);
         conn.setDoOutput(true);
         conn.setDoInput(true);
         PrintWriter out = new PrintWriter(conn.getOutputStream());
-        out.print(s);
+        // send request params
+        out.print(res);
+        // flush
         out.flush();
-        return conn;
+        // read response
+        return parseResponse(conn,logger);
     }
     /**
      * parse table identifier to array.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to