[ 
https://issues.apache.org/jira/browse/FLINK-39748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39748:
-----------------------------------
    Labels: pull-request-available  (was: )

> Postgres CDC snapshot produces wrong values for TIMESTAMP / TIMESTAMPTZ / 
> DATE columns with historical dates
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39748
>                 URL: https://issues.apache.org/jira/browse/FLINK-39748
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: pull-request-available
>
> *Description*
>   
>   *Problem*
>  
>   When the Postgres CDC source snapshots rows containing TIMESTAMP, 
> TIMESTAMPTZ, or DATE column values dated
>   before the Julian/Gregorian cutover (1582-10-15), the emitted Debezium 
> records carry numerically wrong
>   timestamps. The same row, when later re-emitted via the streaming (logical 
> decoding) path, carries the correct
>   value — so snapshot and streaming records for the same primary key 
> disagree, breaking idempotent UPSERT
>   downstream and causing phantom updates.
>  
>   Example: snapshotting a row whose TIMESTAMP column holds
>   '0001-01-01 00:00:00' produces a Debezium MicroTimestamp of
>   -62135769257000000 instead of the proleptic-UTC-correct
>   -62135596800000000. The emitted value is too small by 2 days minus
>   343 seconds (172,457 s) on a JVM configured with Asia/Shanghai.
>   
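The arithmetic behind the example can be checked directly. The sketch below only subtracts the two values quoted in the issue and compares the result with "2 days minus 343 seconds"; the class and method names are made up for illustration and are not part of Flink CDC.

```java
import java.time.Duration;

public class DriftArithmetic {
    // Values quoted in the issue description (microseconds since the epoch).
    static final long SNAPSHOT_MICROS = -62135769257000000L;
    static final long EXPECTED_MICROS = -62135596800000000L;

    /** Signed drift of the snapshot value relative to the expected value, in seconds. */
    static long driftSeconds() {
        return (SNAPSHOT_MICROS - EXPECTED_MICROS) / 1_000_000L;
    }

    /** Two calendar days minus the 343-second LMT delta, in seconds. */
    static long twoDaysMinusLmt() {
        return Duration.ofDays(2).getSeconds() - 343L;
    }

    public static void main(String[] args) {
        System.out.println("drift (s)          = " + driftSeconds());
        System.out.println("2 days - 343 s (s) = " + twoDaysMinusLmt());
    }
}
```

The drift is negative because the snapshot value lies earlier than the expected one; its magnitude equals the second figure.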
>   *Root Cause*
>  
>   PostgresScanFetchTask#createDataEventsForTable reads column values via a 
> bare rs.getObject(i + 1). For temporal
>    columns, the PG JDBC driver returns java.sql.Timestamp / java.sql.Date 
> constructed through
>   java.util.GregorianCalendar (default Julian/Gregorian cutover at 
> 1582-10-15) using the JVM default time zone.
>   This introduces two independent drifts:                                    
>  
>   1. *Julian/Proleptic-Gregorian cutover*: values before 1582-10-15 are
>   interpreted as Julian, while PostgreSQL stores proleptic Gregorian. The
>   shift is a whole number of days that depends on the year (2 days at
>   year 0001).
>   2. *Local Mean Time (LMT)*: JVMs whose default time zone has an LMT
>   segment (e.g. Asia/Shanghai is LMT +08:05:43 until 1901-01-01) add a
>   further offset equal to the LMT delta (343 seconds for Shanghai).
>  
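The two effects listed above can be observed in isolation with the JDK alone, without a database. This is a minimal sketch, not the connector's code path: it compares java.util.GregorianCalendar (hybrid Julian/Gregorian) with java.time (proleptic ISO) in UTC, and reads the pre-1901 offset of Asia/Shanghai from the java.time zone rules. All class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class DriftSources {

    /** Epoch seconds of midnight UTC according to the legacy hybrid calendar. */
    static long hybridEpochSeconds(int year, int month, int day) {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();                             // drop "now", including milliseconds
        cal.set(year, month - 1, day, 0, 0, 0);  // Calendar months are 0-based
        return Math.floorDiv(cal.getTimeInMillis(), 1000L);
    }

    /** Epoch seconds of midnight UTC according to the proleptic ISO calendar. */
    static long prolepticEpochSeconds(int year, int month, int day) {
        return LocalDate.of(year, month, day).atStartOfDay().toEpochSecond(ZoneOffset.UTC);
    }

    /** Difference between a zone's offset at 1900-01-01 and a nominal offset, in seconds. */
    static int lmtDeltaSeconds(String zoneId, int nominalOffsetSeconds) {
        Instant probe = LocalDateTime.of(1900, 1, 1, 0, 0).toInstant(ZoneOffset.UTC);
        int actual = ZoneId.of(zoneId).getRules().getOffset(probe).getTotalSeconds();
        return actual - nominalOffsetSeconds;
    }

    public static void main(String[] args) {
        long cutoverDrift = hybridEpochSeconds(1, 1, 1) - prolepticEpochSeconds(1, 1, 1);
        System.out.println("calendar drift at 0001-01-01 (s): " + cutoverDrift);
        System.out.println("Shanghai LMT delta vs +08:00 (s): "
                + lmtDeltaSeconds("Asia/Shanghai", 8 * 3600));
    }
}
```

The sketch pins the legacy calendar to UTC on purpose, so the calendar effect is not mixed with any time zone effect; how the two combine inside the driver and Debezium converters is not modeled here.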
>   The streaming path goes through Postgres logical decoding, which does not 
> pass through GregorianCalendar, so
>   the bug is silent until snapshot and streaming records for the same row are 
> compared.
>  
>   Additionally, because PostgresScanFetchTask calls rs.getObject directly,
>   it bypasses the per-type dispatch in PostgresConnection#getColumnValue
>   (which already handles MONEY / BIT / NUMERIC / TIME / TIMETZ correctly).
>   Debezium's own
>   RelationalSnapshotChangeEventSource#createDataEventsForTable, by
>   contrast, delegates to jdbcConnection.rowToArray → getColumnValue.
>  
>   *Reproduction*
>  
>   1. Create a Postgres table with a TIMESTAMP column.
>   2. Insert '0001-01-01 00:00:00'.
>   3. Start a Flink CDC Postgres source.
>   4. Compare the snapshot record's MicroTimestamp value with the same row 
> re-captured via logical decoding (e.g.
>   by updating it and re-reading).
>   5. The two microsecond values differ by 2 days − 343 seconds (on an
>   Asia/Shanghai JVM) or by 2 days (on a UTC JVM).
>  
>   *Proposed Fix*
>  
>   1. In PostgresScanFetchTask, replace rs.getObject(i + 1) with 
> jdbcConnection.getColumnValue(rs, i + 1, column, 
>   table, databaseSchema) so the snapshot path goes through the same per-type 
> dispatch already used by Debezium's
>   snapshot framework.
>   2. In PostgresConnection#getColumnValue, extend the existing switch 
> (type.getOid()) to also handle
>   PgOid.TIMESTAMP / TIMESTAMPTZ / DATE, reading them as 
> java.time.LocalDateTime / OffsetDateTime / LocalDate via
>   rs.getObject(columnIndex, ...class). This bypasses the legacy 
> GregorianCalendar path while preserving the
>   existing ±infinity sentinel contract by mapping LocalDateTime.MAX/MIN and 
> OffsetDateTime.MAX/MIN back to
>   Timestamp(Long.MAX_VALUE / Long.MIN_VALUE).                                
>  
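A rough sketch of what step 2 could look like is given below. It is not the actual patch: the class, the method names, and the inlined OID constants (standing in for PgOid.DATE / TIMESTAMP / TIMESTAMPTZ) are assumptions made to keep the example self-contained, and the real PostgresConnection#getColumnValue takes different parameters. Only the standard JDBC call ResultSet#getObject(int, Class) and the sentinel mapping described above are taken as given.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

public class TemporalColumnReader {
    // PostgreSQL type OIDs, inlined here instead of referencing PgOid (assumption).
    static final int OID_DATE = 1082;
    static final int OID_TIMESTAMP = 1114;
    static final int OID_TIMESTAMPTZ = 1184;

    /** Hypothetical per-type dispatch for temporal columns. */
    static Object readTemporal(ResultSet rs, int columnIndex, int typeOid) throws SQLException {
        switch (typeOid) {
            case OID_TIMESTAMP:
                // java.time value, not built through GregorianCalendar
                return mapInfinity(rs.getObject(columnIndex, LocalDateTime.class));
            case OID_TIMESTAMPTZ:
                return mapInfinity(rs.getObject(columnIndex, OffsetDateTime.class));
            case OID_DATE:
                return rs.getObject(columnIndex, LocalDate.class);
            default:
                return rs.getObject(columnIndex); // unchanged for all other types
        }
    }

    /** Maps the java.time MAX/MIN values back to the legacy Timestamp sentinels. */
    static Object mapInfinity(Object value) {
        if (LocalDateTime.MAX.equals(value) || OffsetDateTime.MAX.equals(value)) {
            return new Timestamp(Long.MAX_VALUE);
        }
        if (LocalDateTime.MIN.equals(value) || OffsetDateTime.MIN.equals(value)) {
            return new Timestamp(Long.MIN_VALUE);
        }
        return value; // regular values (and null) pass through untouched
    }
}
```

Handling of infinite DATE values is left out because the description does not specify it.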
>   *Scope*
>  
>   This issue is Postgres-only. MySQL's snapshot path uses its own per-type 
> dispatch
>   (MySqlSnapshotSplitReadTask#readField) and the affected value range 
> (DATETIME 1000-9999) does not reach the
>   Julian cutover. Oracle / Db2 / SqlServer use jdbcConnection.rowToArray,
>   whose default getColumnValue is also rs.getObject; their column-value
>   handling will be tracked separately if symptoms are observed there.
>  
>   *Test Plan*
>  
>   Add a regression test in PostgresScanFetchTaskTest that snapshots a fixture 
> table containing boundary dates
>   (0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31, 1901-01-02, and a value 
> with microsecond precision) and
>   asserts that the emitted Debezium record fields match the proleptic-UTC 
> expectation.
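
The proleptic-UTC expectations for such a test can be derived with java.time rather than hard-coded. The helper below is a hypothetical sketch, assuming that MicroTimestamp fields carry microseconds since 1970-01-01T00:00:00 and that DATE fields carry days since the epoch; it is not taken from PostgresScanFetchTaskTest.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class ProlepticExpectations {
    static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

    /** Microseconds between the epoch and the given value, proleptic ISO calendar. */
    static long expectedMicros(LocalDateTime value) {
        return ChronoUnit.MICROS.between(EPOCH, value);
    }

    /** Days between the epoch and the given date, proleptic ISO calendar. */
    static long expectedEpochDay(LocalDate value) {
        return value.toEpochDay();
    }

    public static void main(String[] args) {
        // Boundary dates from the test plan.
        LocalDate[] dates = {
            LocalDate.of(1, 1, 1), LocalDate.of(1582, 10, 4), LocalDate.of(1582, 10, 15),
            LocalDate.of(1900, 12, 31), LocalDate.of(1901, 1, 2)
        };
        for (LocalDate d : dates) {
            System.out.println(d + " -> epochDay=" + expectedEpochDay(d)
                    + ", micros=" + expectedMicros(d.atStartOfDay()));
        }
    }
}
```

Because both helpers avoid java.util.Calendar and the JVM default time zone, the expected values do not change with the machine running the test.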



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
