This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 3618f40e6346e44972b71991e72d2e55f89cc331 Author: jiafeng.zhang <[email protected]> AuthorDate: Wed May 19 09:28:21 2021 +0800 [Bug] Modify how the Spark and Flink Doris connectors send requests to FE: fix the hard-coded POST method so that the HTTP method used matches the method of the request being sent (#5788) Modify how the Spark and Flink Doris connectors send requests to FE: fix the hard-coded POST method so that the HTTP method used matches the method of the request being sent --- .../org/apache/doris/flink/rest/RestService.java | 25 +++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java index 469f1aa..cd5b6d5 100644 --- a/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.rest; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.commons.io.IOUtils; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -42,7 +43,6 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.StringEntity; - import org.slf4j.Logger; import java.io.BufferedReader; @@ -65,8 +65,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - - import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN; @@ -110,10 +108,7 @@ public class RestService implements Serializable { .build(); request.setConfig(requestConfig); - - logger.info("Send request to Doris FE '{}' with user '{}'.", 
request.getURI(), options.getUsername()); - IOException ex = null; int statusCode = -1; @@ -121,17 +116,22 @@ public class RestService implements Serializable { logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { String response; - if(request instanceof HttpGet){ + if (request instanceof HttpGet){ response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger); - }else{ - response = getConnection(request, options.getUsername(), options.getPassword(),logger); + } else { + response = getConnectionPost(request, options.getUsername(), options.getPassword(),logger); + } + if (response == null) { + logger.warn("Failed to get response from Doris FE {}, http code is {}", + request.getURI(), statusCode); + continue; } logger.trace("Success get response from Doris FE: {}, response is: {}.", request.getURI(), response); //Handle the problem of inconsistent data format returned by http v1 and v2 ObjectMapper mapper = new ObjectMapper(); Map map = mapper.readValue(response, Map.class); - if(map.containsKey("code") && map.containsKey("msg")) { + if (map.containsKey("code") && map.containsKey("msg")) { Object data = map.get("data"); return mapper.writeValueAsString(data); } else { @@ -147,14 +147,13 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } - private static String getConnection(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { + private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { URL url = new URL(request.getURI().toString()); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(false); - conn.setRequestMethod("POST"); + conn.setRequestMethod(request.getMethod()); String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8)); conn.setRequestProperty("Authorization", "Basic " + authEncoding); - InputStream content = ((HttpPost)request).getEntity().getContent(); String res = IOUtils.toString(content); conn.setDoOutput(true); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
