github-actions[bot] commented on code in PR #64075:
URL: https://github.com/apache/doris/pull/64075#discussion_r3354599048
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -270,9 +270,12 @@ private String getTableIdentifier(String database, String table) {
}
public void close() {
- // close async executor
- this.loadExecutorService.shutdown();
+ // Flag exit + clear()/shutdownNow to free producers parked on a full flushQueue (writer
+ // thread leak).
this.started.set(false);
+ this.loadThreadAlive = false;
+ this.flushQueue.clear();
+ this.loadExecutorService.shutdownNow();
Review Comment:
This shutdown path still leaves one producer-blocking case unresolved. A
writer that has already entered the
`while (currentCacheBytes.get() >= maxBlockedBytes)` loop in `writeRecord()`
waits on `block.await(1, TimeUnit.SECONDS)` and, on each wakeup, rechecks only
`checkFlushException()` and `currentCacheBytes`. `close()` now clears
`flushQueue` and stops the loader, but it does not decrement or reset
`currentCacheBytes`, set `exception`, or signal `block`. If the queued or
in-flight buffers are discarded during close, that writer wakes every second
and loops forever: `currentCacheBytes` stays at or above the threshold and no
load thread remains to reduce it. Please make `close()` publish a terminal
failure/closed state and wake the condition waiters, or otherwise reset the
byte accounting consistently with the discarded buffers.
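
One possible shape for that fix is sketched below as a minimal, standalone
illustration, not the actual `DorisBatchStreamLoad` code. The class name
`BackpressureSketch`, the `closed` flag, the `cachedBytes()` accessor, and the
lock handling are all assumptions made for the example; only `writeRecord()`,
`currentCacheBytes`, `maxBlockedBytes`, and `block` mirror names from the
comment above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the backpressure part of the stream loader. */
public class BackpressureSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition block = lock.newCondition();
    private final AtomicLong currentCacheBytes = new AtomicLong();
    // Assumed terminal flag; the real class may prefer to reuse `exception`.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final long maxBlockedBytes;

    public BackpressureSketch(long maxBlockedBytes) {
        this.maxBlockedBytes = maxBlockedBytes;
    }

    /** Blocks while the cache is over the threshold; fails once closed. */
    public void writeRecord(byte[] record) throws InterruptedException {
        lock.lock();
        try {
            // The loop condition includes the closed flag, so a parked
            // writer can leave the loop even if no load thread drains bytes.
            while (!closed.get() && currentCacheBytes.get() >= maxBlockedBytes) {
                block.await(1, TimeUnit.SECONDS);
            }
            if (closed.get()) {
                throw new IllegalStateException("stream load is closed");
            }
            currentCacheBytes.addAndGet(record.length);
        } finally {
            lock.unlock();
        }
    }

    /** Publishes the closed state, resets accounting, wakes all waiters. */
    public void close() {
        lock.lock();
        try {
            closed.set(true);
            // Buffers are discarded on close, so the byte count goes with them.
            currentCacheBytes.set(0);
            block.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public long cachedBytes() {
        return currentCacheBytes.get();
    }
}
```

With this shape, a writer parked in the loop returns promptly after `close()`
with an exception instead of spinning once per second. Both halves of the
suggestion appear here (terminal state plus reset accounting); either one alone
may be enough depending on how callers are expected to observe shutdown.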
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]