github-actions[bot] commented on PR #61481:
URL: https://github.com/apache/doris/pull/61481#issuecomment-4087432003

   ## Code Review Summary
   
   **PR: [fix](streamingjob) fix postgres DML silently dropped on task restart**
   
   ### Overview
   This PR fixes a bug where the first DML of a new Postgres streaming 
transaction is intermittently dropped (10-20% failure rate) on task restart, 
with no error logged. The fix strips `lsn_proc` and `lsn_commit` keys from the 
persisted binlog state offset so that Debezium's `WalPositionLocator` skips its 
"find phase" entirely, preventing incorrect filtering of DML events. This is a 
workaround for FLINK-39265.
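
   A minimal sketch of the override's shape follows. It assumes a hypothetical base class `BaseSourceReader` whose `extractBinlogStateOffset()` returns a freshly built `Map<String, String>`. The real class names, signature, and value types in the cdc_client module may differ. The sketch shows the two keys being removed from the map returned by `super.extractBinlogStateOffset()` before the offset is persisted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the reader base class; the real signature may differ.
class BaseSourceReader {
    // Builds a fresh offset map on every call (illustrative values).
    Map<String, String> extractBinlogStateOffset() {
        Map<String, String> offset = new HashMap<>();
        offset.put("lsn", "12345");
        offset.put("lsn_proc", "12345");
        offset.put("lsn_commit", "12345");
        return offset;
    }
}

// Sketch of the Postgres-specific override described in this review.
class PostgresSourceReaderSketch extends BaseSourceReader {
    @Override
    Map<String, String> extractBinlogStateOffset() {
        Map<String, String> offset = super.extractBinlogStateOffset();
        // Workaround for FLINK-39265: with these keys absent, WalPositionLocator
        // has no stored commit LSN and skips its find phase on restart.
        offset.remove("lsn_proc");
        offset.remove("lsn_commit");
        return offset;
    }
}

class Demo {
    public static void main(String[] args) {
        // Prints {lsn=12345}: only the main LSN is persisted.
        System.out.println(new PostgresSourceReaderSketch().extractBinlogStateOffset());
    }
}
```

   The base method returns a new map on each call. Removing keys in place therefore modifies no shared state.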
   
   ### Critical Checkpoint Conclusions
   
   1. **Goal / Correctness**: The fix correctly addresses the root cause. In 
pgoutput non-streaming mode (proto_version=1, Debezium 1.9.x), BEGIN and DML 
messages share the same `XLogData.data_start`. When `begin_lsn` equals the 
previous `commit_lsn`, the find phase incorrectly marks DMLs as already 
processed. Stripping `lsn_proc`/`lsn_commit` leaves `lastCommitStoredLsn` null 
in `WalPositionLocator`, so the find phase exits immediately. All downstream 
code paths (Flink CDC `PostgresOffset`, Debezium `Loader.load()`, replication 
slot flushing) were verified to handle the absence of these keys gracefully.
   
   2. **Modification size / clarity**: Very small and focused, consisting of a 
single method override (7 lines of code) plus an excellent Javadoc comment 
explaining the rationale. The log message fix in `StreamingMultiTblTask.java` 
is a trivial correctness improvement. No issues.
   
   3. **Concurrency**: No concerns. `extractBinlogStateOffset` operates on a 
locally created `HashMap` returned by `super.extractBinlogStateOffset()`. No 
shared state is modified.
   
   4. **Lifecycle management**: No new lifecycle concerns introduced.
   
   5. **Configuration items**: None added. N/A.
   
   6. **Incompatible changes / rolling upgrade**: The persisted offset format 
changes (two keys stripped). Verified safe: (a) Debezium's 
`PostgresOffsetContext.Loader.load()` falls back to the main `lsn` value when 
`lsn_proc`/`lsn_commit` are absent; (b) existing offsets containing these keys 
continue to work since stripping only occurs on extraction; (c) the live 
streaming `offsetContext` continues to populate these keys normally during 
operation, so replication slot management is unaffected.
   
   7. **Parallel code paths**: `MySqlSourceReader` has its own 
`extractBinlogStateOffset` override that does not need this fix (MySQL binlog 
offsets are not affected by this Postgres-specific WAL position issue). No 
action needed.
   
   8. **Test coverage**: **No tests exist** for `PostgresSourceReader` or 
`extractBinlogStateOffset`. The entire `fs_brokers/cdc_client` module has 
minimal test coverage (only `SchemaChangeHelperTest` and `SmallFileMgrTest`). 
While testing this specific Debezium integration bug requires a running 
Postgres instance with specific WAL patterns (making unit testing difficult), a 
unit test verifying that `extractBinlogStateOffset` strips the expected keys 
from a mock offset map would be straightforward and valuable.
   
   9. **Observability**: The thorough Javadoc comment with JIRA reference 
provides good documentation. No additional logging needed for this fix path.
   
   10. **Performance**: No concerns. The fix adds only two `HashMap.remove()` 
calls on a small map.
   
   11. **Other issues**: None found. The log message fix (`"Send get task fail 
reason"` -> `"Send get task timeout reason"`) correctly matches the method name 
`getTimeoutReason()`.
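
   The find-phase failure from checkpoint 1 can be illustrated with a simplified toy model. This is not Debezium's `WalPositionLocator` algorithm. It encodes only the review's description:

   - When a stored commit LSN exists, messages whose `data_start` is not past that LSN are treated as already processed.
   - When the stored LSN is null, the find phase is skipped.

   The `ToyFindPhase` class and the LSN values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Debezium's WalPositionLocator implementation.
class ToyFindPhase {
    // A decoded pgoutput message tagged with the XLogData.data_start it arrived at.
    static final class Msg {
        final String kind;
        final long dataStart;

        Msg(String kind, long dataStart) {
            this.kind = kind;
            this.dataStart = dataStart;
        }

        @Override
        public String toString() {
            return kind + "@" + dataStart;
        }
    }

    // Simplified rule: with a stored commit LSN, drop every message whose
    // data_start is not past it; with null, the find phase is skipped.
    static List<Msg> filter(List<Msg> stream, Long lastCommitStoredLsn) {
        List<Msg> passed = new ArrayList<>();
        for (Msg m : stream) {
            if (lastCommitStoredLsn != null && m.dataStart <= lastCommitStoredLsn) {
                continue; // treated as already processed
            }
            passed.add(m);
        }
        return passed;
    }
}

class ToyFindPhaseDemo {
    public static void main(String[] args) {
        // BEGIN and the first DML share data_start, which here equals the
        // previous transaction's commit LSN (100).
        List<ToyFindPhase.Msg> stream = List.of(
                new ToyFindPhase.Msg("BEGIN", 100),
                new ToyFindPhase.Msg("INSERT", 100),
                new ToyFindPhase.Msg("COMMIT", 180));
        System.out.println(ToyFindPhase.filter(stream, 100L)); // [COMMIT@180]
        System.out.println(ToyFindPhase.filter(stream, null)); // [BEGIN@100, INSERT@100, COMMIT@180]
    }
}
```

   In the toy, the INSERT is dropped only because a stored commit LSN exists. Passing null corresponds to the stripped offset and keeps every message.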
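
   A rough sketch of the unit test suggested in checkpoint 8 follows. It is written in plain Java with hand-rolled checks so it runs without a test framework; in the module itself it would more naturally be a JUnit test. `stripFindPhaseKeys` is a hypothetical stand-in for the `PostgresSourceReader` override, whose exact signature is not shown in this review. The offset values are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

class StripOffsetKeysTest {
    // Hypothetical stand-in for the override: copies the offset and drops
    // the keys that would re-enable the find phase.
    static Map<String, String> stripFindPhaseKeys(Map<String, String> offset) {
        Map<String, String> copy = new HashMap<>(offset);
        copy.remove("lsn_proc");
        copy.remove("lsn_commit");
        return copy;
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Mock offset map; "ts_usec" is an illustrative unrelated key.
        Map<String, String> mock = new HashMap<>();
        mock.put("lsn", "24023128");
        mock.put("lsn_proc", "24023128");
        mock.put("lsn_commit", "24023128");
        mock.put("ts_usec", "1700000000000000");

        Map<String, String> result = stripFindPhaseKeys(mock);

        check(!result.containsKey("lsn_proc"), "lsn_proc should be stripped");
        check(!result.containsKey("lsn_commit"), "lsn_commit should be stripped");
        check("24023128".equals(result.get("lsn")), "main lsn must be preserved");
        check(result.containsKey("ts_usec"), "unrelated keys must be preserved");
        System.out.println("all checks passed");
    }
}
```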
   
   ### Verdict
   **No blocking issues.** The fix is correct, well-documented, and safe for 
production. The only suggestion is to consider adding a unit test for the 
`extractBinlogStateOffset` override to prevent future regressions, but this is 
not a blocker.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

