Di Wu created FLINK-39748:
-----------------------------
Summary: Postgres CDC snapshot produces wrong values for TIMESTAMP
/ TIMESTAMPTZ / DATE columns with historical dates
Key: FLINK-39748
URL: https://issues.apache.org/jira/browse/FLINK-39748
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.6.0
Reporter: Di Wu
*Description*
*Problem*
When the Postgres CDC source snapshots rows containing TIMESTAMP,
TIMESTAMPTZ, or DATE column values dated
before the Julian/Gregorian cutover (1582-10-15), the emitted Debezium
records carry numerically wrong
timestamps. The same row, when later re-emitted via the streaming (logical
decoding) path, carries the correct
value — so snapshot and streaming records for the same primary key disagree,
breaking idempotent UPSERT
downstream and causing phantom updates.
Example: snapshotting a row whose TIMESTAMP column holds '0001-01-01
00:00:00' produces a Debezium
MicroTimestamp of -62135769257000000 instead of the proleptic-UTC-correct
-62135596800000000. The snapshot value is 172,457 seconds too early
(2 days − 343 seconds) on a JVM configured with Asia/Shanghai.
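The gap between the two values quoted above can be checked with java.time alone. This is a minimal sketch for illustration: the class and method names are hypothetical and not part of Flink CDC or Debezium, and the snapshot value is simply copied from this report.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class SnapshotDriftCheck {
    // Epoch microseconds reported for the snapshot path in this issue.
    static final long SNAPSHOT_MICROS = -62135769257000000L;

    /** Epoch microseconds of a wall-clock value read as proleptic-Gregorian UTC. */
    static long prolepticUtcMicros(LocalDateTime value) {
        long seconds = value.toEpochSecond(ZoneOffset.UTC);
        return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), value.getNano() / 1_000);
    }

    /** How many seconds the snapshot value lies before the expected value. */
    static long driftSeconds() {
        long expected = prolepticUtcMicros(LocalDateTime.of(1, 1, 1, 0, 0));
        return (expected - SNAPSHOT_MICROS) / 1_000_000L;
    }

    public static void main(String[] args) {
        System.out.println("expected micros = " + prolepticUtcMicros(LocalDateTime.of(1, 1, 1, 0, 0)));
        System.out.println("drift seconds   = " + driftSeconds());
        System.out.println("2 days - 343 s  = " + (2 * 86_400L - 343L));
    }
}
```

The last two printed numbers should agree, which shows that the day shift and the 343-second component have opposite signs in the emitted value.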
*Root Cause*
PostgresScanFetchTask#createDataEventsForTable reads column values via a bare
rs.getObject(i + 1). For temporal
columns, the PG JDBC driver returns java.sql.Timestamp / java.sql.Date
constructed through
java.util.GregorianCalendar (default Julian/Gregorian cutover at 1582-10-15)
using the JVM default time zone.
This introduces two independent drifts:
1. *Julian/Proleptic-Gregorian cutover*: values before 1582-10-15 are
interpreted as Julian, while PostgreSQL stores proleptic Gregorian. The shift
is a whole number of days that depends on the year (2 days at year 0001).
2. *Local Mean Time (LMT)*: when the JVM default time zone has an LMT segment
(e.g. Asia/Shanghai is LMT +08:05:43 until 1901-01-01), values in that segment
are offset further by the LMT delta (343 seconds for Shanghai).
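Both effects can be observed in isolation on a stock JVM, without a database. The sketch below is illustrative only: it does not call the PG JDBC driver, the class and method names are hypothetical, and it assumes the JVM's bundled tzdata lists Asia/Shanghai with an LMT offset of +08:05:43.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

final class CutoverAndLmtDemo {
    /** 0001-01-01T00:00 UTC through the default hybrid Julian/Gregorian calendar. */
    static long hybridMillis() {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(1, Calendar.JANUARY, 1, 0, 0, 0);
        return cal.getTimeInMillis();
    }

    /** Same fields, but with the cutover pushed back so the calendar is purely Gregorian. */
    static long prolepticMillis() {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.setGregorianChange(new Date(Long.MIN_VALUE));
        cal.clear();
        cal.set(1, Calendar.JANUARY, 1, 0, 0, 0);
        return cal.getTimeInMillis();
    }

    /** Difference between Shanghai's offset in its LMT segment and the modern +08:00. */
    static int shanghaiLmtDeltaSeconds() {
        int lmtOffset = ZoneId.of("Asia/Shanghai").getRules()
                .getOffset(Instant.parse("1800-01-01T00:00:00Z"))
                .getTotalSeconds();
        return lmtOffset - 8 * 3600;
    }

    public static void main(String[] args) {
        long dayShift = (prolepticMillis() - hybridMillis()) / 86_400_000L;
        System.out.println("cutover shift at year 0001 (days): " + dayShift);
        System.out.println("Shanghai LMT delta (seconds): " + shanghaiLmtDeltaSeconds());
    }
}
```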
The streaming path goes through Postgres logical decoding, which does not
pass through GregorianCalendar, so
the bug is silent until snapshot and streaming records for the same row are
compared.
Additionally, by calling rs.getObject directly, PostgresScanFetchTask bypasses
the per-type dispatch in
PostgresConnection#getColumnValue (which already handles MONEY / BIT /
NUMERIC / TIME / TIMETZ correctly).
Debezium's own
RelationalSnapshotChangeEventSource#createDataEventsForTable, by contrast,
delegates to jdbcConnection.rowToArray → getColumnValue.
*Reproduction*
1. Create a Postgres table with a TIMESTAMP column.
2. Insert '0001-01-01 00:00:00'.
3. Start a Flink CDC Postgres source.
4. Compare the snapshot record's MicroTimestamp value with that of the same row
re-captured via logical decoding (e.g.
by updating it and re-reading).
5. The two microsecond values differ by 2 days − 343 seconds (on an
Asia/Shanghai JVM) or 2 days (on a UTC JVM).
*Proposed Fix*
1. In PostgresScanFetchTask, replace rs.getObject(i + 1) with
jdbcConnection.getColumnValue(rs, i + 1, column,
table, databaseSchema) so the snapshot path goes through the same per-type
dispatch already used by Debezium's
snapshot framework.
2. In PostgresConnection#getColumnValue, extend the existing switch
(type.getOid()) to also handle
PgOid.TIMESTAMP / TIMESTAMPTZ / DATE, reading them as java.time.LocalDateTime
/ OffsetDateTime / LocalDate via
rs.getObject(columnIndex, ...class). This bypasses the legacy
GregorianCalendar path while preserving the
existing ±infinity sentinel contract by mapping LocalDateTime.MAX/MIN and
OffsetDateTime.MAX/MIN back to
Timestamp(Long.MAX_VALUE / Long.MIN_VALUE).
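The second step could look roughly like the following. This is a sketch under stated assumptions, not the actual patch: it switches on java.sql.Types constants as a stand-in for PgOid, the class and method names are hypothetical, and the sentinel values simply follow the description above. It relies only on the standard JDBC 4.1 call ResultSet#getObject(int, Class).

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

final class TemporalColumnReader {
    /**
     * Reads temporal columns as java.time values so that no java.sql.Timestamp /
     * java.sql.Date (and hence no GregorianCalendar) is built for ordinary values.
     * jdbcType is a java.sql.Types constant; the real code would switch on the PG OID.
     */
    static Object readColumn(ResultSet rs, int columnIndex, int jdbcType) throws SQLException {
        switch (jdbcType) {
            case Types.TIMESTAMP: {
                LocalDateTime v = rs.getObject(columnIndex, LocalDateTime.class);
                // Map the MAX/MIN markers back to the sentinels named in this issue.
                if (LocalDateTime.MAX.equals(v)) return new Timestamp(Long.MAX_VALUE);
                if (LocalDateTime.MIN.equals(v)) return new Timestamp(Long.MIN_VALUE);
                return v;
            }
            case Types.TIMESTAMP_WITH_TIMEZONE: {
                OffsetDateTime v = rs.getObject(columnIndex, OffsetDateTime.class);
                if (OffsetDateTime.MAX.equals(v)) return new Timestamp(Long.MAX_VALUE);
                if (OffsetDateTime.MIN.equals(v)) return new Timestamp(Long.MIN_VALUE);
                return v;
            }
            case Types.DATE:
                return rs.getObject(columnIndex, LocalDate.class);
            default:
                // Everything else keeps the existing behaviour.
                return rs.getObject(columnIndex);
        }
    }
}
```

A null column value passes through unchanged, because the equals checks against MAX/MIN are false for null.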
*Scope*
This issue is Postgres-only. MySQL's snapshot path uses its own per-type
dispatch
(MySqlSnapshotSplitReadTask#readField) and the affected value range (DATETIME
1000-9999) does not reach the
Julian cutover. Oracle / Db2 / SqlServer use jdbcConnection.rowToArray, whose
default getColumnValue is also
rs.getObject; their column-value handling will be tracked separately if
symptoms are observed there.
*Test Plan*
Add a regression test in PostgresScanFetchTaskTest that snapshots a fixture
table containing boundary dates
(0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31, 1901-01-02, and a value with
microsecond precision) and
asserts that the emitted Debezium record fields match the proleptic-UTC
expectation.
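The expected values for such a fixture can be derived with java.time, which uses the proleptic ISO calendar throughout. The sketch below is only a helper for computing expectations, not the test itself. It assumes TIMESTAMP fields are compared as epoch microseconds and DATE fields as epoch days; the microsecond-precision value and all names are hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class BoundaryExpectations {
    // Boundary dates from the test plan.
    static final String[] DATES = {
        "0001-01-01", "1582-10-04", "1582-10-15", "1900-12-31", "1901-01-02"
    };
    // Hypothetical value with microsecond precision.
    static final String MICRO_PRECISION = "1900-12-31T23:59:59.123456";

    /** Epoch microseconds of a wall-clock value read as proleptic-Gregorian UTC. */
    static long epochMicros(LocalDateTime v) {
        long seconds = v.toEpochSecond(ZoneOffset.UTC);
        return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), v.getNano() / 1_000);
    }

    /** Days since 1970-01-01 in the proleptic Gregorian calendar. */
    static long epochDays(LocalDate d) {
        return d.toEpochDay();
    }

    public static void main(String[] args) {
        for (String s : DATES) {
            LocalDate d = LocalDate.parse(s);
            System.out.println(s + " days=" + epochDays(d)
                    + " micros=" + epochMicros(d.atStartOfDay()));
        }
        System.out.println(MICRO_PRECISION + " micros="
                + epochMicros(LocalDateTime.parse(MICRO_PRECISION)));
    }
}
```

In the proleptic calendar 1582-10-04 and 1582-10-15 are 11 days apart, so a snapshot that still goes through the hybrid calendar would fail the assertion on the earlier date.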