LuciferYang commented on PR #364:
URL: https://github.com/apache/doris-spark-connector/pull/364#issuecomment-4786123145

   Thanks for taking a careful look — good questions. Replies inline:
   
   **1. Without 2PC it can't really be exactly-once — agreed.** This PR doesn't 
try to make auto-commit loads exactly-once end to end, and as you say it can't: 
one task can fire several stream loads, and if Spark re-runs the whole task 
attempt, the batches it had already committed get replayed under fresh labels (a new 
attempt means a new processor with an empty record buffer). What it actually 
fixes is much narrower — the connector's own in-task retry loop 
(`DORIS_SINK_MAX_RETRIES` / `Retry.exec`). When a single batch's HTTP request 
fails on the client side but the batch had in fact committed on the BE, the 
retry used to resend the same data under a *new* label and write the rows 
twice. Reusing the label lets the BE reject the retry as a duplicate. For 
exactly-once across executor/task failover you still need 2PC.
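
To make the in-task retry case concrete, here is a toy Java simulation of one batch whose first load commits on the BE but whose ack never reaches the client. `FakeBackend` and `LostAckDemo` are hypothetical stand-ins, not connector or Doris classes. The fake BE only models the one property that matters here: it rejects a label it has already committed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a BE that dedups loads by label. Hypothetical: this is not
// the Doris stream load API, only the label semantics relevant here.
class FakeBackend {
    private final Set<String> committedLabels = new HashSet<>();
    final List<String> rows = new ArrayList<>();

    // Applies the batch unless the label was already committed.
    boolean load(String label, List<String> batch) {
        if (!committedLabels.add(label)) {
            return false; // duplicate label: rejected, the rows are already stored
        }
        rows.addAll(batch);
        return true;
    }
}

class LostAckDemo {
    // Sends one batch through an in-task retry loop. The first attempt commits
    // on the BE, but the client never sees the ack, so the loop retries once.
    // Returns the number of rows the BE ends up storing.
    static int run(boolean reuseLabel) {
        FakeBackend be = new FakeBackend();
        List<String> batch = List.of("r1", "r2", "r3");
        int maxRetries = 1;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            // Old behavior: a fresh label per attempt. New behavior: the same label.
            String label = reuseLabel ? "batch-1" : "batch-1-" + attempt;
            be.load(label, batch);
            boolean ackLost = (attempt == 0); // simulated client-side failure
            if (!ackLost) {
                break;
            }
        }
        return be.rows.size();
    }
}
```

`LostAckDemo.run(false)` (fresh label per attempt) leaves 6 rows on the fake BE; `LostAckDemo.run(true)` (label reused) leaves 3.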
   
   **2. Only matters when `retries > 0`, and relies on the retry sending 
identical data — exactly.** `DorisDataWriter` only buffers rows 
(`recordBuffer`) when `retries > 0`, and on retry it replays exactly those rows 
in order, so the reused label plus identical payload is a genuine duplicate. 
With `retries = 0` there's no in-task retry and the manager always mints a 
fresh label, so nothing changes there.
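
A minimal sketch of the label selection described in points 2 and 3, assuming a manager that hands out one label per stream load. Only `onBatchFailed()` is a name from this PR; `RetryAwareLabelManager`, `nextLabel()`, and the `spark_` prefix are made up for illustration. The label is reused only for the retry of a failed batch, only when `retries > 0`, and never under 2PC.

```java
import java.util.UUID;

// Hypothetical sketch of label selection for the in-task retry loop.
class RetryAwareLabelManager {
    private final int maxRetries;
    private final boolean twoPhaseCommit;
    private String lastLabel;  // label handed out for the most recent load
    private boolean reuseNext; // true when the next load replays that same batch

    RetryAwareLabelManager(int maxRetries, boolean twoPhaseCommit) {
        this.maxRetries = maxRetries;
        this.twoPhaseCommit = twoPhaseCommit;
    }

    // Label for the next stream load: the previous one when replaying a
    // failed batch, otherwise a freshly minted one.
    String nextLabel() {
        if (!reuseNext || lastLabel == null) {
            lastLabel = "spark_" + UUID.randomUUID();
        }
        reuseNext = false;
        return lastLabel;
    }

    // Called when a batch's load fails on the client side.
    void onBatchFailed() {
        // With retries == 0 nothing is buffered or replayed, and under 2PC the
        // retry opens a fresh transaction, so both cases keep minting new labels.
        if (maxRetries <= 0 || twoPhaseCommit) {
            return;
        }
        reuseNext = true;
    }
}
```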
   
   **3. On needing an abort before reuse (the PREPARE case in flink #523).** 
That one is about reusing a 2PC / precommitted transaction's label, where you 
do have to abort the stale PREPARE first or the same-label load is rejected 
forever. Here I deliberately don't reuse labels under 2PC at all — 
`onBatchFailed()` is a no-op when 2PC is on, since 2PC already gets 
exactly-once from precommit/commit (the retry opens a fresh txn and the 
orphaned precommit aborts). So there's no PREPARE state hanging around to 
clear. For auto-commit there's no long-lived transaction, and at the point the 
label collides there's no txn id to abort anyway. The original load can end in one of 
three states, and each is handled directly:
   
   - original **committed** → BE returns `Label Already Exists` with 
`ExistingJobStatus=FINISHED` → we treat the retry as a no-op (rows are already 
there);
   - original **aborted/cancelled** → the label is free again → the retried load 
goes through as a normal load and succeeds;
   - original **still RUNNING/PRECOMMITTED** → we don't mask it; it stays a 
retriable failure (if we reported success and the original later aborted, those 
rows would be silently dropped) and the retry interval gives the BE time to settle.
   
   So abort-before-reuse doesn't really apply here, and would actually be 
unsafe — in the FINISHED case the only thing to abort would be the load we want 
to keep. Happy to drop in a comment/test making this explicit if you think it's 
worth it.
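
The three end states above map onto a small classification step. This is a hedged sketch: `LoadOutcome`, `StreamLoadOutcome`, and `classify` are hypothetical names, `Success` is assumed to be the normal stream load status, and `Label Already Exists` / `FINISHED` are the values named in the bullets.

```java
// Hypothetical classifier for the result of a (possibly retried) auto-commit load.
enum LoadOutcome { COMMITTED, ALREADY_COMMITTED, RETRIABLE }

class StreamLoadOutcome {
    static LoadOutcome classify(String status, String existingJobStatus) {
        if ("Success".equals(status)) {
            // Also covers an aborted/cancelled original: the label was free,
            // so the retried load simply went through.
            return LoadOutcome.COMMITTED;
        }
        if ("Label Already Exists".equals(status)) {
            // FINISHED: the original committed, so the retry is a no-op.
            // Anything else (e.g. RUNNING) is not masked; let the loop back off and retry.
            return "FINISHED".equals(existingJobStatus)
                    ? LoadOutcome.ALREADY_COMMITTED
                    : LoadOutcome.RETRIABLE;
        }
        // Other failures stay retriable in this sketch.
        return LoadOutcome.RETRIABLE;
    }
}
```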
   
   **4. Where we actually hit the duplicate.** It came up while getting the 
Spark 4.1 failover integration test (IT) to pass. Under Spark 4.x the retry backoff was 
collapsing: the async stream-load worker interrupts the task thread on failure, and the 
interrupt also wakes the old `LockSupport.parkNanos`-based wait early, so the interval 
shrank to a few ms (fixed separately in the retry-interval commit). With the interval 
gone, a batch that had already committed on the BE but whose client ack was 
lost to the interrupt got retried right away under a new label, and the rows 
landed twice. Reusing the label is what makes the BE dedup that retry.
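
The retry-interval commit itself isn't shown here, so the following is only one way to keep a `LockSupport.parkNanos` backoff from collapsing under interrupts; `RetryBackoff` and `parkFully` are hypothetical names. The idea is to park until a fixed deadline, clearing the interrupt flag so the next park actually blocks, and then restore the flag for the caller. Guava's `Uninterruptibles.sleepUninterruptibly` follows the same pattern.

```java
import java.util.concurrent.locks.LockSupport;

class RetryBackoff {
    // Parks for the full interval even if the thread is interrupted or
    // spuriously unparked, then re-asserts the interrupt for the caller.
    static void parkFully(long intervalNanos) {
        final long deadline = System.nanoTime() + intervalNanos;
        boolean interrupted = false;
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining);
            // parkNanos returns immediately while the flag is set, so clear it
            // (remembering it) to make the next iteration really wait.
            if (Thread.interrupted()) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Restoring the flag at the end keeps the interrupt visible to the caller instead of swallowing it.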
   