This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 3618f40e6346e44972b71991e72d2e55f89cc331
Author: jiafeng.zhang <[email protected]>
AuthorDate: Wed May 19 09:28:21 2021 +0800

    [Bug] Modify spark, flink doris connector to send request to FE, fix the 
problem of POST method, it should be the same as the method when sending the 
request (#5788)
    
    Fix the Spark and Flink Doris connectors so that, when sending a request to FE,
    the HTTP method set on the connection is taken from the request itself
    instead of always being hard-coded to POST.
---
 .../org/apache/doris/flink/rest/RestService.java   | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java 
b/src/main/java/org/apache/doris/flink/rest/RestService.java
index 469f1aa..cd5b6d5 100644
--- a/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.rest;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -42,7 +43,6 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.StringEntity;
 
-
 import org.slf4j.Logger;
 
 import java.io.BufferedReader;
@@ -65,8 +65,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-
-
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
@@ -110,10 +108,7 @@ public class RestService implements Serializable {
                 .build();
 
         request.setConfig(requestConfig);
-
-
         logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), options.getUsername());
-
         IOException ex = null;
         int statusCode = -1;
 
@@ -121,17 +116,22 @@ public class RestService implements Serializable {
             logger.debug("Attempt {} to request {}.", attempt, 
request.getURI());
             try {
                 String response;
-                if(request instanceof HttpGet){
+                if (request instanceof HttpGet){
                     response = getConnectionGet(request.getURI().toString(), 
options.getUsername(), options.getPassword(),logger);
-                }else{
-                    response = getConnection(request,  options.getUsername(), 
options.getPassword(),logger);
+                } else {
+                    response = getConnectionPost(request,  
options.getUsername(), options.getPassword(),logger);
+                }
+                if (response == null) {
+                    logger.warn("Failed to get response from Doris FE {}, http 
code is {}",
+                            request.getURI(), statusCode);
+                    continue;
                 }
                 logger.trace("Success get response from Doris FE: {}, response 
is: {}.",
                         request.getURI(), response);
                 //Handle the problem of inconsistent data format returned by 
http v1 and v2
                 ObjectMapper mapper = new ObjectMapper();
                 Map map = mapper.readValue(response, Map.class);
-                if(map.containsKey("code") && map.containsKey("msg")) {
+                if (map.containsKey("code") && map.containsKey("msg")) {
                     Object data = map.get("data");
                     return mapper.writeValueAsString(data);
                 } else {
@@ -147,14 +147,13 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), 
statusCode, ex);
     }
 
-    private static String getConnection(HttpRequestBase request,String user, 
String passwd,Logger logger) throws IOException {
+    private static String getConnectionPost(HttpRequestBase request,String 
user, String passwd,Logger logger) throws IOException {
         URL url = new URL(request.getURI().toString());
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod("POST");
+        conn.setRequestMethod(request.getMethod());
         String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-
         InputStream content = ((HttpPost)request).getEntity().getContent();
         String res = IOUtils.toString(content);
         conn.setDoOutput(true);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to