Copilot commented on code in PR #61481:
URL: https://github.com/apache/doris/pull/61481#discussion_r2957572162


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) {
         }
     }
 
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.
+     * See https://issues.apache.org/jira/browse/FLINK-39265.
+     */
+    @Override
+    public Map<String, String> extractBinlogStateOffset(Object splitState) {
+        Map<String, String> offset = super.extractBinlogStateOffset(splitState);
+        offset.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
+        offset.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
+        return offset;

Review Comment:
   `super.extractBinlogStateOffset(splitState)` may return an unmodifiable map 
(common for “offset/state” snapshots). Calling `remove()` would then throw 
`UnsupportedOperationException` during task initialization/restart. Make a 
defensive mutable copy before removing keys (e.g., construct a new 
`HashMap<>(...)` and return that), so this override is safe regardless of the 
parent implementation.
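
A minimal sketch of the defensive copy suggested above. The class name `OffsetCopySketch` and the helper `stripLsnKeys` are hypothetical stand-ins for the override in `PostgresSourceReader`, and the two constants are assumed to carry the values `lsn_proc` and `lsn_commit` that the Javadoc mentions; the real constants live in Debezium's `PostgresOffsetContext`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: stand-in names, not the PR's actual classes. */
final class OffsetCopySketch {
    // Assumed string values of the two Debezium offset keys named in the diff.
    static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
    static final String LAST_COMMIT_LSN_KEY = "lsn_commit";

    /** Returns a mutable copy of the offset without the two LSN keys; the input is never modified. */
    static Map<String, String> stripLsnKeys(Map<String, String> parentOffset) {
        // Copy first, so remove() is safe even if the parent returned an unmodifiable map.
        Map<String, String> offset = new HashMap<>(parentOffset);
        offset.remove(LAST_COMPLETELY_PROCESSED_LSN_KEY);
        offset.remove(LAST_COMMIT_LSN_KEY);
        return offset;
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("lsn", "100");
        source.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, "90");
        source.put(LAST_COMMIT_LSN_KEY, "95");

        // Simulate a parent implementation that hands back a read-only view.
        Map<String, String> readOnly = Collections.unmodifiableMap(source);

        // Calling readOnly.remove(...) directly would throw UnsupportedOperationException.
        Map<String, String> stripped = stripLsnKeys(readOnly);
        System.out.println(stripped); // prints {lsn=100}
    }
}
```

In the actual override, the same pattern would amount to wrapping the result of `super.extractBinlogStateOffset(splitState)` in `new HashMap<>(...)` before the two `remove()` calls. If the parent can return `null`, that case would need a separate guard, since the `HashMap` copy constructor throws `NullPointerException` on a `null` argument.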
   



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -430,6 +431,25 @@ && getCurrentFetchTask() instanceof PostgresStreamFetchTask) {
         }
     }
 
+    /**
+     * Strip lsn_proc and lsn_commit from the binlog state offset before it is passed to debezium's
+     * WalPositionLocator. In pgoutput non-streaming mode (proto_version=1, used by debezium 1.9.x
+     * even on PG14), BEGIN and DML messages within a transaction share the same XLogData.data_start
+     * as the transaction's begin_lsn. When begin_lsn equals the previous transaction's commit_lsn
+     * (i.e. no other WAL write exists between them), WalPositionLocator adds that lsn to lsnSeen
+     * during the find phase and then incorrectly filters the DML as already-processed during actual
+     * streaming. Removing these keys sets lastCommitStoredLsn=null, so the find phase exits
+     * immediately at the first received message and switch-off happens before any DML is filtered.

Review Comment:
   The comment says “Strip lsn_proc and lsn_commit”, but the code removes 
Debezium’s `LAST_COMPLETELY_PROCESSED_LSN_KEY` and `LAST_COMMIT_LSN_KEY`. To 
avoid confusion for future maintainers, align terminology in the comment with 
the actual offset keys removed (e.g., explicitly mention the Debezium offset 
field names/keys being removed and how they map to `lsn_proc`/`lsn_commit` if 
applicable).
   



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -357,7 +357,7 @@ public String getTimeoutReason() {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get task fail reason request failed: ", ex);
+            log.error("Send get task timeout reason request failed: ", ex);

Review Comment:
   This log message ends with a colon and a space but has no placeholder, so it 
reads oddly in the log output. Consider removing the trailing colon, or use the 
pattern already used elsewhere (e.g., `"Send get task timeout reason request failed"`) 
and rely on the throwable argument to supply the details.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

