This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 6c0627affeab6702c56f3727872ec8ffea018baa Author: Heng Zhao <imzhaoh...@qq.com> AuthorDate: Fri Dec 24 21:28:35 2021 +0800 [improvement](flink-connector) flush data without multi httpclients (#7329) (#7450) reuse http client to flush data --- .../flink/table/DorisDynamicOutputFormat.java | 22 +++++++------ .../apache/doris/flink/table/DorisStreamLoad.java | 36 ++++++++++++++-------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index f4f49bd..2a1cec4 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -94,7 +94,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { this.readOptions = readOptions; this.executionOptions = executionOptions; - Properties streamLoadProp=executionOptions.getStreamLoadProp(); + Properties streamLoadProp = executionOptions.getStreamLoadProp(); boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT)); if (ifEscape) { @@ -121,16 +121,16 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } - private String escapeString( String s) { - Pattern p = Pattern.compile("\\\\x(\\d{2})"); - Matcher m = p.matcher(s); + private String escapeString(String s) { + Pattern p = Pattern.compile("\\\\x(\\d{2})"); + Matcher m = p.matcher(s); - StringBuffer buf = new StringBuffer(); - while (m.find()) { - m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1)))); - } - m.appendTail(buf); - return buf.toString(); + StringBuffer buf = new StringBuffer(); + while (m.find()) { + m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1)))); + } + m.appendTail(buf); + return buf.toString(); } @Override @@ -220,6 +220,8 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } catch (Exception e) { LOG.warn("Writing records to doris failed.", e); throw new RuntimeException("Writing records to doris failed.", e); + } finally { + this.dorisStreamLoad.close(); } } checkFlushException(); diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index b897ff2..9c05b83 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -64,6 +64,15 @@ public class DorisStreamLoad implements Serializable { private String tbl; private String authEncoding; private Properties streamLoadProp; + private final HttpClientBuilder httpClientBuilder = HttpClients + .custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + private CloseableHttpClient httpClient; public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) { this.hostPort = hostPort; @@ -74,6 +83,7 @@ public class DorisStreamLoad implements Serializable { this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); this.authEncoding = basicAuthHeader(user, passwd); this.streamLoadProp = streamLoadProp; + this.httpClient = httpClientBuilder.build(); } public String getLoadUrlStr() { @@ -94,7 +104,7 @@ public class DorisStreamLoad implements Serializable { try { RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL()); + String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); throw new StreamLoadException(errMsg); } } catch (IOException e) { @@ -112,16 +122,7 @@ public class DorisStreamLoad implements Serializable { UUID.randomUUID().toString().replaceAll("-", "")); } - final HttpClientBuilder httpClientBuilder = HttpClients - .custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - - try (CloseableHttpClient client = httpClientBuilder.build()) { + try { HttpPut put = new HttpPut(loadUrlStr); put.setHeader(HttpHeaders.EXPECT, "100-continue"); put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding); @@ -132,7 +133,7 @@ public class DorisStreamLoad implements Serializable { StringEntity entity = new StringEntity(value, "UTF-8"); put.setEntity(entity); - try (CloseableHttpResponse response = client.execute(put)) { + try (CloseableHttpResponse response = httpClient.execute(put)) { final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); String loadResult = ""; @@ -154,6 +155,17 @@ public class DorisStreamLoad implements Serializable { return "Basic " + new String(encoded); } + public void close() throws IOException { + if (null != httpClient) { + try { + httpClient.close(); + } catch (IOException e) { + LOG.error("Closing httpClient failed.", e); + throw new RuntimeException("Closing httpClient failed.", e); + } + } + } + public static class LoadResponse { public int status; public String respMsg; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org