This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new fad3c4cc [Improve](http) add timeout and waitForContinue config for sink httpclient (#522)
fad3c4cc is described below

commit fad3c4cc13752f4c8f89e29341724b5e54473d41
Author: wudi <676366...@qq.com>
AuthorDate: Fri Dec 6 10:40:48 2024 +0800

    [Improve](http) add timeout and waitForContinue config for sink httpclient (#522)
---
 .../doris/flink/cfg/ConfigurationOptions.java      |  4 +-
 .../apache/doris/flink/cfg/DorisReadOptions.java   | 25 +++---
 .../java/org/apache/doris/flink/sink/HttpUtil.java | 88 +++++++++++++++-------
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  3 +-
 .../doris/flink/sink/committer/DorisCommitter.java |  6 +-
 .../doris/flink/sink/writer/DorisWriter.java       |  2 +-
 .../flink/table/DorisDynamicTableFactoryTest.java  |  4 +-
 7 files changed, 86 insertions(+), 46 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
index 3709f0ae..8ae7f636 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
@@ -33,8 +33,8 @@ public interface ConfigurationOptions {
     String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout";
     String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout";
     Integer DORIS_REQUEST_RETRIES_DEFAULT = 3;
-    Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
-    Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 60 * 1000;
+    Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 60 * 1000;
     Integer DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 21600;
 
     String DORIS_TABLET_SIZE = "doris.request.tablet.size";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0448d60a..22a77b83 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -210,17 +210,22 @@ public class DorisReadOptions implements Serializable {
 
         private String readFields;
         private String filterQuery;
-        private Integer requestTabletSize;
-        private Integer requestConnectTimeoutMs;
-        private Integer requestReadTimeoutMs;
-        private Integer requestQueryTimeoutS;
-        private Integer requestRetries;
-        private Integer requestBatchSize;
-        private Long execMemLimit;
-        private Integer deserializeQueueSize;
-        private Boolean deserializeArrowAsync;
+        private Integer requestTabletSize = 
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+        private Integer requestConnectTimeoutMs =
+                ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+        private Integer requestReadTimeoutMs =
+                ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+        private Integer requestQueryTimeoutS =
+                ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+        private Integer requestRetries = 
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
+        private Integer requestBatchSize = 
ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+        private Long execMemLimit = 
ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+        private Integer deserializeQueueSize =
+                ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+        private Boolean deserializeArrowAsync =
+                ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
         private Boolean useOldApi = false;
-        private Boolean useFlightSql = false;
+        private Boolean useFlightSql = 
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT;
         private Integer flightSqlPort;
 
         /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 518eea71..94b65c79 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -17,50 +17,70 @@
 
 package org.apache.doris.flink.sink;
 
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.NoConnectionReuseStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.protocol.HttpRequestExecutor;
 
-import java.util.concurrent.TimeUnit;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
 
 /** util to build http client. */
 public class HttpUtil {
+    private final int connectTimeout;
+    private final int socketTimeout;
+    private HttpClientBuilder httpClientBuilder;
 
-    private RequestConfig requestConfigStream =
-            RequestConfig.custom()
-                    .setConnectTimeout(60 * 1000)
-                    .setConnectionRequestTimeout(60 * 1000)
-                    .build();
+    public HttpUtil() {
+        this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+        this.socketTimeout = DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+        settingStreamHttpClientBuilder();
+    }
 
-    private final HttpClientBuilder httpClientBuilder =
-            HttpClients.custom()
-                    .setRedirectStrategy(
-                            new DefaultRedirectStrategy() {
-                                @Override
-                                protected boolean isRedirectable(String 
method) {
-                                    return true;
-                                }
-                            })
-                    
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
-                    .evictExpiredConnections()
-                    .evictIdleConnections(60, TimeUnit.SECONDS)
-                    .setDefaultRequestConfig(requestConfigStream);
+    public HttpUtil(DorisReadOptions readOptions) {
+        this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
+        this.socketTimeout = readOptions.getRequestReadTimeoutMs();
+        settingStreamHttpClientBuilder();
+    }
 
+    private void settingStreamHttpClientBuilder() {
+        this.httpClientBuilder =
+                HttpClients.custom()
+                        // default timeout 3s, maybe report 307 error when fe 
busy
+                        .setRequestExecutor(new 
HttpRequestExecutor(socketTimeout))
+                        .setRedirectStrategy(
+                                new DefaultRedirectStrategy() {
+                                    @Override
+                                    protected boolean isRedirectable(String 
method) {
+                                        return true;
+                                    }
+                                })
+                        
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
+                        .setDefaultRequestConfig(
+                                RequestConfig.custom()
+                                        .setConnectTimeout(connectTimeout)
+                                        
.setConnectionRequestTimeout(connectTimeout)
+                                        .build());
+    }
+
+    /**
+     * for stream http
+     *
+     * @return
+     */
     public CloseableHttpClient getHttpClient() {
         return httpClientBuilder.build();
     }
 
-    private RequestConfig requestConfig =
-            RequestConfig.custom()
-                    .setConnectTimeout(60 * 1000)
-                    .setConnectionRequestTimeout(60 * 1000)
-                    // default checkpoint timeout is 10min
-                    .setSocketTimeout(9 * 60 * 1000)
-                    .build();
-
+    /**
+     * for batch http
+     *
+     * @return
+     */
     public HttpClientBuilder getHttpClientBuilderForBatch() {
         return HttpClients.custom()
                 .setRedirectStrategy(
@@ -70,12 +90,22 @@ public class HttpUtil {
                                 return true;
                             }
                         })
-                .setDefaultRequestConfig(requestConfig);
+                .setDefaultRequestConfig(
+                        RequestConfig.custom()
+                                .setConnectTimeout(connectTimeout)
+                                .setConnectionRequestTimeout(connectTimeout)
+                                .setSocketTimeout(socketTimeout)
+                                .build());
     }
 
     public HttpClientBuilder getHttpClientBuilderForCopyBatch() {
         return HttpClients.custom()
                 .disableRedirectHandling()
-                .setDefaultRequestConfig(requestConfig);
+                .setDefaultRequestConfig(
+                        RequestConfig.custom()
+                                .setConnectTimeout(connectTimeout)
+                                .setConnectionRequestTimeout(connectTimeout)
+                                .setSocketTimeout(socketTimeout)
+                                .build());
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 5a32949e..479fab64 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -102,7 +102,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private final AtomicBoolean started;
     private volatile boolean loadThreadAlive = false;
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
-    private HttpClientBuilder httpClientBuilder = new 
HttpUtil().getHttpClientBuilderForBatch();
+    private HttpClientBuilder httpClientBuilder;
     private BackendUtil backendUtil;
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
@@ -169,6 +169,7 @@ public class DorisBatchStreamLoad implements Serializable {
         this.started = new AtomicBoolean(true);
         this.loadExecutorService.execute(loadAsyncExecutor);
         this.subTaskId = subTaskId;
+        this.httpClientBuilder = new 
HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch();
     }
 
     /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index eafffd53..e73d96cd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -66,7 +66,11 @@ public class DorisCommitter implements 
Committer<DorisCommittable>, Closeable {
             DorisOptions dorisOptions,
             DorisReadOptions dorisReadOptions,
             DorisExecutionOptions executionOptions) {
-        this(dorisOptions, dorisReadOptions, executionOptions, new 
HttpUtil().getHttpClient());
+        this(
+                dorisOptions,
+                dorisReadOptions,
+                executionOptions,
+                new HttpUtil(dorisReadOptions).getHttpClient());
     }
 
     public DorisCommitter(
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index e84197d1..fdb797f9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -346,7 +346,7 @@ public class DorisWriter<IN>
                                 dorisOptions,
                                 executionOptions,
                                 labelGenerator,
-                                new HttpUtil().getHttpClient()));
+                                new 
HttpUtil(dorisReadOptions).getHttpClient()));
     }
 
     /** Check the streamload http request regularly. */
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 2baf6f56..4a01473d 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -248,8 +248,8 @@ public class DorisDynamicTableFactoryTest {
         options.put("password", "");
         options.put("auto-redirect", "true");
         options.put("doris.request.retries", "3");
-        options.put("doris.request.connect.timeout", "30s");
-        options.put("doris.request.read.timeout", "30s");
+        options.put("doris.request.connect.timeout", "60s");
+        options.put("doris.request.read.timeout", "60s");
         return options;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to