Copilot commented on code in PR #64850:
URL: https://github.com/apache/doris/pull/64850#discussion_r3518159281


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -1065,6 +1065,11 @@ public boolean hasNext() {
                     }
                     nextRecord = element;
                     return true;
+                } else if (SourceRecordUtils.isSchemaChangeEvent(element)) {
+                    // PostgresSchemaRecord is synthetic and has no source offset. Keep the current
+                    // stream offset; the following DML or heartbeat advances it.
+                    nextRecord = element;
+                    return true;
                 } else if (SourceRecordUtils.isDataChangeRecord(element)) {

Review Comment:
   Schema-change events are always forwarded without advancing the stream split 
offset. This is correct for PostgresSchemaRecord (no sourceOffset), but if 
SourceRecordUtils.isSchemaChangeEvent ever returns true for a record that 
*does* carry a source offset, the committed offset can get stuck before the DDL 
event and reprocess the same schema-change record indefinitely. Consider 
advancing the offset when element.sourceOffset() is non-null/non-empty, and 
only skipping advancement for synthetic/no-offset records.
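The suggested policy can be sketched as follows. This is a minimal sketch that assumes the reader tracks its offset as a `Map<String, ?>`, the shape Kafka Connect's `SourceRecord.sourceOffset()` returns. `SchemaChangeOffsetPolicy`, `Rec` and `nextCommittedOffset` are hypothetical names, and `Rec` stands in for `SourceRecord`. In the real reader, the same check would sit inside the `isSchemaChangeEvent` branch of `hasNext()`.

```java
import java.util.Map;

public class SchemaChangeOffsetPolicy {

    /** Hypothetical stand-in for a Kafka Connect SourceRecord: only the source offset matters here. */
    public static final class Rec {
        final Map<String, ?> sourceOffset;

        public Rec(Map<String, ?> sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    /** A schema-change record advances the offset only if it carries a non-empty source offset. */
    public static boolean shouldAdvanceOffset(Rec record) {
        return record.sourceOffset != null && !record.sourceOffset.isEmpty();
    }

    /**
     * Returns the offset to keep after forwarding a schema-change record: the record's own
     * offset when present, otherwise the current stream offset (the synthetic, no-offset case
     * such as PostgresSchemaRecord).
     */
    public static Map<String, ?> nextCommittedOffset(Map<String, ?> current, Rec record) {
        return shouldAdvanceOffset(record) ? record.sourceOffset : current;
    }

    public static void main(String[] args) {
        Map<String, ?> current = Map.of("lsn", 100L); // the "lsn" key is illustrative only
        System.out.println(nextCommittedOffset(current, new Rec(null)));                // {lsn=100}
        System.out.println(nextCommittedOffset(current, new Rec(Map.of())));            // {lsn=100}
        System.out.println(nextCommittedOffset(current, new Rec(Map.of("lsn", 200L)))); // {lsn=200}
    }
}
```

With this shape, a DDL record that does carry an offset moves the committed position past itself, so it is not replayed after a restart, while synthetic records leave the position where it was.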



##########
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/CdcClientWriteHarness.java:
##########
@@ -221,6 +228,33 @@ void enterBinlog(List<SnapshotSplit> splits) throws Exception {
         runWrite(finishedSplitsMeta(splits, committedSnapshotOffsets()));
     }
 
+    /** Inject an invalid entry after the request baseline has been loaded. */
+    void injectUnserializableTableSchema() {
+        AbstractCdcSourceReader reader = (AbstractCdcSourceReader) openReader();
+        TableId tableId = new TableId(null, "public", "invalid_serialization_schema");
+        Table invalidTable =
+                (Table)
+                        Proxy.newProxyInstance(
+                                Table.class.getClassLoader(),
+                                new Class<?>[] {Table.class},
+                                (proxy, method, args) -> {
+                                    if ("id".equals(method.getName())) {
+                                        return tableId;
+                                    }
+                                    throw new IllegalStateException("injected schema serialization failure");
+                                });

Review Comment:
   injectUnserializableTableSchema() uses java.lang.reflect.Proxy to create a 
Table instance, but Proxy can only implement interfaces. 
io.debezium.relational.Table is a concrete class (note the production/test code 
uses Table.editor()), so this will throw IllegalArgumentException at injection 
time and the IT case won’t actually exercise the intended serializeTableSchemas 
failure path. Consider injecting a failing entry using a different mechanism 
(e.g., null/invalid TableChange) and adjusting the IT assertion accordingly, or 
use a mocking library capable of mocking Debezium Table.
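The constraint this comment relies on can be checked in isolation: `Proxy.newProxyInstance` throws `IllegalArgumentException` when any entry in the interfaces array is a class rather than an interface. The sketch below uses hypothetical stand-ins (`ConcreteTable`, `TableLike`, `canProxy`) rather than Debezium's own types. It also shows that a static factory such as `editor()` does not by itself imply a concrete class, because interfaces may declare static methods as of Java 8. Checking `Table.class.isInterface()` against the Debezium version on the test classpath is a quick way to settle which case applies before reworking the injection.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyInterfaceCheck {

    /** Hypothetical stand-in for a concrete class; JDK dynamic proxies cannot implement it. */
    public static class ConcreteTable {
        public String id() {
            return "concrete";
        }
    }

    /** Hypothetical interface that also declares a static factory, in the style of Table.editor(). */
    public interface TableLike {
        String id();

        static TableLike editor() {
            return () -> "from-factory";
        }
    }

    /** Returns true if Proxy.newProxyInstance accepts the type, false if it rejects it. */
    public static boolean canProxy(Class<?> type) {
        InvocationHandler failing = (proxy, method, methodArgs) -> {
            throw new IllegalStateException("injected schema serialization failure");
        };
        try {
            Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, failing);
            return true;
        } catch (IllegalArgumentException e) {
            // The JDK rejects any entry in the interfaces array that is not an interface.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canProxy(TableLike.class));     // true
        System.out.println(canProxy(ConcreteTable.class)); // false
        System.out.println(TableLike.class.isInterface()); // true
        System.out.println(TableLike.editor().id());       // from-factory
    }
}
```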



##########
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresWriteSchemaChangeITCase.java:
##########
@@ -122,6 +128,105 @@ void addColumnIsDetectedExecutedAndDataStreamLoaded() throws Exception {
         }
     }
 
+    @Test
+    void schemaSerializationFailureDoesNotCommitOffset() throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try (MockDorisServer mock = new MockDorisServer();
+                CdcClientWriteHarness harness =
+                        CdcClientWriteHarness.postgres(
+                                jobId,
+                                POSTGRES.getHost(),
+                                POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+                                POSTGRES.getUsername(),
+                                POSTGRES.getPassword(),
+                                POSTGRES.getDatabaseName(),
+                                "public",
+                                "t_user",
+                                "initial",
+                                "doris_target_db",
+                                mock)) {
+
+            List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_user");
+            harness.writeSnapshot(splits);
+            harness.enterBinlog(splits);
+            String committedOffsetBeforeSchemaChange = harness.committedOffset();
+
+            try (Connection conn = connect();
+                    Statement st = conn.createStatement()) {
+                st.execute("ALTER TABLE t_user ADD COLUMN age INT");
+                st.execute("INSERT INTO t_user (id, name, age) VALUES (3, 'carol', 30)");
+            }
+
+            mock.blockNextDdlResponse();
+            Future<List<String>> write =
+                    executor.submit(() -> harness.continueBinlog(1, Duration.ofSeconds(90)));
+            assertThat(mock.awaitBlockedDdlRequest(Duration.ofSeconds(30))).isTrue();
+            harness.injectUnserializableTableSchema();
+            mock.releaseBlockedDdlResponse();
+
+            assertThatThrownBy(() -> write.get(90, TimeUnit.SECONDS))
+                    .isInstanceOf(ExecutionException.class)
+                    .hasCauseInstanceOf(IllegalStateException.class);

Review Comment:
   This test assumes injectUnserializableTableSchema() succeeds and causes the 
writer thread to fail with an IllegalStateException. As currently implemented, 
injectUnserializableTableSchema() will likely throw immediately (Proxy cannot 
implement Debezium Table if it is a class), so the failure may occur on the 
main test thread instead and this assertion will never run. Once the injection 
mechanism is fixed, re-check and align the expected exception/cause here to the 
actual serialization failure behavior.
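The distinction drawn here is about which thread raises the failure. A sketch with hypothetical names (`FailureSite`, `failureSeenByGet`, `injectOnCallingThread`): an exception thrown inside the submitted task reaches the test only through `Future.get`, wrapped in `ExecutionException` with the original exception as its cause. An exception from a step that runs on the test thread itself propagates directly, so a later `write.get(...)` assertion is never reached.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FailureSite {

    /** Runs a task on a worker thread and returns the ExecutionException from Future.get, or null. */
    public static Throwable failureSeenByGet(Callable<String> task) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = executor.submit(task);
            try {
                result.get(5, TimeUnit.SECONDS);
                return null;
            } catch (ExecutionException e) {
                // A failure inside the task is wrapped; the original is e.getCause().
                return e;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /** Hypothetical stand-in for an injection step that fails on the calling thread. */
    static void injectOnCallingThread() {
        throw new IllegalArgumentException("not an interface");
    }

    public static void main(String[] args) throws Exception {
        Throwable t = failureSeenByGet(() -> {
            throw new IllegalStateException("injected schema serialization failure");
        });
        System.out.println(t.getClass().getSimpleName());            // ExecutionException
        System.out.println(t.getCause().getClass().getSimpleName()); // IllegalStateException

        // A setup step that fails on the calling thread is not wrapped: it propagates directly.
        try {
            injectOnCallingThread();
        } catch (IllegalArgumentException e) {
            System.out.println("thrown on caller: " + e.getMessage()); // thrown on caller: not an interface
        }
    }
}
```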



-- 
This is an automated message from the Apache Git Service.