This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 304b8626e89 [feature](regression) add retry to stream load when
connection reset … (#54702)
304b8626e89 is described below
commit 304b8626e89278b027194692da93e7756aa77fcd
Author: shuke <[email protected]>
AuthorDate: Mon Aug 18 11:01:31 2025 +0800
[feature](regression) add retry to stream load when connection reset by s3 (#54613) (#54702)
During stream load, Doris sometimes stalls because of high CPU usage and stops
consuming data from the HTTP connection. The regression test framework, which
streams the source file from S3 into the stream load request, then stops reading
from S3 as well. Since S3 closes connections that have been idle for about two
minutes, such stalls make the stream load fail.
This patch adds an automatic retry mechanism that resumes the S3 download after
such a disconnect, so that the stream load can still succeed.
Usage:
```
streamLoad {
    ...
    retryIfHttpError true
}
```
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../regression/action/StreamLoadAction.groovy | 192 ++++++++++++++++++++-
regression-test/suites/opensky_p2/load.groovy | 1 +
.../suites/tpcds_sf1_unique_p1/load.groovy | 1 +
3 files changed, 187 insertions(+), 7 deletions(-)
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index aa19094854e..7cf57d212f2 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -29,6 +29,7 @@ import groovy.util.logging.Slf4j
import org.apache.http.HttpEntity
import org.apache.http.HttpStatus
import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.client.methods.RequestBuilder
import org.apache.http.entity.FileEntity
import org.apache.http.entity.InputStreamEntity
@@ -37,6 +38,8 @@ import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.junit.Assert
+import java.io.InputStream
+import java.io.IOException
@Slf4j
class StreamLoadAction implements SuiteAction {
@@ -50,6 +53,7 @@ class StreamLoadAction implements SuiteAction {
String inputText
Iterator<List<Object>> inputIterator
long time
+ boolean retryIfHttpError = false
Closure check
Map<String, String> headers
SuiteContext context
@@ -138,6 +142,10 @@ class StreamLoadAction implements SuiteAction {
this.time = time.call()
}
+    /**
+     * DSL option. When {@code true}, httpGetStream wraps the HTTP download in a
+     * ResumableHttpInputStream, which reopens the connection with a Range request
+     * after read errors. Defaults to {@code false}.
+     */
+    void retryIfHttpError(boolean r) {
+        retryIfHttpError = r
+    }
+
void twoPhaseCommit(boolean twoPhaseCommit) {
this.twoPhaseCommit = twoPhaseCommit;
}
@@ -213,14 +221,18 @@ class StreamLoadAction implements SuiteAction {
}
+ /**
+ * Opens {@code url} with a GET request and returns the response body as a stream.
+ *
+ * With {@code retryIfHttpError} enabled, the body is wrapped in a
+ * ResumableHttpInputStream. That stream re-requests the remaining bytes with an HTTP
+ * Range header after a read failure and rethrows the IOException once its retries
+ * are exhausted. Otherwise a single plain GET is issued.
+ *
+ * @throws IllegalStateException on the non-retry path when the status is not 200
+ */
private InputStream httpGetStream(CloseableHttpClient client, String url) {
- CloseableHttpResponse resp =
client.execute(RequestBuilder.get(url).build())
- int code = resp.getStatusLine().getStatusCode()
- if (code != HttpStatus.SC_OK) {
- String streamBody = EntityUtils.toString(resp.getEntity())
- throw new IllegalStateException("Get http stream failed, status
code is ${code}, body:\n${streamBody}")
- }
+ if (retryIfHttpError) {
+ // Resumable path: opening the URL and later reads are retried inside the stream.
+ return new ResumableHttpInputStream(client, url)
+ } else {
+ // Plain path: one GET; a non-200 response is reported together with its body.
+ CloseableHttpResponse resp =
client.execute(RequestBuilder.get(url).build())
+ int code = resp.getStatusLine().getStatusCode()
+ if (code != HttpStatus.SC_OK) {
+ String streamBody = EntityUtils.toString(resp.getEntity())
+ throw new IllegalStateException("Get http stream failed,
status code is ${code}, body:\n${streamBody}")
+ }
- return resp.getEntity().getContent()
+ return resp.getEntity().getContent()
+ }
}
private RequestBuilder prepareRequestHeader(RequestBuilder requestBuilder)
{
@@ -423,4 +435,170 @@ class StreamLoadAction implements SuiteAction {
throw t;
}
}
+
+    /**
+     * A resumable HTTP input stream that transparently reopens the underlying HTTP
+     * connection when a read fails. It continues from the number of bytes already
+     * returned to the caller. It targets large HTTP/S3 downloads consumed by stream
+     * load, where the loading side may stall (e.g. under high CPU usage) long enough
+     * for the remote end to drop the idle connection.
+     *
+     * Resuming sends {@code Range: bytes=<offset>-}. A server may ignore that header
+     * and answer 200 with the complete body. In that case the already-delivered prefix
+     * is skipped, so the caller never receives duplicated bytes.
+     *
+     * Pay Attention:
+     * Using this class can recover from S3 actively disconnecting because stream load
+     * is stuck. It thereby masks the underlying performance bug where stream loading
+     * stalls for extended periods.
+     *
+     * The class performs no synchronization; use it from a single reader thread.
+     */
+    class ResumableHttpInputStream extends InputStream {
+        private final CloseableHttpClient httpClient
+        private final String url
+        // Bytes already returned to the caller; a reopened request resumes from here.
+        private long offset = 0
+        private InputStream currentStream
+        private CloseableHttpResponse currentResponse
+        // Retries allowed per open/read call; the n-th retry waits retryDelayMs * n ms.
+        private int maxRetries = 3
+        private int retryDelayMs = 1000
+        private boolean closed = false
+
+        /**
+         * Issues the initial GET for {@code url}.
+         *
+         * @throws IOException if the request still fails after {@code maxRetries} retries
+         */
+        ResumableHttpInputStream(CloseableHttpClient httpClient, String url) {
+            this.httpClient = httpClient
+            this.url = url
+            openNewStream(0)
+        }
+
+        /**
+         * Replaces the current response with a new GET positioned at {@code startOffset}.
+         * Accepts 206 for a ranged request, or 200, in which case {@code startOffset} bytes
+         * are skipped because the body starts at byte 0. Any other status, and any I/O
+         * error, is retried with a linearly growing delay.
+         *
+         * @throws IOException if every attempt fails, or if the stream has been closed
+         */
+        private void openNewStream(long startOffset) throws IOException {
+            closeCurrentResources()
+            log.info("open new stream ${this.url} with offset ${startOffset}")
+
+            int attempts = 0
+            while (true) {
+                // Never leave the stream half-initialised: a closed stream must fail loudly.
+                ensureOpen()
+                attempts++
+                try {
+                    RequestBuilder builder = RequestBuilder.get(url)
+                    if (startOffset > 0) {
+                        builder.addHeader("Range", "bytes=${startOffset}-")
+                    }
+
+                    currentResponse = httpClient.execute(builder.build())
+                    int code = currentResponse.getStatusLine().getStatusCode()
+
+                    if (code == HttpStatus.SC_PARTIAL_CONTENT && startOffset > 0) {
+                        currentStream = currentResponse.getEntity().getContent()
+                        offset = startOffset
+                        return
+                    }
+                    if (code == HttpStatus.SC_OK) {
+                        currentStream = currentResponse.getEntity().getContent()
+                        if (startOffset > 0) {
+                            // The server ignored the Range header and sent the whole body:
+                            // drop the prefix the caller has already consumed.
+                            log.info("${url} ignored Range header, skipping ${startOffset} bytes")
+                            skipFully(currentStream, startOffset)
+                        }
+                        offset = startOffset
+                        return
+                    }
+
+                    String body = EntityUtils.toString(currentResponse.getEntity())
+                    throw new IOException("HTTP error ${code} ${currentResponse.getStatusLine().getReasonPhrase()}\n${body}")
+                } catch (IOException e) {
+                    closeCurrentResources()
+                    if (attempts > maxRetries || closed) {
+                        throw e
+                    }
+                    log.info("open ${url} with offset ${startOffset} failed (attempt ${attempts}): ${e.getMessage()}")
+                    sleep(retryDelayMs * attempts)
+                }
+            }
+        }
+
+        /** Discards exactly {@code count} bytes from {@code input}; throws EOFException if it ends early. */
+        private void skipFully(InputStream input, long count) throws IOException {
+            long remaining = count
+            while (remaining > 0) {
+                long skipped = input.skip(remaining)
+                if (skipped > 0) {
+                    remaining -= skipped
+                } else if (input.read() >= 0) {
+                    // skip() may legitimately return 0; read a byte to make progress or detect EOF.
+                    remaining--
+                } else {
+                    throw new EOFException("${url} ended after ${count - remaining} of ${count} bytes while resuming")
+                }
+            }
+        }
+
+        /** Returns the live content stream, reopening at {@code offset} if a previous reopen failed. */
+        private InputStream activeStream() throws IOException {
+            if (currentStream == null) {
+                openNewStream(offset)
+            }
+            return currentStream
+        }
+
+        @Override
+        int read() throws IOException {
+            ensureOpen()
+            int attempts = 0
+            while (true) {
+                attempts++
+                try {
+                    int byteRead = activeStream().read()
+                    if (byteRead >= 0) {
+                        offset++
+                    }
+                    return byteRead
+                } catch (IOException e) {
+                    recoverFromReadFailure(e, attempts)
+                }
+            }
+        }
+
+        @Override
+        int read(byte[] b, int off, int len) throws IOException {
+            ensureOpen()
+            if (b == null) {
+                throw new NullPointerException()
+            }
+            if (off < 0 || len < 0 || len > b.length - off) {
+                throw new IndexOutOfBoundsException()
+            }
+            if (len == 0) {
+                return 0
+            }
+
+            int attempts = 0
+            while (true) {
+                attempts++
+                try {
+                    int bytesRead = activeStream().read(b, off, len)
+                    if (bytesRead > 0) {
+                        offset += bytesRead
+                    }
+                    return bytesRead
+                } catch (IOException e) {
+                    recoverFromReadFailure(e, attempts)
+                }
+            }
+        }
+
+        /**
+         * Handles a failed read on the current response. Rethrows {@code e} once
+         * {@code attempts} exceeds {@code maxRetries} or the stream was closed. Otherwise
+         * waits {@code retryDelayMs * attempts} ms and reopens {@code url} at {@code offset}.
+         */
+        private void recoverFromReadFailure(IOException e, int attempts) throws IOException {
+            log.info("${url} read exception at offset ${offset}: ${e.getMessage()}")
+            if (attempts > maxRetries || closed) {
+                throw e
+            }
+            // Back off before reconnecting rather than after, so the fresh connection is
+            // read immediately instead of sitting idle (idleness is what S3 disconnects on).
+            sleep(retryDelayMs * attempts)
+            reopenStreamAfterError()
+        }
+
+        /** Drops the failed response and reopens {@code url} from the current {@code offset}. */
+        private void reopenStreamAfterError() throws IOException {
+            closeCurrentResources()
+            openNewStream(offset)
+        }
+
+        /**
+         * Best-effort release of the current response. Apache HttpClient keeps a connection
+         * reusable by consuming the remaining entity content when the content stream is
+         * closed, whereas closing the response shuts the connection down immediately. On a
+         * stalled or half-read download, closing the stream could therefore block or pull
+         * the whole remaining object. Only the response is closed.
+         */
+        private void closeCurrentResources() {
+            try {
+                if (currentResponse != null) {
+                    currentResponse.close()
+                } else if (currentStream != null) {
+                    currentStream.close()
+                }
+            } catch (IOException ignored) {
+                // Best effort: the connection is being discarded anyway.
+            }
+            currentStream = null
+            currentResponse = null
+        }
+
+        private void ensureOpen() throws IOException {
+            if (closed) {
+                throw new IOException("Stream closed")
+            }
+        }
+
+        @Override
+        void close() throws IOException {
+            if (!closed) {
+                closed = true
+                closeCurrentResources()
+            }
+        }
+
+        /** Number of bytes returned to the caller so far. */
+        long getOffset() { offset }
+
+        /** Sets the retry budget and base back-off delay (milliseconds) used by later failures. */
+        void setRetryPolicy(int maxRetries, int baseDelayMs) {
+            if (maxRetries < 0 || baseDelayMs < 0) {
+                throw new IllegalArgumentException("maxRetries and baseDelayMs must be non-negative, got ${maxRetries}, ${baseDelayMs}")
+            }
+            this.maxRetries = maxRetries
+            this.retryDelayMs = baseDelayMs
+        }
+
+        @Override
+        int available() throws IOException {
+            return (closed || currentStream == null) ? 0 : currentStream.available()
+        }
+
+        @Override
+        long skip(long n) throws IOException {
+            ensureOpen()
+            if (n <= 0) {
+                return 0
+            }
+            int attempts = 0
+            while (true) {
+                attempts++
+                try {
+                    long skipped = activeStream().skip(n)
+                    offset += skipped
+                    return skipped
+                } catch (IOException e) {
+                    recoverFromReadFailure(e, attempts)
+                }
+            }
+        }
+
+        @Override
+        boolean markSupported() {
+            return false
+        }
+    }
}
diff --git a/regression-test/suites/opensky_p2/load.groovy
b/regression-test/suites/opensky_p2/load.groovy
index d00be91d3fd..d0ff5eb478e 100644
--- a/regression-test/suites/opensky_p2/load.groovy
+++ b/regression-test/suites/opensky_p2/load.groovy
@@ -46,6 +46,7 @@ suite("load"){
file """${getS3Url() + '/regression/clickhouse/opensky/' +
sourceFile}"""
time 0
+ retryIfHttpError true
// stream load action will check result, include Success status,
and NumberTotalRows == NumberLoadedRows
diff --git a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
index 0caf0889f8c..e93d7f7d020 100644
--- a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
+++ b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
@@ -121,6 +121,7 @@ suite("load") {
file """${getS3Url()}/regression/tpcds/sf1/${tableName}.dat.gz"""
time 10000 // limit inflight 10s
+ retryIfHttpError true
// stream load action will check result, include Success status,
and NumberTotalRows == NumberLoadedRows
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]