Copilot commented on code in PR #64850:
URL: https://github.com/apache/doris/pull/64850#discussion_r3518159281
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -1065,6 +1065,11 @@ public boolean hasNext() {
}
nextRecord = element;
return true;
+ } else if (SourceRecordUtils.isSchemaChangeEvent(element)) {
+ // PostgresSchemaRecord is synthetic and has no source offset. Keep the current
+ // stream offset; the following DML or heartbeat advances it.
+ nextRecord = element;
+ return true;
} else if (SourceRecordUtils.isDataChangeRecord(element)) {
Review Comment:
Schema-change events are always forwarded without advancing the stream split
offset. This is correct for PostgresSchemaRecord (no sourceOffset), but if
SourceRecordUtils.isSchemaChangeEvent ever returns true for a record that
*does* carry a source offset, the committed offset can get stuck before the DDL
event and reprocess the same schema-change record indefinitely. Consider
advancing the offset when element.sourceOffset() is non-null/non-empty, and
only skipping advancement for synthetic/no-offset records.
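
A minimal sketch of the suggested guard, not the actual reader code. `OffsetAdvanceSketch` and `offsetAfter` are hypothetical names, and offsets are modeled as plain maps (the shape returned by Kafka Connect's `SourceRecord.sourceOffset()`); how the real reader stores its stream split offset is not shown in this diff.

```java
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Doris cdc_client code. */
final class OffsetAdvanceSketch {
    private OffsetAdvanceSketch() {}

    /**
     * Returns the offset to track after forwarding a record: the record's own
     * offset when it carries one, otherwise the unchanged current offset.
     */
    static Map<String, ?> offsetAfter(Map<String, ?> current, Map<String, ?> recordOffset) {
        if (recordOffset == null || recordOffset.isEmpty()) {
            // Synthetic record without an offset: keep the current position and
            // let the following DML or heartbeat advance it.
            return current;
        }
        // The record carries a real offset: advance so it is not replayed.
        return recordOffset;
    }
}
```

With this shape, a synthetic schema record leaves the tracked offset untouched, while any schema-change record that does carry an offset moves it forward.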
##########
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/CdcClientWriteHarness.java:
##########
@@ -221,6 +228,33 @@ void enterBinlog(List<SnapshotSplit> splits) throws Exception {
runWrite(finishedSplitsMeta(splits, committedSnapshotOffsets()));
}
+ /** Inject an invalid entry after the request baseline has been loaded. */
+ void injectUnserializableTableSchema() {
+ AbstractCdcSourceReader reader = (AbstractCdcSourceReader) openReader();
+ TableId tableId = new TableId(null, "public", "invalid_serialization_schema");
+ Table invalidTable =
+ (Table)
+ Proxy.newProxyInstance(
+ Table.class.getClassLoader(),
+ new Class<?>[] {Table.class},
+ (proxy, method, args) -> {
+ if ("id".equals(method.getName())) {
+ return tableId;
+ }
+ throw new IllegalStateException("injected schema serialization failure");
+ });
Review Comment:
injectUnserializableTableSchema() uses java.lang.reflect.Proxy to create a
Table instance, but Proxy can only implement interfaces. If
io.debezium.relational.Table is a concrete class rather than an interface,
Proxy.newProxyInstance will throw IllegalArgumentException at injection time
and the IT case won’t actually exercise the intended serializeTableSchemas
failure path. Confirm how Table is declared before relying on this. If it is
not an interface, consider injecting a failing entry using a different
mechanism (e.g., a null/invalid TableChange) and adjusting the IT assertion
accordingly, or use a mocking library capable of mocking Debezium Table.
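
To illustrate the Proxy constraint in isolation, here is a small stdlib-only sketch. `ProxyTargetCheck` and `canProxy` are hypothetical names; the sketch says nothing about how Debezium declares `Table`, which the diff does not show. It only demonstrates that `Proxy.newProxyInstance` accepts interfaces and rejects classes.

```java
import java.lang.reflect.Proxy;

/** Hypothetical check for illustration; uses only java.lang.reflect. */
final class ProxyTargetCheck {
    private ProxyTargetCheck() {}

    /** Returns true if a dynamic proxy can be created for the given type. */
    static boolean canProxy(Class<?> type) {
        try {
            Proxy.newProxyInstance(
                    ProxyTargetCheck.class.getClassLoader(),
                    new Class<?>[] {type},
                    (proxy, method, args) -> null); // handler is never invoked here
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown when the type is not an interface (or is not visible
            // through the given class loader).
            return false;
        }
    }
}
```

`canProxy(Runnable.class)` succeeds because `Runnable` is an interface, while `canProxy(Thread.class)` fails because `Thread` is a class. A cheaper pre-check in the harness would be `Table.class.isInterface()`.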
##########
fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/itcase/PostgresWriteSchemaChangeITCase.java:
##########
@@ -122,6 +128,105 @@ void addColumnIsDetectedExecutedAndDataStreamLoaded() throws Exception {
}
}
+ @Test
+ void schemaSerializationFailureDoesNotCommitOffset() throws Exception {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try (MockDorisServer mock = new MockDorisServer();
+ CdcClientWriteHarness harness =
+ CdcClientWriteHarness.postgres(
+ jobId,
+ POSTGRES.getHost(),
+ POSTGRES.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT),
+ POSTGRES.getUsername(),
+ POSTGRES.getPassword(),
+ POSTGRES.getDatabaseName(),
+ "public",
+ "t_user",
+ "initial",
+ "doris_target_db",
+ mock)) {
+
+ List<SnapshotSplit> splits = harness.fetchAllSnapshotSplits("t_user");
+ harness.writeSnapshot(splits);
+ harness.enterBinlog(splits);
+ String committedOffsetBeforeSchemaChange = harness.committedOffset();
+
+ try (Connection conn = connect();
+ Statement st = conn.createStatement()) {
+ st.execute("ALTER TABLE t_user ADD COLUMN age INT");
+ st.execute("INSERT INTO t_user (id, name, age) VALUES (3, 'carol', 30)");
+ }
+
+ mock.blockNextDdlResponse();
+ Future<List<String>> write =
+ executor.submit(() -> harness.continueBinlog(1, Duration.ofSeconds(90)));
+ assertThat(mock.awaitBlockedDdlRequest(Duration.ofSeconds(30))).isTrue();
+ harness.injectUnserializableTableSchema();
+ mock.releaseBlockedDdlResponse();
+
+ assertThatThrownBy(() -> write.get(90, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(IllegalStateException.class);
Review Comment:
This test assumes injectUnserializableTableSchema() succeeds and causes the
writer thread to fail with an IllegalStateException. As currently implemented,
injectUnserializableTableSchema() will likely throw immediately (Proxy cannot
implement Debezium Table if it is a class), so the failure may occur on the
main test thread instead and this assertion will never run. Once the injection
mechanism is fixed, re-check and align the expected exception/cause here to the
actual serialization failure behavior.
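
The distinction between the two threads can be sketched with plain java.util.concurrent. `FutureFailureSketch` and `failureSeenByCaller` are hypothetical names, not harness code. An exception thrown inside the submitted task reaches the caller wrapped in ExecutionException, which is what the assertion above expects. An exception thrown by a direct call on the test thread is never wrapped and would abort the test before the assertion is reached.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of how worker-thread failures surface via Future.get. */
final class FutureFailureSketch {
    private FutureFailureSketch() {}

    /** Runs the task on a worker thread; returns what get() threw, or null on success. */
    static Throwable failureSeenByCaller(Callable<?> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            future.get(5, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e; // the task's own exception is available as e.getCause()
        } catch (TimeoutException e) {
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If the task throws IllegalStateException, the returned Throwable is an ExecutionException whose cause is that IllegalStateException. This is the shape `hasCauseInstanceOf(IllegalStateException.class)` checks for, so the injected failure has to happen on the writer thread for the assertion to be meaningful.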
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.