davidzollo opened a new issue, #10627:
URL: https://github.com/apache/seatunnel/issues/10627

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues) and found no similar 
issues.
   
   ### SeaTunnel Version
   
   2.3.x / dev branch (confirmed on latest dev)
   
   ### SeaTunnel Component
   
   Doris Connector (Sink)
   
   ### What happened
   
   When all Doris BE nodes become unavailable (every connection attempt fails with 
`Connection refused`), the `DorisSinkWriter` enters an unbounded error loop that 
can eventually contribute to engine-level OOM and cascading cluster failure.
   
   **Root Cause Analysis:**
   
   1. **`DorisSinkWriter.checkDone()` has no circuit breaker or failure 
counter.** The 
`scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, ...)` runs 
every `intervalTime` ms (default 10s). When 
`dorisStreamLoad.getLoadFailedMsg()` returns non-null (due to Connection 
refused), it logs `stream load finished unexpectedly` and sets `loadException`, 
but the scheduler keeps running indefinitely. There is no mechanism to:
      - Count consecutive failures and fail fast after a threshold
      - Apply exponential backoff on repeated failures
      - Stop the scheduler after persistent downstream unavailability
   
   2. **`DorisCommitter.commitTransaction()` retries without backoff.** The 
retry loop catches `IOException` (including `HttpHostConnectException: 
Connection refused`) and immediately retries by switching to a different 
frontend. There is no delay between retries, which amplifies load on the 
remaining infrastructure.
   
   3. **`DorisCommitter.abortTransaction()` maxRetry is never initialized.** 
The field `int maxRetry` defaults to `0` and is never set from 
`DorisSinkConfig.getMaxRetries()`. Because the loop condition is 
`while (retry++ <= maxRetry)`, the body executes exactly once (the initial 
attempt at `retry = 0`), so aborts are never actually retried.
   
   4. **Engine-level job retry amplifies the problem.** When the sink fails, 
the Zeta engine retries the job, creating new `DorisSinkWriter` instances. Each 
new writer starts its own `checkDone()` scheduler, which also fails 
immediately. Over hours of Doris unavailability, this creates a large number of 
error log entries and failed stream load attempts that contribute to heap 
pressure.
   
   **In a production incident, this pattern produced:**
   - ~77,000 `Connection refused` errors over 6h39m
   - ~269,000 `stream load finished unexpectedly` log entries
   - Eventually contributed to JVM OOM and Hazelcast node shutdown
   
   ### Relevant Code
   
   **DorisSinkWriter.java - checkDone() (no circuit breaker):**
   ```java
   private void checkDone() {
       String errorMsg;
       log.debug("start timer checker, interval {} ms", intervalTime);
       if ((errorMsg = dorisStreamLoad.getLoadFailedMsg()) != null) {
           log.error("stream load finished unexpectedly: {}", errorMsg);
           loadException =
                   new DorisConnectorException(
                           DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMsg);
       }
       // No failure counting, no backoff, no circuit breaker
       // Scheduler continues running indefinitely even after persistent failures
   }
   ```
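
   A minimal sketch of the consecutive-failure counting suggested under 
"Suggested Improvements" below. The class and method names here are 
hypothetical, not from the SeaTunnel codebase; `checkDone()` would record each 
failed check against a small breaker object and stop the scheduler once it 
trips:
   ```java
   // Hypothetical helper, not part of the connector: counts consecutive
   // checkDone() failures and trips after a configurable threshold
   // (e.g., a doris.sink.max-consecutive-failures option).
   class FailureCircuitBreaker {
       private final int maxConsecutiveFailures;
       private int consecutiveFailures = 0;

       FailureCircuitBreaker(int maxConsecutiveFailures) {
           this.maxConsecutiveFailures = maxConsecutiveFailures;
       }

       /** Record one failed check; returns true once the breaker has tripped. */
       boolean recordFailure() {
           consecutiveFailures++;
           return isOpen();
       }

       /** A successful check resets the streak. */
       void recordSuccess() {
           consecutiveFailures = 0;
       }

       boolean isOpen() {
           return consecutiveFailures >= maxConsecutiveFailures;
       }
   }
   ```
   `checkDone()` would call `recordFailure()` whenever `getLoadFailedMsg()` 
returns non-null and `recordSuccess()` otherwise; once `isOpen()` returns true 
it could cancel the scheduled task and surface `loadException` immediately 
instead of looping forever.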
   
   **DorisCommitter.java - commitTransaction() (no backoff):**
   ```java
   while (retry++ <= dorisSinkConfig.getMaxRetries()) {
       // ...
       try {
           response = httpClient.execute(putBuilder.build());
       } catch (IOException e) {
           log.error("commit transaction failed: ", e);
           hostPort = dorisSinkConfig.getFrontends();
           continue; // Immediate retry, no backoff
       }
       // ...
   }
   ```
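
   The backoff from suggestion 2 below can be captured in a small delay helper 
(hypothetical name, not in the codebase); the commit loop would sleep for 
`delayMillis(...)` before each retry instead of retrying immediately:
   ```java
   // Hypothetical helper, not part of the connector:
   // exponential backoff (base * 2^attempt) with an upper cap.
   class RetryBackoff {
       /** Delay before the given retry attempt (0-based), capped at capMillis. */
       static long delayMillis(long baseMillis, int attempt, long capMillis) {
           double delay = baseMillis * Math.pow(2, attempt);
           return (long) Math.min(delay, (double) capMillis);
       }
   }
   ```
   In `commitTransaction()`, the `catch (IOException e)` branch would then call 
something like `Thread.sleep(RetryBackoff.delayMillis(1000, retry, 30_000))` 
(handling interruption) before `continue`, so repeated failures back off as 
1s, 2s, 4s... up to the cap rather than hammering the remaining frontends.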
   
   **DorisCommitter.java - abortTransaction() (maxRetry bug):**
   ```java
   int maxRetry; // Never initialized from config, defaults to 0
   
   private void abortTransaction(DorisCommitInfo committable) throws IOException {
       int retry = 0;
       while (retry++ <= maxRetry) { // maxRetry=0, so loop body runs once
           // ...
       }
   }
   ```
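
   The loop semantics can be checked in isolation (standalone demo, not 
connector code): with `maxRetry` left at its default of `0`, the body runs 
exactly once, i.e. the initial attempt with no retries.
   ```java
   // Standalone demo of the abortTransaction() loop condition.
   class AbortRetryDemo {
       /** Number of times the loop body executes for a given maxRetry. */
       static int attempts(int maxRetry) {
           int retry = 0;
           int attempts = 0;
           while (retry++ <= maxRetry) {
               attempts++;
           }
           return attempts;
       }
   }
   ```
   `attempts(0)` is 1; initializing `maxRetry` from 
`dorisSinkConfig.getMaxRetries()` (say 3) would give 4 total attempts, as the 
loop presumably intends.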
   
   ### Key Logs (sanitized)
   
   **Connection refused errors flooding the log:**
   ```
   2026-03-17 00:00:01,346 ERROR [o.a.s.c.d.s.w.DorisSinkWriter] [stream-load-check] - stream load finished unexpectedly: org.apache.http.conn.HttpHostConnectException: Connect to 10.x.x.x:8040 [/10.x.x.x] failed: Connection refused (Connection refused)
   2026-03-17 00:00:01,346 ERROR [o.a.s.c.d.s.w.DorisSinkWriter] [stream-load-check] - stream load finished unexpectedly: org.apache.http.conn.HttpHostConnectException: Connect to 10.x.x.x:8040 [/10.x.x.x] failed: Connection refused (Connection refused)
   ... (repeated ~77,000 times over 6h39m across 3 BE nodes)
   ```
   
   **Eventual OOM causing checkpoint failure:**
   ```
   2026-03-17 05:31:14,445 ERROR [.s.e.s.c.CheckpointCoordinator] - store checkpoint states failed.
   java.lang.OutOfMemoryError: Java heap space
   ```
   
   **Final Hazelcast node shutdown due to OOM:**
   ```
   2026-03-17 06:39:56,618 INFO  [c.h.i.i.Node] - [10.x.x.x]:5801 [seatunnel] [5.1] Hazelcast Shutdown is completed in 54061 ms.
   2026-03-17 06:39:56,618 ERROR [c.h.s.i.o.i.InvocationMonitor] - [10.x.x.x]:5801 [seatunnel] [5.1] Java heap space
   java.lang.OutOfMemoryError: Java heap space
   ```
   
   ### Suggested Improvements
   
   1. **Add circuit breaker to DorisSinkWriter**: Track consecutive failure 
count in `checkDone()`. After N consecutive failures (configurable, e.g., 
`doris.sink.max-consecutive-failures=100`), stop the scheduler and fail the 
task immediately instead of continuing to retry indefinitely.
   
   2. **Add exponential backoff to DorisCommitter retry**: Add an interruptible 
`Thread.sleep()` with exponential backoff between retries in 
`commitTransaction()` (e.g., 1s, 2s, 4s..., capped at a configurable maximum).
   
   3. **Fix maxRetry initialization in abortTransaction()**: Initialize 
`maxRetry` from `dorisSinkConfig.getMaxRetries()` in the constructor, or use 
`dorisSinkConfig.getMaxRetries()` directly in the loop condition.
   
   4. **Add rate limiting to error logging**: When the same error repeats 
thousands of times, consider logging at reduced frequency (e.g., log every Nth 
occurrence or use a time-based throttle) to reduce log volume and I/O pressure.
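
   The every-Nth throttle from suggestion 4 could look like this (hypothetical 
class, shown only to illustrate the idea):
   ```java
   // Hypothetical helper: log only every Nth occurrence of a repeating error,
   // while still counting the total for a periodic summary line.
   class ThrottledErrorLogger {
       private final int logEveryN;
       private long occurrences = 0;

       ThrottledErrorLogger(int logEveryN) {
           this.logEveryN = logEveryN;
       }

       /** Returns true when this occurrence should actually be logged. */
       boolean shouldLog() {
           return occurrences++ % logEveryN == 0;
       }

       /** Total occurrences seen so far, including suppressed ones. */
       long total() {
           return occurrences;
       }
   }
   ```
   The call site would do `if (throttle.shouldLog()) log.error(...)`, optionally 
including `throttle.total()` in the message so the suppressed count stays 
visible in the log.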
   
   ### How to reproduce
   
   1. Set up a SeaTunnel Zeta cluster with a streaming job writing to Doris
   2. Stop all Doris BE nodes (kill the BE processes or block port 8040 with 
iptables)
   3. Observe that `DorisSinkWriter` continuously logs `stream load finished 
unexpectedly` errors at `intervalTime` frequency
   4. With engine job retry enabled, observe that each retry creates new writer 
instances that also fail immediately
   5. Over extended periods (hours), this contributes to heap pressure and can 
lead to OOM
   
   ### Anything else?
   
   This issue is specifically about the Doris connector's lack of resilience 
mechanisms. The OOM in the production incident had multiple contributing 
factors (including control-plane request accumulation), but the Doris 
connector's unbounded retry behavior was a significant amplifier.
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
