This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new fad3c4cc [Improve](http) add timeout and waitForContinue config for sink httpclient (#522) fad3c4cc is described below commit fad3c4cc13752f4c8f89e29341724b5e54473d41 Author: wudi <676366...@qq.com> AuthorDate: Fri Dec 6 10:40:48 2024 +0800 [Improve](http) add timeout and waitForContinue config for sink httpclient (#522) --- .../doris/flink/cfg/ConfigurationOptions.java | 4 +- .../apache/doris/flink/cfg/DorisReadOptions.java | 25 +++--- .../java/org/apache/doris/flink/sink/HttpUtil.java | 88 +++++++++++++++------- .../flink/sink/batch/DorisBatchStreamLoad.java | 3 +- .../doris/flink/sink/committer/DorisCommitter.java | 6 +- .../doris/flink/sink/writer/DorisWriter.java | 2 +- .../flink/table/DorisDynamicTableFactoryTest.java | 4 +- 7 files changed, 86 insertions(+), 46 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java index 3709f0ae..8ae7f636 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java @@ -33,8 +33,8 @@ public interface ConfigurationOptions { String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout"; String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout"; Integer DORIS_REQUEST_RETRIES_DEFAULT = 3; - Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; - Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; + Integer DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 60 * 1000; + Integer DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 60 * 1000; Integer DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 21600; String DORIS_TABLET_SIZE = "doris.request.tablet.size"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 0448d60a..22a77b83 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -210,17 +210,22 @@ public class DorisReadOptions implements Serializable { private String readFields; private String filterQuery; - private Integer requestTabletSize; - private Integer requestConnectTimeoutMs; - private Integer requestReadTimeoutMs; - private Integer requestQueryTimeoutS; - private Integer requestRetries; - private Integer requestBatchSize; - private Long execMemLimit; - private Integer deserializeQueueSize; - private Boolean deserializeArrowAsync; + private Integer requestTabletSize = ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; + private Integer requestConnectTimeoutMs = + ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; + private Integer requestReadTimeoutMs = + ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; + private Integer requestQueryTimeoutS = + ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; + private Integer requestRetries = ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; + private Integer requestBatchSize = ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; + private Long execMemLimit = ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; + private Integer deserializeQueueSize = + ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; + private Boolean deserializeArrowAsync = + ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; private Boolean useOldApi = false; - private Boolean useFlightSql = false; + private Boolean useFlightSql = ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT; private Integer flightSqlPort; /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 518eea71..94b65c79 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -17,50 +17,70 @@ package org.apache.doris.flink.sink; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.NoConnectionReuseStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import org.apache.http.protocol.HttpRequestExecutor; -import java.util.concurrent.TimeUnit; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; /** util to build http client. */ public class HttpUtil { + private final int connectTimeout; + private final int socketTimeout; + private HttpClientBuilder httpClientBuilder; - private RequestConfig requestConfigStream = - RequestConfig.custom() - .setConnectTimeout(60 * 1000) - .setConnectionRequestTimeout(60 * 1000) - .build(); + public HttpUtil() { + this.connectTimeout = DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; + this.socketTimeout = DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; + settingStreamHttpClientBuilder(); + } - private final HttpClientBuilder httpClientBuilder = - HttpClients.custom() - .setRedirectStrategy( - new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }) - .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) - .evictExpiredConnections() - .evictIdleConnections(60, TimeUnit.SECONDS) - .setDefaultRequestConfig(requestConfigStream); + public HttpUtil(DorisReadOptions readOptions) { + this.connectTimeout = readOptions.getRequestConnectTimeoutMs(); + this.socketTimeout = readOptions.getRequestReadTimeoutMs(); + settingStreamHttpClientBuilder(); + } + private void settingStreamHttpClientBuilder() { + this.httpClientBuilder = + HttpClients.custom() + // default timeout 3s, maybe report 307 error when fe busy + .setRequestExecutor(new HttpRequestExecutor(socketTimeout)) + .setRedirectStrategy( + new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }) + .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectTimeout) + .build()); + } + + /** + * for stream http + * + * @return + */ public CloseableHttpClient getHttpClient() { return httpClientBuilder.build(); } - private RequestConfig requestConfig = - RequestConfig.custom() - .setConnectTimeout(60 * 1000) - .setConnectionRequestTimeout(60 * 1000) - // default checkpoint timeout is 10min - .setSocketTimeout(9 * 60 * 1000) - .build(); - + /** + * for batch http + * + * @return + */ public HttpClientBuilder getHttpClientBuilderForBatch() { return HttpClients.custom() .setRedirectStrategy( @@ -70,12 +90,22 @@ public class HttpUtil { return true; } }) - .setDefaultRequestConfig(requestConfig); + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .build()); } public HttpClientBuilder getHttpClientBuilderForCopyBatch() { return HttpClients.custom() .disableRedirectHandling() - .setDefaultRequestConfig(requestConfig); + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(connectTimeout) + .setConnectionRequestTimeout(connectTimeout) + .setSocketTimeout(socketTimeout) + .build()); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 5a32949e..479fab64 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -102,7 +102,7 @@ public class DorisBatchStreamLoad implements Serializable { private final AtomicBoolean started; private volatile boolean loadThreadAlive = false; private AtomicReference<Throwable> exception = new AtomicReference<>(null); - private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); + private HttpClientBuilder httpClientBuilder; private BackendUtil backendUtil; private boolean enableGroupCommit; private boolean enableGzCompress; @@ -169,6 +169,7 @@ public class DorisBatchStreamLoad implements Serializable { this.started = new AtomicBoolean(true); this.loadExecutorService.execute(loadAsyncExecutor); this.subTaskId = subTaskId; + this.httpClientBuilder = new HttpUtil(dorisReadOptions).getHttpClientBuilderForBatch(); } /** diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index eafffd53..e73d96cd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -66,7 +66,11 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable { DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions executionOptions) { - this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient()); + this( + dorisOptions, + dorisReadOptions, + executionOptions, + new HttpUtil(dorisReadOptions).getHttpClient()); } public DorisCommitter( diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index e84197d1..fdb797f9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -346,7 +346,7 @@ public class DorisWriter<IN> dorisOptions, executionOptions, labelGenerator, - new HttpUtil().getHttpClient())); + new HttpUtil(dorisReadOptions).getHttpClient())); } /** Check the streamload http request regularly. */ diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 2baf6f56..4a01473d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -248,8 +248,8 @@ public class DorisDynamicTableFactoryTest { options.put("password", ""); options.put("auto-redirect", "true"); options.put("doris.request.retries", "3"); - options.put("doris.request.connect.timeout", "30s"); - options.put("doris.request.read.timeout", "30s"); + options.put("doris.request.connect.timeout", "60s"); + options.put("doris.request.read.timeout", "60s"); return options; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org