github-actions[bot] commented on code in PR #64850:
URL: https://github.com/apache/doris/pull/64850#discussion_r3519132084
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java:
##########
@@ -34,241 +33,170 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
-import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
-import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
-
-import io.debezium.data.Envelope;
import io.debezium.relational.Column;
-import io.debezium.relational.TableEditor;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP
column only) by comparing
- * the record's Kafka Connect schema field names with stored tableSchemas.
+ * PostgreSQL-specific deserializer with event-driven schema change handling.
+ *
+ * <p>Schema changes are detected from pgoutput Relation messages, which
flink-cdc surfaces as
+ * {@link PostgresSchemaRecord} (the source is created with {@code
includeSchemaChanges(true)}). The
+ * carried Debezium {@link Table} is the full post-change schema and is diffed
against the stored
+ * baseline — the Doris table's current full schema, loaded from FE — to
derive ADD/DROP column DDL.
+ * Regular DML records are emitted directly without per-record schema
comparison.
*
- * <p>Because PostgreSQL does not emit DDL events in the WAL stream, schema
detection is done by
- * comparing the "after" struct field names in each DML record against the
known column set.
+ * <p>The baseline is also established by Relation events: when a table first
appears (e.g. a stream
+ * started directly from an offset without a snapshot), pgoutput sends its
Relation before the first
+ * DML, and the {@code stored == null} branch of {@link
#handleSchemaChangeEvent} adopts the current
+ * schema as the baseline (no DDL). No JDBC fallback is needed — a DML can
only reach this
+ * deserializer after Debezium has resolved its Relation (otherwise Debezium
drops it as a
+ * NoopMessage), and that Relation has already established the baseline.
*
- * <p>Type comparison is intentionally skipped to avoid false positives caused
by Kafka Connect type
- * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column
add or drop is
- * detected, the accurate column types are fetched directly from PostgreSQL
via the injected {@link
- * #pgSchemaRefresher} callback.
+ * <p>Only ADD and DROP column are emitted. A simultaneous ADD+DROP is treated
as a possible RENAME
+ * and skipped (RENAME manually in Doris). MODIFY column type is not emitted.
*
- * <p>MODIFY column type is not supported — users must manually execute ALTER
TABLE ... MODIFY
- * COLUMN in Doris when a PG column type changes.
+ * <p>The emitted DDL is only applied on the from-to (at-least-once) write
path; the TVF
+ * (exactly-once) fetch path consumes DML only and does not execute
schema-change records, so
+ * automatic schema change is effective for from-to mode.
*/
public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer
{
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class);
- /**
- * Callback to fetch the current PG table schema for a single table via
JDBC. Injected by {@link
- * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader}
after initialization.
- */
- private transient Function<TableId, TableChanges.TableChange>
pgSchemaRefresher;
-
- public void setPgSchemaRefresher(Function<TableId,
TableChanges.TableChange> refresher) {
- this.pgSchemaRefresher = refresher;
- }
-
@Override
public DeserializeResult deserialize(Map<String, String> context,
SourceRecord record)
throws IOException {
+ // 1. Schema change event (pgoutput Relation -> PostgresSchemaRecord).
+ if (SourceRecordUtils.isSchemaChangeEvent(record)) {
+ return handleSchemaChangeEvent(context, record);
+ }
+ // 2. Non-DML (heartbeat / watermark / etc.).
if (!RecordUtils.isDataChangeRecord(record)) {
return DeserializeResult.empty();
}
+ return super.deserialize(context, record);
+ }
- Schema valueSchema = record.valueSchema();
- if (valueSchema == null) {
- return super.deserialize(context, record);
- }
-
- Field afterField = valueSchema.field(Envelope.FieldName.AFTER);
- if (afterField == null) {
- return super.deserialize(context, record);
- }
-
- Schema afterSchema = afterField.schema();
- TableId tableId = extractTableId(record);
+ /**
+ * Handle a pgoutput Relation-driven schema change: diff the post-change
PG schema (carried by
+ * {@link PostgresSchemaRecord}) against the stored Doris baseline and
emit ADD/DROP column DDL.
+ * When no baseline exists yet (first appearance of the table), adopt the
fresh schema as the
+ * baseline without emitting any DDL.
+ */
+ private DeserializeResult handleSchemaChangeEvent(
+ Map<String, String> context, SourceRecord record) {
+ Table freshTable = ((PostgresSchemaRecord) record).getTable();
+ // Debezium PG TableId is (catalog=null, schema, table) — matches the
tableSchemas key.
+ TableId tableId = freshTable.id();
TableChanges.TableChange stored = tableSchemas != null ?
tableSchemas.get(tableId) : null;
- // No baseline schema available — cannot detect changes, fall through
to normal
- // deserialization
+ // changeType is not consumed inside cdc_client — downstream only
reads getTable() and
+ // serializeTableSchemas does not persist it — so ALTER is used
uniformly, including for the
+ // first-time baseline below (which is semantically a CREATE).
+ TableChanges.TableChange freshChange =
+ new
TableChanges.TableChange(TableChanges.TableChangeType.ALTER, freshTable);
+ Map<TableId, TableChanges.TableChange> updatedSchemas = new
HashMap<>();
+
+ // No baseline yet: adopt the fresh schema as baseline, no DDL.
if (stored == null || stored.getTable() == null) {
- LOG.debug(
- "No stored schema for table {}, skipping schema change
detection.",
+ LOG.info(
+ "[SCHEMA-CHANGE] Table {}: no baseline, adopting fresh
schema as baseline (no DDL)",
tableId.identifier());
- return super.deserialize(context, record);
- }
-
- // First pass: name-only diff — fast, in-memory, no type comparison,
no false positives
- SchemaChangeHelper.SchemaDiff nameDiff =
- SchemaChangeHelper.diffSchemaByName(afterSchema, stored);
- if (nameDiff.isEmpty()) {
- return super.deserialize(context, record);
- }
-
- Preconditions.checkNotNull(
- pgSchemaRefresher,
- "pgSchemaRefresher callback is not set. Cannot fetch fresh PG
schema for change detection.");
-
- // the last fresh schema
- TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId);
- if (fresh == null || fresh.getTable() == null) {
- // Cannot proceed: DDL must be executed before the triggering DML
record is written,
- // otherwise new column data in this record would be silently
dropped.
- // Throwing here causes the batch to be retried from the same
offset.
- throw new IOException(
- "Failed to fetch fresh schema for table "
- + tableId.identifier()
- + "; cannot apply schema change safely. Will
retry.");
+ updatedSchemas.put(tableId, freshChange);
+ return DeserializeResult.schemaChange(
+ Collections.emptyList(), updatedSchemas,
Collections.emptyList());
}
- // Second diff: use afterSchema as the source of truth for which
columns the current WAL
- // record is aware of. Only process additions/drops visible in
afterSchema — columns that
- // exist in fresh (JDBC) but are absent from afterSchema belong to a
later DDL that has not
- // yet produced a DML record, and will be processed when that DML
record arrives.
- //
- // pgAdded: present in afterSchema but absent from stored → look up
Column in fresh for
- // accurate PG type metadata. If fresh doesn't have the
column yet (shouldn't
- // happen normally), skip it.
- // pgDropped: present in stored but absent from afterSchema.
- List<Column> pgAdded = new ArrayList<>();
- List<String> pgDropped = new ArrayList<>();
-
- for (Field field : afterSchema.fields()) {
- if (stored.getTable().columnWithName(field.name()) == null) {
- Column freshCol =
fresh.getTable().columnWithName(field.name());
- if (freshCol != null) {
- pgAdded.add(freshCol);
- }
+ List<Column> added = new ArrayList<>();
+ List<String> dropped = new ArrayList<>();
+ for (Column col : freshTable.columns()) {
+ if (stored.getTable().columnWithName(col.name()) == null) {
+ added.add(col);
}
}
-
for (Column col : stored.getTable().columns()) {
- if (afterSchema.field(col.name()) == null) {
- pgDropped.add(col.name());
+ if (freshTable.columnWithName(col.name()) == null) {
+ dropped.add(col.name());
}
}
- // Second diff is empty: nameDiff was a false positive (PG schema
unchanged vs stored).
- // This happens when pgSchemaRefresher returns a schema ahead of the
current WAL position
- // (e.g. a later DDL was already applied in PG while we're still
consuming older DML
- // records).
- // No DDL needed, no tableSchema update, no extra stream load — just
process the DML
- // normally.
- if (pgAdded.isEmpty() && pgDropped.isEmpty()) {
- return super.deserialize(context, record);
+ // No-op: Relation re-emitted but schema unchanged vs the FE baseline
(idempotent).
+ if (added.isEmpty() && dropped.isEmpty()) {
+ return DeserializeResult.empty();
}
- // Build updatedSchemas from fresh filtered to afterSchema columns
only, so that the stored
- // cache does not jump ahead to include columns not yet seen by any
DML record. Those
- // unseen columns will trigger their own schema change when their
first DML record arrives.
- TableEditor editor = fresh.getTable().edit();
- for (Column col : fresh.getTable().columns()) {
- if (afterSchema.field(col.name()) == null) {
- editor.removeColumn(col.name());
- }
- }
- TableChanges.TableChange filteredChange =
- new
TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create());
- Map<TableId, TableChanges.TableChange> updatedSchemas = new
HashMap<>();
- updatedSchemas.put(tableId, filteredChange);
+ updatedSchemas.put(tableId, freshChange);
- // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip
DDL to avoid data loss.
- // Users must manually RENAME the column in Doris.
- if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) {
+ // Rename guard: simultaneous ADD+DROP may be a RENAME — skip DDL to
avoid data loss.
+ if (!added.isEmpty() && !dropped.isEmpty()) {
LOG.warn(
- "[SCHEMA-CHANGE-SKIPPED] Table: {}\n"
- + "Potential RENAME detected (simultaneous
DROP+ADD).\n"
- + "Dropped columns: {}\n"
- + "Added columns: {}\n"
- + "No DDL emitted to prevent data loss.\n"
- + "Action required: manually RENAME column(s) in
Doris,"
- + " then data will resume.",
+ "[SCHEMA-CHANGE-SKIPPED] Table {}: simultaneous DROP {}
and ADD {} looks like a"
+ + " RENAME; no DDL emitted, please RENAME
column(s) manually in Doris.",
tableId.identifier(),
- pgDropped,
-
pgAdded.stream().map(Column::name).collect(Collectors.toList()));
- List<String> dmlRecords = super.deserialize(context,
record).getRecords();
+ dropped,
+
added.stream().map(Column::name).collect(Collectors.toList()));
return DeserializeResult.schemaChange(
Review Comment:
When this branch treats a simultaneous DROP+ADD as a possible rename, it
still returns `updatedSchemas` containing the fresh PostgreSQL schema while
emitting no Doris DDL. The coordinator then adopts that schema as the new
baseline and can commit subsequent DML offsets, even though the Doris table
still has the dropped column and lacks the added one. The new regression test
codifies the consequence: after `DROP c1` + `ADD c3`, `c3` is never added to
Doris and rows are written with `c1` as NULL. If this really is a
rename/manual-intervention case, the writer should fail, or otherwise avoid
advancing/committing past the Relation/DML, until the target schema is
corrected. Otherwise the renamed column's values are skipped permanently, and
a later manual rename cannot recover them from the already-committed stream.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]