github-actions[bot] commented on code in PR #64850:
URL: https://github.com/apache/doris/pull/64850#discussion_r3519132084


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java:
##########
@@ -34,241 +33,170 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
-import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
-
-import io.debezium.data.Envelope;
 import io.debezium.relational.Column;
-import io.debezium.relational.TableEditor;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP column only) by comparing
- * the record's Kafka Connect schema field names with stored tableSchemas.
+ * PostgreSQL-specific deserializer with event-driven schema change handling.
+ *
+ * <p>Schema changes are detected from pgoutput Relation messages, which flink-cdc surfaces as
+ * {@link PostgresSchemaRecord} (the source is created with {@code includeSchemaChanges(true)}). The
+ * carried Debezium {@link Table} is the full post-change schema and is diffed against the stored
+ * baseline — the Doris table's current full schema, loaded from FE — to derive ADD/DROP column DDL.
+ * Regular DML records are emitted directly without per-record schema comparison.
  *
- * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema detection is done by
- * comparing the "after" struct field names in each DML record against the known column set.
+ * <p>The baseline is also established by Relation events: when a table first appears (e.g. a stream
+ * started directly from an offset without a snapshot), pgoutput sends its Relation before the first
+ * DML, and the {@code stored == null} branch of {@link #handleSchemaChangeEvent} adopts the current
+ * schema as the baseline (no DDL). No JDBC fallback is needed — a DML can only reach this
+ * deserializer after Debezium has resolved its Relation (otherwise Debezium drops it as a
+ * NoopMessage), and that Relation has already established the baseline.
  *
- * <p>Type comparison is intentionally skipped to avoid false positives caused by Kafka Connect type
- * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column add or drop is
- * detected, the accurate column types are fetched directly from PostgreSQL via the injected {@link
- * #pgSchemaRefresher} callback.
+ * <p>Only ADD and DROP column are emitted. A simultaneous ADD+DROP is treated as a possible RENAME
+ * and skipped (RENAME manually in Doris). MODIFY column type is not emitted.
  *
- * <p>MODIFY column type is not supported — users must manually execute ALTER TABLE ... MODIFY
- * COLUMN in Doris when a PG column type changes.
+ * <p>The emitted DDL is only applied on the from-to (at-least-once) write path; the TVF
+ * (exactly-once) fetch path consumes DML only and does not execute schema-change records, so
+ * automatic schema change is effective for from-to mode.
  */
 public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG =
             LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
 
-    /**
-     * Callback to fetch the current PG table schema for a single table via JDBC. Injected by {@link
-     * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} after initialization.
-     */
-    private transient Function<TableId, TableChanges.TableChange> pgSchemaRefresher;
-
-    public void setPgSchemaRefresher(Function<TableId, TableChanges.TableChange> refresher) {
-        this.pgSchemaRefresher = refresher;
-    }
-
     @Override
     public DeserializeResult deserialize(Map<String, String> context, SourceRecord record)
             throws IOException {
+        // 1. Schema change event (pgoutput Relation -> PostgresSchemaRecord).
+        if (SourceRecordUtils.isSchemaChangeEvent(record)) {
+            return handleSchemaChangeEvent(context, record);
+        }
+        // 2. Non-DML (heartbeat / watermark / etc.).
         if (!RecordUtils.isDataChangeRecord(record)) {
             return DeserializeResult.empty();
         }
+        return super.deserialize(context, record);
+    }
 
-        Schema valueSchema = record.valueSchema();
-        if (valueSchema == null) {
-            return super.deserialize(context, record);
-        }
-
-        Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
-        if (afterField == null) {
-            return super.deserialize(context, record);
-        }
-
-        Schema afterSchema = afterField.schema();
-        TableId tableId = extractTableId(record);
+    /**
+     * Handle a pgoutput Relation-driven schema change: diff the post-change PG schema (carried by
+     * {@link PostgresSchemaRecord}) against the stored Doris baseline and emit ADD/DROP column DDL.
+     * When no baseline exists yet (first appearance of the table), adopt the fresh schema as the
+     * baseline without emitting any DDL.
+     */
+    private DeserializeResult handleSchemaChangeEvent(
+            Map<String, String> context, SourceRecord record) {
+        Table freshTable = ((PostgresSchemaRecord) record).getTable();
+        // Debezium PG TableId is (catalog=null, schema, table) — matches the tableSchemas key.
+        TableId tableId = freshTable.id();
         TableChanges.TableChange stored = tableSchemas != null ? tableSchemas.get(tableId) : null;
 
-        // No baseline schema available — cannot detect changes, fall through to normal
-        // deserialization
+        // changeType is not consumed inside cdc_client — downstream only reads getTable() and
+        // serializeTableSchemas does not persist it — so ALTER is used uniformly, including for the
+        // first-time baseline below (which is semantically a CREATE).
+        TableChanges.TableChange freshChange =
+                new TableChanges.TableChange(TableChanges.TableChangeType.ALTER, freshTable);
+        Map<TableId, TableChanges.TableChange> updatedSchemas = new HashMap<>();
+
+        // No baseline yet: adopt the fresh schema as baseline, no DDL.
         if (stored == null || stored.getTable() == null) {
-            LOG.debug(
-                    "No stored schema for table {}, skipping schema change detection.",
+            LOG.info(
+                    "[SCHEMA-CHANGE] Table {}: no baseline, adopting fresh schema as baseline (no DDL)",
                     tableId.identifier());
-            return super.deserialize(context, record);
-        }
-
-        // First pass: name-only diff — fast, in-memory, no type comparison, no false positives
-        SchemaChangeHelper.SchemaDiff nameDiff =
-                SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
-        if (nameDiff.isEmpty()) {
-            return super.deserialize(context, record);
-        }
-
-        Preconditions.checkNotNull(
-                pgSchemaRefresher,
-                "pgSchemaRefresher callback is not set. Cannot fetch fresh PG schema for change detection.");
-
-        // the last fresh schema
-        TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
-        if (fresh == null || fresh.getTable() == null) {
-            // Cannot proceed: DDL must be executed before the triggering DML record is written,
-            // otherwise new column data in this record would be silently dropped.
-            // Throwing here causes the batch to be retried from the same offset.
-            throw new IOException(
-                    "Failed to fetch fresh schema for table "
-                            + tableId.identifier()
-                            + "; cannot apply schema change safely. Will retry.");
+            updatedSchemas.put(tableId, freshChange);
+            return DeserializeResult.schemaChange(
+                    Collections.emptyList(), updatedSchemas, Collections.emptyList());
         }
 
-        // Second diff: use afterSchema as the source of truth for which columns the current WAL
-        // record is aware of. Only process additions/drops visible in afterSchema — columns that
-        // exist in fresh (JDBC) but are absent from afterSchema belong to a later DDL that has not
-        // yet produced a DML record, and will be processed when that DML record arrives.
-        //
-        // pgAdded: present in afterSchema but absent from stored → look up Column in fresh for
-        //          accurate PG type metadata. If fresh doesn't have the column yet (shouldn't
-        //          happen normally), skip it.
-        // pgDropped: present in stored but absent from afterSchema.
-        List<Column> pgAdded = new ArrayList<>();
-        List<String> pgDropped = new ArrayList<>();
-
-        for (Field field : afterSchema.fields()) {
-            if (stored.getTable().columnWithName(field.name()) == null) {
-                Column freshCol = fresh.getTable().columnWithName(field.name());
-                if (freshCol != null) {
-                    pgAdded.add(freshCol);
-                }
+        List<Column> added = new ArrayList<>();
+        List<String> dropped = new ArrayList<>();
+        for (Column col : freshTable.columns()) {
+            if (stored.getTable().columnWithName(col.name()) == null) {
+                added.add(col);
             }
         }
-
         for (Column col : stored.getTable().columns()) {
-            if (afterSchema.field(col.name()) == null) {
-                pgDropped.add(col.name());
+            if (freshTable.columnWithName(col.name()) == null) {
+                dropped.add(col.name());
             }
         }
 
-        // Second diff is empty: nameDiff was a false positive (PG schema unchanged vs stored).
-        // This happens when pgSchemaRefresher returns a schema ahead of the current WAL position
-        // (e.g. a later DDL was already applied in PG while we're still consuming older DML
-        // records).
-        // No DDL needed, no tableSchema update, no extra stream load — just process the DML
-        // normally.
-        if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
-            return super.deserialize(context, record);
+        // No-op: Relation re-emitted but schema unchanged vs the FE baseline (idempotent).
+        if (added.isEmpty() && dropped.isEmpty()) {
+            return DeserializeResult.empty();
         }
 
-        // Build updatedSchemas from fresh filtered to afterSchema columns only, so that the stored
-        // cache does not jump ahead to include columns not yet seen by any DML record. Those
-        // unseen columns will trigger their own schema change when their first DML record arrives.
-        TableEditor editor = fresh.getTable().edit();
-        for (Column col : fresh.getTable().columns()) {
-            if (afterSchema.field(col.name()) == null) {
-                editor.removeColumn(col.name());
-            }
-        }
-        TableChanges.TableChange filteredChange =
-                new TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
-        Map<TableId, TableChanges.TableChange> updatedSchemas = new HashMap<>();
-        updatedSchemas.put(tableId, filteredChange);
+        updatedSchemas.put(tableId, freshChange);
 
-        // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip DDL to avoid data loss.
-        // Users must manually RENAME the column in Doris.
-        if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
+        // Rename guard: simultaneous ADD+DROP may be a RENAME — skip DDL to avoid data loss.
+        if (!added.isEmpty() && !dropped.isEmpty()) {
             LOG.warn(
-                    "[SCHEMA-CHANGE-SKIPPED] Table: {}\n"
-                            + "Potential RENAME detected (simultaneous DROP+ADD).\n"
-                            + "Dropped columns: {}\n"
-                            + "Added columns:   {}\n"
-                            + "No DDL emitted to prevent data loss.\n"
-                            + "Action required: manually RENAME column(s) in Doris,"
-                            + " then data will resume.",
+                    "[SCHEMA-CHANGE-SKIPPED] Table {}: simultaneous DROP {} and ADD {} looks like a"
+                            + " RENAME; no DDL emitted, please RENAME column(s) manually in Doris.",
                     tableId.identifier(),
-                    pgDropped,
-                    pgAdded.stream().map(Column::name).collect(Collectors.toList()));
-            List<String> dmlRecords = super.deserialize(context, record).getRecords();
+                    dropped,
+                    added.stream().map(Column::name).collect(Collectors.toList()));
             return DeserializeResult.schemaChange(

Review Comment:
   When this branch treats a simultaneous DROP+ADD as a possible rename, it 
still returns `updatedSchemas` for the fresh PostgreSQL schema while emitting 
no Doris DDL. The coordinator then applies that baseline and can commit the 
offsets of the following DML, even though the Doris table still has the 
dropped column and lacks the added one. The new regression test codifies the 
consequence: after `DROP c1` + `ADD c3`, `c3` is never added and rows are 
written with `c1` as NULL. If this really is a rename that needs manual 
intervention, the writer should fail, or otherwise avoid advancing or 
committing past the Relation/DML until the target schema is corrected. 
Otherwise the renamed column's values are skipped permanently, and a later 
manual rename cannot recover them from the already-committed stream.
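
   One possible shape for the fail-closed behaviour suggested above, as a 
minimal sketch rather than a patch. It is dependency-free: plain column-name 
lists stand in for the Debezium `Table` objects, and `RenameGuardSketch` and 
`nextBaseline` are hypothetical names that do not exist in cdc_client. It also 
assumes that an exception thrown at this point keeps the coordinator from 
committing later offsets, which is what the removed `IOException` path relied 
on; whether that still holds on the new Relation-driven path would need to be 
checked.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, dependency-free sketch: column-name lists stand in for Debezium
// Table objects, and none of these names exist in cdc_client.
public class RenameGuardSketch {

    /**
     * Returns the baseline that is safe to persist after a Relation event.
     * Throws when the diff is ambiguous (ADD and DROP together), so the caller
     * cannot advance the baseline or commit later DML offsets.
     */
    public static List<String> nextBaseline(List<String> stored, List<String> fresh) {
        Set<String> added = new LinkedHashSet<>(fresh);
        added.removeAll(stored);
        Set<String> dropped = new LinkedHashSet<>(stored);
        dropped.removeAll(fresh);

        if (added.isEmpty() && dropped.isEmpty()) {
            // Relation re-emitted with an unchanged schema: keep the old baseline.
            return stored;
        }
        if (!added.isEmpty() && !dropped.isEmpty()) {
            // Possible RENAME: no DDL can be derived safely, so stop here
            // instead of adopting the fresh schema.
            throw new IllegalStateException(
                    "Possible RENAME: dropped " + dropped + ", added " + added
                            + "; fix the Doris schema before resuming");
        }
        // Pure ADD or pure DROP: DDL is emitted, so the baseline may follow PostgreSQL.
        return new ArrayList<>(fresh);
    }

    public static void main(String[] args) {
        // Pure ADD: the baseline advances to the fresh column list.
        System.out.println(nextBaseline(List.of("id", "c1"), List.of("id", "c1", "c2")));
        // DROP c1 + ADD c3: blocked, the baseline is not advanced.
        try {
            nextBaseline(List.of("id", "c1"), List.of("id", "c3"));
        } catch (IllegalStateException e) {
            System.out.println("blocked: " + e.getMessage());
        }
    }
}
```

   The point of the sketch is only the ordering: the ambiguous case is 
rejected before anything equivalent to `updatedSchemas.put(tableId, 
freshChange)` runs, so the stored baseline and the Doris table cannot drift 
apart silently.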



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

