github-actions[bot] commented on PR #61481: URL: https://github.com/apache/doris/pull/61481#issuecomment-4087432003
## Code Review Summary

**PR: [fix](streamingjob) fix postgres DML silently dropped on task restart**

### Overview

This PR fixes a bug where the first DML of a new Postgres streaming transaction is intermittently dropped (10-20% failure rate) on task restart, with no error logged. The fix strips the `lsn_proc` and `lsn_commit` keys from the persisted binlog state offset so that Debezium's `WalPositionLocator` skips its "find phase" entirely, preventing incorrect filtering of DML events. This is a workaround for FLINK-39265.

### Critical Checkpoint Conclusions

1. **Goal / Correctness**: The fix correctly addresses the root cause. In pgoutput non-streaming mode (proto_version=1, Debezium 1.9.x), BEGIN and DML messages share the same `XLogData.data_start`. When `begin_lsn` equals the previous `commit_lsn`, the find phase incorrectly marks DMLs as already processed. Stripping `lsn_proc`/`lsn_commit` sets `lastCommitStoredLsn=null` in `WalPositionLocator`, causing the find phase to exit immediately. Verified that all downstream code paths (Flink CDC `PostgresOffset`, Debezium `Loader.load()`, replication slot flushing) gracefully handle the absence of these keys.
2. **Modification size / clarity**: Very small and focused: a single method override (7 lines of code) plus an excellent Javadoc comment explaining the rationale. The log message fix in `StreamingMultiTblTask.java` is a trivial correctness improvement. No issues.
3. **Concurrency**: No concerns. `extractBinlogStateOffset` operates on a locally created `HashMap` returned by `super.extractBinlogStateOffset()`. No shared state is modified.
4. **Lifecycle management**: No new lifecycle concerns introduced.
5. **Configuration items**: None added. N/A.
6. **Incompatible changes / rolling upgrade**: The persisted offset format changes (two keys stripped). Verified safe: (a) Debezium's `PostgresOffsetContext.Loader.load()` falls back to the main `lsn` value when `lsn_proc`/`lsn_commit` are absent; (b) existing offsets containing these keys continue to work since stripping only occurs on extraction; (c) the live streaming `offsetContext` continues to populate these keys normally during operation, so replication slot management is unaffected.
7. **Parallel code paths**: `MySqlSourceReader` has its own `extractBinlogStateOffset` override that does not need this fix (MySQL binlog offsets are not affected by this Postgres-specific WAL position issue). No action needed.
8. **Test coverage**: **No tests exist** for `PostgresSourceReader` or `extractBinlogStateOffset`. The entire `fs_brokers/cdc_client` module has minimal test coverage (only `SchemaChangeHelperTest` and `SmallFileMgrTest`). While testing this specific Debezium integration bug requires a running Postgres instance with specific WAL patterns (making unit testing difficult), a unit test verifying that `extractBinlogStateOffset` strips the expected keys from a mock offset map would be straightforward and valuable.
9. **Observability**: The thorough Javadoc comment with JIRA reference provides good documentation. No additional logging needed for this fix path.
10. **Performance**: No concerns: two `HashMap.remove()` calls on a small map.
11. **Other issues**: None found. The log message fix (`"Send get task fail reason"` -> `"Send get task timeout reason"`) correctly matches the method name `getTimeoutReason()`.

### Verdict

**No blocking issues.** The fix is correct, well-documented, and safe for production. The only suggestion is to consider adding a unit test for the `extractBinlogStateOffset` override to prevent future regressions, but this is not a blocker.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
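A rough sketch of the check suggested under **Test coverage** might look like the following. It is not the PR's code: the class `OffsetStripSketch`, the helper `stripFindPhaseKeys`, and the `Map<String, String>` offset type are hypothetical stand-ins, since the real signature of `extractBinlogStateOffset` is not shown in this review. Only the key names `lsn`, `lsn_proc`, and `lsn_commit` come from the review text, and the LSN values are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the override discussed in the review; the real
// method lives in PostgresSourceReader and may use different types.
class OffsetStripSketch {
    static final String LSN_PROC_KEY = "lsn_proc";
    static final String LSN_COMMIT_KEY = "lsn_commit";

    // Returns a copy of the offset map without the two keys that, per the
    // review, trigger the WalPositionLocator find phase on restart.
    static Map<String, String> stripFindPhaseKeys(Map<String, String> offset) {
        Map<String, String> stripped = new HashMap<>(offset);
        stripped.remove(LSN_PROC_KEY);
        stripped.remove(LSN_COMMIT_KEY);
        return stripped;
    }

    public static void main(String[] args) {
        // Mock persisted offset; the values are illustrative only.
        Map<String, String> persisted = new HashMap<>();
        persisted.put("lsn", "100");
        persisted.put(LSN_PROC_KEY, "90");
        persisted.put(LSN_COMMIT_KEY, "95");

        Map<String, String> result = stripFindPhaseKeys(persisted);

        // The two find-phase keys must be gone; the main lsn must survive.
        if (result.containsKey(LSN_PROC_KEY) || result.containsKey(LSN_COMMIT_KEY)) {
            throw new AssertionError("find-phase keys were not stripped");
        }
        if (!"100".equals(result.get("lsn"))) {
            throw new AssertionError("main lsn value was lost");
        }
        System.out.println("ok");
    }
}
```

The sketch copies the map before removing keys so the input stays untouched. The reviewed override reportedly mutates the locally created map returned by `super.extractBinlogStateOffset()`, which is equivalent for the purpose of this check.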
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
