This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e5bba52  [improve] Improve Httpclient Connection (#344)
e5bba52 is described below

commit e5bba5207b5537ca2103374af82660bc962607de
Author: wudi <676366...@qq.com>
AuthorDate: Tue Mar 19 14:08:33 2024 +0800

    [improve] Improve Httpclient Connection (#344)
---
 .../java/org/apache/doris/flink/sink/HttpUtil.java | 31 ++++---
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 54 ++++++------
 .../doris/flink/sink/copy/BatchStageLoad.java      | 98 ++++++++++++----------
 .../doris/flink/sink/copy/DorisCopyCommitter.java  | 64 +++++++-------
 .../flink/sink/copy/TestDorisCopyCommitter.java    |  5 +-
 5 files changed, 139 insertions(+), 113 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 43e2bea..1307ce4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -18,12 +18,13 @@
 package org.apache.doris.flink.sink;
 
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.NoConnectionReuseStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
 
+import java.util.concurrent.TimeUnit;
+
 /** util to build http client. */
 public class HttpUtil {
     private final HttpClientBuilder httpClientBuilder =
@@ -34,7 +35,9 @@ public class HttpUtil {
                                 protected boolean isRedirectable(String method) {
                                     return true;
                                 }
-                            });
+                            })
+                    .evictExpiredConnections()
+                    .evictIdleConnections(60, TimeUnit.SECONDS);
 
     public CloseableHttpClient getHttpClient() {
         return httpClientBuilder.build();
@@ -48,17 +51,21 @@ public class HttpUtil {
                     .setSocketTimeout(9 * 60 * 1000)
                     .build();
 
-    public CloseableHttpClient getHttpClientForBatch() {
-        return httpClientBuilder.setDefaultRequestConfig(requestConfig).build();
+    public HttpClientBuilder getHttpClientBuilderForBatch() {
+        return HttpClients.custom()
+                .setRedirectStrategy(
+                        new DefaultRedirectStrategy() {
+                            @Override
+                            protected boolean isRedirectable(String method) {
+                                return true;
+                            }
+                        })
+                .setDefaultRequestConfig(requestConfig);
     }
 
-    private final HttpClientBuilder httpClientBuilderWithTimeout =
-            HttpClients.custom().setDefaultRequestConfig(requestConfig);
-
-    public CloseableHttpClient getHttpClientWithTimeout() {
-        return httpClientBuilderWithTimeout
-                // fix failed to respond for commit copy
-                .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
-                .build();
+    public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
+        return HttpClients.custom()
+                .disableRedirectHandling()
+                .setDefaultRequestConfig(requestConfig);
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 0971be0..ad4ccde 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -35,6 +35,7 @@ import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private final AtomicBoolean started;
     private volatile boolean loadThreadAlive = false;
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
-    private CloseableHttpClient httpClient = new HttpUtil().getHttpClientForBatch();
+    private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
     private BackendUtil backendUtil;
 
     public DorisBatchStreamLoad(
@@ -274,32 +275,35 @@ public class DorisBatchStreamLoad implements Serializable {
             int retry = 0;
             while (retry <= executionOptions.getMaxRetries()) {
                 LOG.info("stream load started for {} on host {}", label, 
hostPort);
-                try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
-                    int statusCode = response.getStatusLine().getStatusCode();
-                    if (statusCode == 200 && response.getEntity() != null) {
-                        String loadResult = 
EntityUtils.toString(response.getEntity());
-                        LOG.info("load Result {}", loadResult);
-                        RespContent respContent =
-                                OBJECT_MAPPER.readValue(loadResult, 
RespContent.class);
-                        if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                            String errMsg =
-                                    String.format(
-                                            "stream load error: %s, see more 
in %s",
-                                            respContent.getMessage(), 
respContent.getErrorURL());
-                            throw new DorisBatchLoadException(errMsg);
-                        } else {
-                            return;
+                try (CloseableHttpClient httpClient = 
httpClientBuilder.build()) {
+                    try (CloseableHttpResponse response = 
httpClient.execute(putBuilder.build())) {
+                        int statusCode = 
response.getStatusLine().getStatusCode();
+                        if (statusCode == 200 && response.getEntity() != null) 
{
+                            String loadResult = 
EntityUtils.toString(response.getEntity());
+                            LOG.info("load Result {}", loadResult);
+                            RespContent respContent =
+                                    OBJECT_MAPPER.readValue(loadResult, 
RespContent.class);
+                            if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                                String errMsg =
+                                        String.format(
+                                                "stream load error: %s, see 
more in %s",
+                                                respContent.getMessage(),
+                                                respContent.getErrorURL());
+                                throw new DorisBatchLoadException(errMsg);
+                            } else {
+                                return;
+                            }
                         }
+                        LOG.error(
+                                "stream load failed with {}, reason {}, to 
retry",
+                                hostPort,
+                                response.getStatusLine().toString());
+                    } catch (Exception ex) {
+                        if (retry == executionOptions.getMaxRetries()) {
+                            throw new DorisBatchLoadException("stream load 
error: ", ex);
+                        }
+                        LOG.error("stream load error with {}, to retry, cause 
by", hostPort, ex);
                     }
-                    LOG.error(
-                            "stream load failed with {}, reason {}, to retry",
-                            hostPort,
-                            response.getStatusLine().toString());
-                } catch (Exception ex) {
-                    if (retry == executionOptions.getMaxRetries()) {
-                        throw new DorisBatchLoadException("stream load error: 
", ex);
-                    }
-                    LOG.error("stream load error with {}, to retry, cause by", 
hostPort, ex);
                 }
                 retry++;
                 // get available backend retry
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index 97e217d..c192d76 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -35,6 +35,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class BatchStageLoad implements Serializable {
     private final AtomicBoolean started;
     private volatile boolean loadThreadAlive = false;
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
-    private CloseableHttpClient httpClient = new 
HttpUtil().getHttpClientWithTimeout();
+    private HttpClientBuilder httpClientBuilder = new 
HttpUtil().getHttpClientBuilderForCopyBatch();
 
     public BatchStageLoad(
             DorisOptions dorisOptions,
@@ -290,33 +291,38 @@ public class BatchStageLoad implements Serializable {
                         BackoffAndRetryUtils.backoffAndRetry(
                                 BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
                                 () -> {
-                                    try (CloseableHttpResponse response =
-                                            httpClient.execute(httpPut)) {
-                                        final int statusCode =
-                                                
response.getStatusLine().getStatusCode();
-                                        String requestId = 
getRequestId(response.getAllHeaders());
-                                        if (statusCode == 200 && 
response.getEntity() != null) {
-                                            String loadResult =
-                                                    
EntityUtils.toString(response.getEntity());
-                                            if (loadResult == null || 
loadResult.isEmpty()) {
-                                                // upload finished
-                                                return requestId;
+                                    try (CloseableHttpClient httpClient =
+                                            httpClientBuilder.build()) {
+                                        try (CloseableHttpResponse response =
+                                                httpClient.execute(httpPut)) {
+                                            final int statusCode =
+                                                    
response.getStatusLine().getStatusCode();
+                                            String requestId =
+                                                    
getRequestId(response.getAllHeaders());
+                                            if (statusCode == 200 && 
response.getEntity() != null) {
+                                                String loadResult =
+                                                        
EntityUtils.toString(response.getEntity());
+                                                if (loadResult == null || 
loadResult.isEmpty()) {
+                                                    // upload finished
+                                                    return requestId;
+                                                }
+                                                LOG.error(
+                                                        "upload file failed, 
requestId is {}, response result: {}",
+                                                        requestId,
+                                                        loadResult);
+                                                throw new CopyLoadException(
+                                                        "upload file failed: "
+                                                                + 
response.getStatusLine()
+                                                                        
.toString()
+                                                                + ", with 
requestId "
+                                                                + requestId);
                                             }
-                                            LOG.error(
-                                                    "upload file failed, 
requestId is {}, response result: {}",
-                                                    requestId,
-                                                    loadResult);
                                             throw new CopyLoadException(
-                                                    "upload file failed: "
+                                                    "upload file error: "
                                                             + 
response.getStatusLine().toString()
                                                             + ", with 
requestId "
                                                             + requestId);
                                         }
-                                        throw new CopyLoadException(
-                                                "upload file error: "
-                                                        + 
response.getStatusLine().toString()
-                                                        + ", with requestId "
-                                                        + requestId);
                                     }
                                 });
                 return String.valueOf(result);
@@ -361,27 +367,33 @@ public class BatchStageLoad implements Serializable {
                         BackoffAndRetryUtils.backoffAndRetry(
                                 
BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
                                 () -> {
-                                    try (CloseableHttpResponse execute =
-                                            
httpClient.execute(putBuilder.build())) {
-                                        int statusCode = 
execute.getStatusLine().getStatusCode();
-                                        String reason = 
execute.getStatusLine().getReasonPhrase();
-                                        if (statusCode == 307) {
-                                            Header location = 
execute.getFirstHeader("location");
-                                            String uploadAddress = 
location.getValue();
-                                            return uploadAddress;
-                                        } else {
-                                            HttpEntity entity = 
execute.getEntity();
-                                            String result =
-                                                    entity == null
-                                                            ? null
-                                                            : 
EntityUtils.toString(entity);
-                                            LOG.error(
-                                                    "Failed to get 
internalStage address, status {}, reason {}, response {}",
-                                                    statusCode,
-                                                    reason,
-                                                    result);
-                                            throw new CopyLoadException(
-                                                    "Failed get internalStage 
address");
+                                    try (CloseableHttpClient httpClient =
+                                            httpClientBuilder.build()) {
+                                        try (CloseableHttpResponse execute =
+                                                
httpClient.execute(putBuilder.build())) {
+                                            int statusCode =
+                                                    
execute.getStatusLine().getStatusCode();
+                                            String reason =
+                                                    
execute.getStatusLine().getReasonPhrase();
+                                            if (statusCode == 307) {
+                                                Header location =
+                                                        
execute.getFirstHeader("location");
+                                                String uploadAddress = 
location.getValue();
+                                                return uploadAddress;
+                                            } else {
+                                                HttpEntity entity = 
execute.getEntity();
+                                                String result =
+                                                        entity == null
+                                                                ? null
+                                                                : 
EntityUtils.toString(entity);
+                                                LOG.error(
+                                                        "Failed to get 
internalStage address, status {}, reason {}, response {}",
+                                                        statusCode,
+                                                        reason,
+                                                        result);
+                                                throw new CopyLoadException(
+                                                        "Failed get 
internalStage address");
+                                            }
                                         }
                                     }
                                 });
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
index 16be357..095c680 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
@@ -31,6 +31,7 @@ import org.apache.doris.flink.sink.copy.models.CopyIntoResp;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,18 +48,20 @@ public class DorisCopyCommitter implements 
Committer<DorisCopyCommittable>, Clos
     private static final int SUCCESS = 0;
     private static final String FAIL = "1";
     private ObjectMapper objectMapper = new ObjectMapper();
-    private final CloseableHttpClient httpClient;
     private final DorisOptions dorisOptions;
+    private HttpClientBuilder httpClientBuilder = new 
HttpUtil().getHttpClientBuilderForCopyBatch();
     int maxRetry;
 
     public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry) {
-        this(dorisOptions, maxRetry, new 
HttpUtil().getHttpClientWithTimeout());
+        this.dorisOptions = dorisOptions;
+        this.maxRetry = maxRetry;
     }
 
-    public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry, 
CloseableHttpClient client) {
+    public DorisCopyCommitter(
+            DorisOptions dorisOptions, int maxRetry, HttpClientBuilder 
httpClientBuilder) {
         this.dorisOptions = dorisOptions;
         this.maxRetry = maxRetry;
-        this.httpClient = client;
+        this.httpClientBuilder = httpClientBuilder;
     }
 
     @Override
@@ -88,31 +91,32 @@ public class DorisCopyCommitter implements 
Committer<DorisCopyCommittable>, Clos
                     .setUrl(String.format(commitPattern, hostPort))
                     .baseAuth(dorisOptions.getUsername(), 
dorisOptions.getPassword())
                     .setEntity(new 
StringEntity(objectMapper.writeValueAsString(params)));
-
-            try (CloseableHttpResponse response = 
httpClient.execute(postBuilder.build())) {
-                statusCode = response.getStatusLine().getStatusCode();
-                reasonPhrase = response.getStatusLine().getReasonPhrase();
-                if (statusCode != 200) {
-                    LOG.warn(
-                            "commit failed with status {} {}, reason {}",
-                            statusCode,
-                            hostPort,
-                            reasonPhrase);
-                } else if (response.getEntity() != null) {
-                    loadResult = EntityUtils.toString(response.getEntity());
-                    success = handleCommitResponse(loadResult);
-                    if (success) {
-                        LOG.info(
-                                "commit success cost {}ms, response is {}",
-                                System.currentTimeMillis() - start,
-                                loadResult);
-                        break;
-                    } else {
-                        LOG.warn("commit failed, retry again");
+            try (CloseableHttpClient httpClient = httpClientBuilder.build()) {
+                try (CloseableHttpResponse response = 
httpClient.execute(postBuilder.build())) {
+                    statusCode = response.getStatusLine().getStatusCode();
+                    reasonPhrase = response.getStatusLine().getReasonPhrase();
+                    if (statusCode != 200) {
+                        LOG.warn(
+                                "commit failed with status {} {}, reason {}",
+                                statusCode,
+                                hostPort,
+                                reasonPhrase);
+                    } else if (response.getEntity() != null) {
+                        loadResult = 
EntityUtils.toString(response.getEntity());
+                        success = handleCommitResponse(loadResult);
+                        if (success) {
+                            LOG.info(
+                                    "commit success cost {}ms, response is {}",
+                                    System.currentTimeMillis() - start,
+                                    loadResult);
+                            break;
+                        } else {
+                            LOG.warn("commit failed, retry again");
+                        }
                     }
+                } catch (IOException e) {
+                    LOG.error("commit error : ", e);
                 }
-            } catch (IOException e) {
-                LOG.error("commit error : ", e);
             }
         }
 
@@ -158,9 +162,5 @@ public class DorisCopyCommitter implements 
Committer<DorisCopyCommittable>, Clos
     }
 
     @Override
-    public void close() throws IOException {
-        if (httpClient != null) {
-            httpClient.close();
-        }
-    }
+    public void close() throws IOException {}
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
index 7a7f326..23399ac 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyCommitter.java
@@ -26,6 +26,7 @@ import org.apache.http.ProtocolVersion;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.message.BasicStatusLine;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,14 +49,16 @@ public class TestDorisCopyCommitter {
     public void setUp() throws Exception {
         DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
         copyCommittable = new DorisCopyCommittable("127.0.0.1:8710", "copy 
into sql");
+        HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        when(httpClientBuilder.build()).thenReturn(httpClient);
         entityMock = new HttpEntityMock();
         CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
         StatusLine normalLine = new BasicStatusLine(new 
ProtocolVersion("http", 1, 0), 200, "");
         when(httpClient.execute(any())).thenReturn(httpResponse);
         when(httpResponse.getStatusLine()).thenReturn(normalLine);
         when(httpResponse.getEntity()).thenReturn(entityMock);
-        copyCommitter = new DorisCopyCommitter(dorisOptions, 1, httpClient);
+        copyCommitter = new DorisCopyCommitter(dorisOptions, 1, 
httpClientBuilder);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to