This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 6c0627affeab6702c56f3727872ec8ffea018baa
Author: Heng Zhao <imzhaoh...@qq.com>
AuthorDate: Fri Dec 24 21:28:35 2021 +0800

    [improvement](flink-connector) flush data without multi httpclients (#7329) (#7450)
    
    reuse http client to flush data
---
 .../flink/table/DorisDynamicOutputFormat.java      | 22 +++++++------
 .../apache/doris/flink/table/DorisStreamLoad.java  | 36 ++++++++++++++--------
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index f4f49bd..2a1cec4 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -94,7 +94,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
 
-        Properties streamLoadProp=executionOptions.getStreamLoadProp();
+        Properties streamLoadProp = executionOptions.getStreamLoadProp();
 
         boolean ifEscape = 
Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, 
ESCAPE_DELIMITERS_DEFAULT));
         if (ifEscape) {
@@ -121,16 +121,16 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         }
     }
 
-    private String escapeString( String s) {
-            Pattern p = Pattern.compile("\\\\x(\\d{2})");
-            Matcher m = p.matcher(s);
+    private String escapeString(String s) {
+        Pattern p = Pattern.compile("\\\\x(\\d{2})");
+        Matcher m = p.matcher(s);
 
-            StringBuffer buf = new StringBuffer();
-            while (m.find()) {
-                m.appendReplacement(buf, String.format("%s", (char) 
Integer.parseInt(m.group(1))));
-            }
-            m.appendTail(buf);
-            return buf.toString();
+        StringBuffer buf = new StringBuffer();
+        while (m.find()) {
+            m.appendReplacement(buf, String.format("%s", (char) 
Integer.parseInt(m.group(1))));
+        }
+        m.appendTail(buf);
+        return buf.toString();
     }
 
     @Override
@@ -220,6 +220,8 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
             } catch (Exception e) {
                 LOG.warn("Writing records to doris failed.", e);
                 throw new RuntimeException("Writing records to doris failed.", 
e);
+            } finally {
+                this.dorisStreamLoad.close();
             }
         }
         checkFlushException();
diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index b897ff2..9c05b83 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -64,6 +64,15 @@ public class DorisStreamLoad implements Serializable {
     private String tbl;
     private String authEncoding;
     private Properties streamLoadProp;
+    private final HttpClientBuilder httpClientBuilder = HttpClients
+            .custom()
+            .setRedirectStrategy(new DefaultRedirectStrategy() {
+                @Override
+                protected boolean isRedirectable(String method) {
+                    return true;
+                }
+            });
+    private CloseableHttpClient httpClient;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String 
user, String passwd, Properties streamLoadProp) {
         this.hostPort = hostPort;
@@ -74,6 +83,7 @@ public class DorisStreamLoad implements Serializable {
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = basicAuthHeader(user, passwd);
         this.streamLoadProp = streamLoadProp;
+        this.httpClient = httpClientBuilder.build();
     }
 
     public String getLoadUrlStr() {
@@ -94,7 +104,7 @@ public class DorisStreamLoad implements Serializable {
             try {
                 RespContent respContent = 
OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
                 if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                    String errMsg=String.format("stream load error: %s, see 
more in %s",respContent.getMessage(),respContent.getErrorURL());
+                    String errMsg = String.format("stream load error: %s, see 
more in %s", respContent.getMessage(), respContent.getErrorURL());
                     throw new StreamLoadException(errMsg);
                 }
             } catch (IOException e) {
@@ -112,16 +122,7 @@ public class DorisStreamLoad implements Serializable {
                     UUID.randomUUID().toString().replaceAll("-", ""));
         }
 
-        final HttpClientBuilder httpClientBuilder = HttpClients
-                .custom()
-                .setRedirectStrategy(new DefaultRedirectStrategy() {
-                    @Override
-                    protected boolean isRedirectable(String method) {
-                        return true;
-                    }
-                });
-
-        try (CloseableHttpClient client = httpClientBuilder.build()) {
+        try {
             HttpPut put = new HttpPut(loadUrlStr);
             put.setHeader(HttpHeaders.EXPECT, "100-continue");
             put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding);
@@ -132,7 +133,7 @@ public class DorisStreamLoad implements Serializable {
             StringEntity entity = new StringEntity(value, "UTF-8");
             put.setEntity(entity);
 
-            try (CloseableHttpResponse response = client.execute(put)) {
+            try (CloseableHttpResponse response = httpClient.execute(put)) {
                 final int statusCode = 
response.getStatusLine().getStatusCode();
                 final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
                 String loadResult = "";
@@ -154,6 +155,17 @@ public class DorisStreamLoad implements Serializable {
         return "Basic " + new String(encoded);
     }
 
+    public void close() throws IOException {
+        if (null != httpClient) {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                LOG.error("Closing httpClient failed.", e);
+                throw new RuntimeException("Closing httpClient failed.", e);
+            }
+        }
+    }
+
     public static class LoadResponse {
         public int status;
         public String respMsg;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to