This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e0ee55cf6 [fix](streaming-job) fix postgres historical-date 
timestamp handling in cdc-client (#63618)
a0e0ee55cf6 is described below

commit a0e0ee55cf6e7f02a73e32aa9b71f97cf9bbba9c
Author: wudi <[email protected]>
AuthorDate: Thu May 28 15:06:20 2026 +0800

    [fix](streaming-job) fix postgres historical-date timestamp handling in 
cdc-client (#63618)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    When a Postgres CDC streaming job ingests rows whose timestamp / date
    columns hold historical values (pre-1970 with sub-millisecond precision,
    or pre-1582 / pre-1901 dates), two independent bugs in cdc-client cause
    data corruption or task crash:
    
    1. `DebeziumJsonDeserializer.convertTimestamp` uses signed `/` and `%`
    on negative `micros` / `nanos`, producing a negative `nanoOfMillisecond`
    and tripping Flink `TimestampData`'s `checkArgument(nanoOfMillisecond >=
    0)`. Result: the ingestion task crashes whenever a pre-1970 timestamp
    with sub-millisecond precision flows through (e.g. `1969-12-31
    23:59:59.999123`).
    
    2. The snapshot path reads column values via `rs.getObject()`, which
    routes through PG JDBC's `TimestampUtils` + `GregorianCalendar`. For
    pre-1582 timestamps, `GregorianCalendar` switches to the Julian calendar
    at its cutover while PostgreSQL uses the proleptic Gregorian calendar, so
    values shift by N days; for pre-1901 timestamps the JVM time zone's local
    mean time (LMT) offset shifts values by the LMT difference (e.g. ~343s in
    `Asia/Shanghai`). Result: the same PG value (e.g. `0001-01-01 00:00:00`)
    yields different Doris values depending on whether the row was synced via
    snapshot or via binlog.
    
    This PR fixes both:
    
    1. Use `Math.floorDiv` / `Math.floorMod` so the millisecond / nanosecond
    split stays valid for negative epoch values.
    2. Dispatch `TIMESTAMP` / `TIMESTAMPTZ` / `DATE` columns through
    `LocalDateTime` / `OffsetDateTime` / `LocalDate` in the snapshot reader,
    bypassing `GregorianCalendar` entirely. Preserve the legacy
    `Timestamp(Long.MAX/MIN_VALUE)` sentinel for `+/-infinity`.
---
 .../postgresql/connection/PostgresConnection.java  | 905 +++++++++++++++++++++
 .../deserialize/DebeziumJsonDeserializer.java      |  46 +-
 .../source/fetch/PostgresScanFetchTask.java        | 391 +++++++++
 .../deserialize/DebeziumJsonDeserializerTest.java  |  76 ++
 .../cdc/test_streaming_postgres_job_all_type.out   |   4 +-
 ...ming_postgres_job_snapshot_historical_dates.out |  45 +
 ...g_postgres_job_snapshot_historical_dates.groovy | 229 ++++++
 7 files changed, 1684 insertions(+), 12 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
 
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
new file mode 100644
index 00000000000..2f6ca5756dd
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -0,0 +1,905 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import com.zaxxer.hikari.pool.HikariProxyConnection;
+import io.debezium.DebeziumException;
+import io.debezium.annotation.VisibleForTesting;
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.PgOid;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.PostgresType;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DatabaseSchema;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.jdbc.PgConnection;
+import org.postgresql.jdbc.TimestampUtils;
+import org.postgresql.replication.LogSequenceNumber;
+import org.postgresql.util.PGmoney;
+import org.postgresql.util.PSQLState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+/**
+ * Copied from Flink Cdc 3.6.0
+ *
+ * <p>Line 820~854: modified getColumnValue method to fix FLINK-39748.
+ */
+public class PostgresConnection extends JdbcConnection {
+
+    public static final String CONNECTION_STREAMING = "Debezium Streaming";
+    public static final String CONNECTION_SLOT_INFO = "Debezium Slot Info";
+    public static final String CONNECTION_DROP_SLOT = "Debezium Drop Slot";
+    public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium 
Validate Connection";
+    public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
+    public static final String CONNECTION_GENERAL = "Debezium General";
+
+    private static final Pattern FUNCTION_DEFAULT_PATTERN =
+            Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
+    private static final Pattern EXPRESSION_DEFAULT_PATTERN =
+            Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] 
?.+)+)+\\)");
+    private static Logger LOGGER = 
LoggerFactory.getLogger(PostgresConnection.class);
+
+    private static final String URL_PATTERN =
+            "jdbc:postgresql://${"
+                    + JdbcConfiguration.HOSTNAME
+                    + "}:${"
+                    + JdbcConfiguration.PORT
+                    + "}/${"
+                    + JdbcConfiguration.DATABASE
+                    + "}";
+    protected static final ConnectionFactory FACTORY =
+            JdbcConnection.patternBasedFactory(
+                    URL_PATTERN,
+                    org.postgresql.Driver.class.getName(),
+                    PostgresConnection.class.getClassLoader(),
+                    JdbcConfiguration.PORT.withDefault(
+                            
PostgresConnectorConfig.PORT.defaultValueAsString()));
+
+    /**
+     * Obtaining a replication slot may fail if there's a pending transaction. 
We're retrying to get
+     * a slot for 30 min.
+     */
+    private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900;
+
+    private static final Duration 
PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS =
+            Duration.ofSeconds(2);
+
+    private final TypeRegistry typeRegistry;
+    private final PostgresDefaultValueConverter defaultValueConverter;
+
+    /**
+     * Creates a Postgres connection using the supplied configuration. If 
necessary this connection
+     * is able to resolve data type mappings. Such a connection requires a 
{@link
+     * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. 
Usually only one such
+     * connection per connector is needed.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param valueConverterBuilder supplies a configured {@link 
PostgresValueConverter} for a given
+     *     {@link TypeRegistry}
+     * @param connectionUsage a symbolic name of the connection to be tracked 
in monitoring tools
+     */
+    public PostgresConnection(
+            JdbcConfiguration config,
+            PostgresValueConverterBuilder valueConverterBuilder,
+            String connectionUsage) {
+        this(config, valueConverterBuilder, connectionUsage, FACTORY);
+    }
+
+    /**
+     * Creates a Postgres connection using the supplied configuration. If 
necessary this connection
+     * is able to resolve data type mappings. Such a connection requires a 
{@link
+     * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. 
Usually only one such
+     * connection per connector is needed.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param valueConverterBuilder supplies a configured {@link 
PostgresValueConverter} for a given
+     *     {@link TypeRegistry}
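+     * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools
+     * @param factory the {@link ConnectionFactory} passed to the {@link JdbcConnection} super
+     *     constructor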
+     */
+    public PostgresConnection(
+            JdbcConfiguration config,
+            PostgresValueConverterBuilder valueConverterBuilder,
+            String connectionUsage,
+            ConnectionFactory factory) {
+        super(
+                addDefaultSettings(config, connectionUsage),
+                factory,
+                PostgresConnection::validateServerVersion,
+                null,
+                "\"",
+                "\"");
+
+        if (Objects.isNull(valueConverterBuilder)) {
+            this.typeRegistry = null;
+            this.defaultValueConverter = null;
+        } else {
+            this.typeRegistry = new TypeRegistry(this);
+
+            final PostgresValueConverter valueConverter =
+                    valueConverterBuilder.build(this.typeRegistry);
+            this.defaultValueConverter =
+                    new PostgresDefaultValueConverter(valueConverter, 
this.getTimestampUtils());
+        }
+    }
+
+    /**
+     * Create a Postgres connection using the supplied configuration and 
{@link TypeRegistry}
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param typeRegistry an existing/already-primed {@link TypeRegistry} 
instance
+     * @param connectionUsage a symbolic name of the connection to be tracked 
in monitoring tools
+     */
+    public PostgresConnection(
+            PostgresConnectorConfig config, TypeRegistry typeRegistry, String 
connectionUsage) {
+        super(
+                addDefaultSettings(config.getJdbcConfig(), connectionUsage),
+                FACTORY,
+                PostgresConnection::validateServerVersion,
+                null,
+                "\"",
+                "\"");
+        if (Objects.isNull(typeRegistry)) {
+            this.typeRegistry = null;
+            this.defaultValueConverter = null;
+        } else {
+            this.typeRegistry = typeRegistry;
+            final PostgresValueConverter valueConverter =
+                    PostgresValueConverter.of(config, 
this.getDatabaseCharset(), typeRegistry);
+            this.defaultValueConverter =
+                    new PostgresDefaultValueConverter(valueConverter, 
this.getTimestampUtils());
+        }
+    }
+
+    /**
+     * Creates a Postgres connection using the supplied configuration. The 
connector is the regular
+     * one without datatype resolution capabilities.
+     *
+     * @param config {@link Configuration} instance, may not be null.
+     * @param connectionUsage a symbolic name of the connection to be tracked 
in monitoring tools
+     */
+    public PostgresConnection(JdbcConfiguration config, String 
connectionUsage) {
+        this(config, null, connectionUsage);
+    }
+
+    /** Return an unwrapped PgConnection instead of HikariProxyConnection */
+    @Override
+    public synchronized Connection connection() throws SQLException {
+        Connection conn = connection(true);
+        if (conn instanceof HikariProxyConnection) {
+            // assuming HikariCP use org.postgresql.jdbc.PgConnection
+            return conn.unwrap(PgConnection.class);
+        }
+        return conn;
+    }
+
+    static JdbcConfiguration addDefaultSettings(
+            JdbcConfiguration configuration, String connectionUsage) {
+        // we require Postgres 9.4 as the minimum server version since that's 
where logical
+        // replication was first introduced
+        return JdbcConfiguration.adapt(
+                configuration
+                        .edit()
+                        .with("assumeMinServerVersion", "9.4")
+                        .with("ApplicationName", connectionUsage)
+                        .build());
+    }
+
+    /**
+     * Returns a JDBC connection string for the current configuration.
+     *
+     * @return a {@code String} where the variables in {@code urlPattern} are 
replaced with values
+     *     from the configuration
+     */
+    public String connectionString() {
+        return connectionString(URL_PATTERN);
+    }
+
+    /**
+     * Prints out information about the REPLICA IDENTITY status of a table. 
This in turn determines
+     * how much information is available for UPDATE and DELETE operations for 
logical replication.
+     *
+     * @param tableId the identifier of the table
+     * @return the replica identity information; never null
+     * @throws SQLException if there is a problem obtaining the replica 
identity information for the
+     *     given table
+     */
+    public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) 
throws SQLException {
+        String statement =
+                "SELECT relreplident FROM pg_catalog.pg_class c "
+                        + "LEFT JOIN pg_catalog.pg_namespace n ON 
c.relnamespace=n.oid "
+                        + "WHERE n.nspname=? and c.relname=?";
+        String schema =
+                tableId.schema() != null && tableId.schema().length() > 0
+                        ? tableId.schema()
+                        : "public";
+        StringBuilder replIdentity = new StringBuilder();
+        prepareQuery(
+                statement,
+                stmt -> {
+                    stmt.setString(1, schema);
+                    stmt.setString(2, tableId.table());
+                },
+                rs -> {
+                    if (rs.next()) {
+                        replIdentity.append(rs.getString(1));
+                    } else {
+                        LOGGER.warn(
+                                "Cannot determine REPLICA IDENTITY information 
for table '{}'",
+                                tableId);
+                    }
+                });
+        return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString());
+    }
+
+    /**
+     * Returns the current state of the replication slot
+     *
+     * @param slotName the name of the slot
+     * @param pluginName the name of the plugin used for the desired slot
+     * @return the {@link SlotState} or null, if no slot state is found
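+     * @throws SQLException if reading the replication slot information fails
+     * @throws ConnectException if the thread is interrupted while waiting for valid replication
+     *     slot info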
+     */
+    public SlotState getReplicationSlotState(String slotName, String 
pluginName)
+            throws SQLException {
+        ServerInfo.ReplicationSlot slot;
+        try {
+            slot = readReplicationSlotInfo(slotName, pluginName);
+            if (slot.equals(ServerInfo.ReplicationSlot.INVALID)) {
+                return null;
+            } else {
+                return slot.asSlotState();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ConnectException(
+                    "Interrupted while waiting for valid replication slot 
info", e);
+        }
+    }
+
+    /**
+     * Fetches the state of a replication slot given a slot name and plugin name
+     *
+     * @param slotName the name of the slot
+     * @param pluginName the name of the plugin used for the desired slot
+     * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+     *     ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+     * @throws SQLException is thrown by the underlying JDBC
+     */
+    private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String 
slotName, String pluginName)
+            throws SQLException {
+        final String database = database();
+        final ServerInfo.ReplicationSlot slot =
+                queryForSlot(
+                        slotName,
+                        database,
+                        pluginName,
+                        rs -> {
+                            if (rs.next()) {
+                                boolean active = rs.getBoolean("active");
+                                final Lsn confirmedFlushedLsn =
+                                        parseConfirmedFlushLsn(slotName, 
pluginName, database, rs);
+                                if (confirmedFlushedLsn == null) {
+                                    return null;
+                                }
+                                Lsn restartLsn =
+                                        parseRestartLsn(slotName, pluginName, 
database, rs);
+                                if (restartLsn == null) {
+                                    return null;
+                                }
+                                final Long xmin = rs.getLong("catalog_xmin");
+                                return new ServerInfo.ReplicationSlot(
+                                        active, confirmedFlushedLsn, 
restartLsn, xmin);
+                            } else {
+                                LOGGER.debug(
+                                        "No replication slot '{}' is present 
for plugin '{}' and database '{}'",
+                                        slotName,
+                                        pluginName,
+                                        database);
+                                return ServerInfo.ReplicationSlot.INVALID;
+                            }
+                        });
+        return slot;
+    }
+
+    /**
+     * Fetches a replication slot, repeating the query until either the slot 
is created or until the
+     * max number of attempts has been reached
+     *
+     * <p>To fetch the slot without the retries, use the {@link
+     * PostgresConnection#fetchReplicationSlotInfo} call
+     *
+     * @param slotName the slot name
+     * @param pluginName the name of the plugin
+     * @return the {@link ServerInfo.ReplicationSlot} object or a {@link
+     *     ServerInfo.ReplicationSlot#INVALID} if the slot is not valid
+     * @throws SQLException is thrown by the underlying JDBC driver
+     * @throws InterruptedException if the thread is interrupted while pausing between attempts
+     * @throws ConnectException if no valid slot could be obtained within the maximum number of
+     *     attempts
+     */
+    @VisibleForTesting
+    ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String 
pluginName)
+            throws SQLException, InterruptedException {
+        final String database = database();
+        final Metronome metronome =
+                
Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, 
Clock.SYSTEM);
+
+        for (int attempt = 1; attempt <= 
MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) {
+            final ServerInfo.ReplicationSlot slot = 
fetchReplicationSlotInfo(slotName, pluginName);
+            if (slot != null) {
+                LOGGER.info("Obtained valid replication slot {}", slot);
+                return slot;
+            }
+            LOGGER.warn(
+                    "Cannot obtain valid replication slot '{}' for plugin '{}' 
and database '{}' [during attempt {} out of {}, concurrent tx probably blocks 
taking snapshot.",
+                    slotName,
+                    pluginName,
+                    database,
+                    attempt,
+                    MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT);
+            metronome.pause();
+        }
+
+        throw new ConnectException(
+                "Unable to obtain valid replication slot. "
+                        + "Make sure there are no long-running transactions 
running in parallel as they may hinder the allocation of the replication slot 
when starting this connector");
+    }
+
+    protected ServerInfo.ReplicationSlot queryForSlot(
+            String slotName,
+            String database,
+            String pluginName,
+            ResultSetMapper<ServerInfo.ReplicationSlot> map)
+            throws SQLException {
+        return prepareQueryAndMap(
+                "select * from pg_replication_slots where slot_name = ? and 
database = ? and plugin = ?",
+                statement -> {
+                    statement.setString(1, slotName);
+                    statement.setString(2, database);
+                    statement.setString(3, pluginName);
+                },
+                map);
+    }
+
+    /**
+     * Obtains the LSN to resume streaming from. On PG 9.5 there is no confirmed_flush_lsn yet, so
+     * restart_lsn will be read instead. This may result in more records to be re-read after a
+     * restart.
+     *
+     * @param slotName the slot name, used when reporting an invalid value
+     * @param pluginName the plugin name, used when reporting an invalid value
+     * @param database the database name, used when reporting an invalid value
+     * @param rs the result set positioned on the replication slot row
+     * @return the parsed LSN, or {@code null} if the column that was read holds a null value
+     * @throws ConnectException if neither column can be read, or if the value read is not a valid
+     *     LSN
+     */
+    private Lsn parseConfirmedFlushLsn(
+            String slotName, String pluginName, String database, ResultSet rs) {
+        try {
+            return tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
+        } catch (SQLException e) {
+            LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn");
+        }
+
+        try {
+            return tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+        } catch (SQLException e2) {
+            // Keep the driver error as the cause so the root failure stays visible to operators.
+            throw new ConnectException(
+                    "Neither confirmed_flush_lsn nor restart_lsn could be found", e2);
+        }
+    }
+
+    /**
+     * Reads the {@code restart_lsn} column of the replication slot row.
+     *
+     * @param slotName the slot name, used when reporting an invalid value
+     * @param pluginName the plugin name, used when reporting an invalid value
+     * @param database the database name, used when reporting an invalid value
+     * @param rs the result set positioned on the replication slot row
+     * @return the parsed LSN, or {@code null} if the column holds a null value
+     * @throws ConnectException if the column cannot be read or its value is not a valid LSN
+     */
+    private Lsn parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
+        try {
+            return tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
+        } catch (SQLException e) {
+            // The original message read "could be found"; also keep the driver error as the cause.
+            throw new ConnectException("restart_lsn could not be found", e);
+        }
+    }
+
+    private Lsn tryParseLsn(
+            String slotName, String pluginName, String database, ResultSet rs, 
String column)
+            throws ConnectException, SQLException {
+        Lsn lsn = null;
+
+        String lsnStr = rs.getString(column);
+        if (lsnStr == null) {
+            return null;
+        }
+        try {
+            lsn = Lsn.valueOf(lsnStr);
+        } catch (Exception e) {
+            throw new ConnectException(
+                    "Value "
+                            + column
+                            + " in the pg_replication_slots table for slot = '"
+                            + slotName
+                            + "', plugin = '"
+                            + pluginName
+                            + "', database = '"
+                            + database
+                            + "' is not valid. This is an abnormal situation 
and the database status should be checked.");
+        }
+        if (!lsn.isValid()) {
+            throw new ConnectException("Invalid LSN returned from database");
+        }
+        return lsn;
+    }
+
+    /**
+     * Drops a replication slot that was created on the DB
+     *
+     * @param slotName the name of the replication slot, may not be null
+     * @return {@code true} if the slot was dropped, {@code false} otherwise
+     */
+    public boolean dropReplicationSlot(String slotName) {
+        final int ATTEMPTS = 3;
+        for (int i = 0; i < ATTEMPTS; i++) {
+            try {
+                execute("select pg_drop_replication_slot('" + slotName + "')");
+                return true;
+            } catch (SQLException e) {
+                // slot is active
+                if 
(PSQLState.OBJECT_IN_USE.getState().equals(e.getSQLState())) {
+                    if (i < ATTEMPTS - 1) {
+                        LOGGER.debug(
+                                "Cannot drop replication slot '{}' because 
it's still in use",
+                                slotName);
+                    } else {
+                        LOGGER.warn(
+                                "Cannot drop replication slot '{}' because 
it's still in use",
+                                slotName);
+                        return false;
+                    }
+                } else if 
(PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) {
+                    LOGGER.debug("Replication slot {} has already been 
dropped", slotName);
+                    return false;
+                } else {
+                    LOGGER.error("Unexpected error while attempting to drop 
replication slot", e);
+                    return false;
+                }
+            }
+            try {
+                Metronome.parker(Duration.ofSeconds(1), 
Clock.system()).pause();
+            } catch (InterruptedException e) {
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Drops the debezium publication that was created.
+     *
+     * @param publicationName the publication name, may not be null
+     * @return {@code true} if the publication was dropped, {@code false} 
otherwise
+     */
+    public boolean dropPublication(String publicationName) {
+        try {
+            LOGGER.debug("Dropping publication '{}'", publicationName);
+            execute("DROP PUBLICATION " + publicationName);
+            return true;
+        } catch (SQLException e) {
+            if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) 
{
+                LOGGER.debug("Publication {} has already been dropped", 
publicationName);
+            } else {
+                LOGGER.error("Unexpected error while attempting to drop 
publication", e);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        try {
+            super.close();
+        } catch (SQLException e) {
+            LOGGER.error("Unexpected error while closing Postgres connection", 
e);
+        }
+    }
+
+    /**
+     * Returns the PG id of the current active transaction
+     *
+     * @return a PG transaction identifier, or null if no tx is active
+     * @throws SQLException if anything fails.
+     */
+    public Long currentTransactionId() throws SQLException {
+        AtomicLong txId = new AtomicLong(0);
+        query(
+                "select (case pg_is_in_recovery() when 't' then 0 else 
txid_current() end) AS pg_current_txid",
+                rs -> {
+                    if (rs.next()) {
+                        txId.compareAndSet(0, rs.getLong(1));
+                    }
+                });
+        long value = txId.get();
+        return value > 0 ? value : null;
+    }
+
+    /**
+     * Returns the current position in the server tx log.
+     *
+     * @return a long value, never negative
+     * @throws SQLException if anything unexpected fails.
+     */
+    public long currentXLogLocation() throws SQLException {
+        AtomicLong result = new AtomicLong(0);
+        int majorVersion = 
connection().getMetaData().getDatabaseMajorVersion();
+        query(
+                majorVersion >= 10
+                        ? "select (case pg_is_in_recovery() when 't' then 
pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn"
+                        : "select * from pg_current_xlog_location()",
+                rs -> {
+                    if (!rs.next()) {
+                        throw new IllegalStateException(
+                                "there should always be a valid xlog 
position");
+                    }
+                    result.compareAndSet(0, 
LogSequenceNumber.valueOf(rs.getString(1)).asLong());
+                });
+        return result.get();
+    }
+
+    /**
+     * Returns information about the PG server to which this instance is 
connected.
+     *
+     * @return a {@link ServerInfo} instance, never {@code null}
+     * @throws SQLException if anything fails
+     */
+    public ServerInfo serverInfo() throws SQLException {
+        ServerInfo serverInfo = new ServerInfo();
+        query(
+                "SELECT version(), current_user, current_database()",
+                rs -> {
+                    if (rs.next()) {
+                        serverInfo
+                                .withServer(rs.getString(1))
+                                .withUsername(rs.getString(2))
+                                .withDatabase(rs.getString(3));
+                    }
+                });
+        String username = serverInfo.username();
+        if (username != null) {
+            query(
+                    "SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, 
rolcreatedb, rolcanlogin, rolreplication FROM pg_roles "
+                            + "WHERE pg_has_role('"
+                            + username
+                            + "', oid, 'member')",
+                    rs -> {
+                        while (rs.next()) {
+                            String roleInfo =
+                                    "superuser: "
+                                            + rs.getBoolean(3)
+                                            + ", replication: "
+                                            + rs.getBoolean(8)
+                                            + ", inherit: "
+                                            + rs.getBoolean(4)
+                                            + ", create role: "
+                                            + rs.getBoolean(5)
+                                            + ", create db: "
+                                            + rs.getBoolean(6)
+                                            + ", can log in: "
+                                            + rs.getBoolean(7);
+                            String roleName = rs.getString(2);
+                            serverInfo.addRole(roleName, roleInfo);
+                        }
+                    });
+        }
+        return serverInfo;
+    }
+
+    public Charset getDatabaseCharset() {
+        try {
+            return Charset.forName(((BaseConnection) 
connection()).getEncoding().name());
+        } catch (SQLException e) {
+            throw new DebeziumException("Couldn't obtain encoding for database 
" + database(), e);
+        }
+    }
+
+    public TimestampUtils getTimestampUtils() {
+        try {
+            return ((PgConnection) this.connection()).getTimestampUtils();
+        } catch (SQLException e) {
+            throw new DebeziumException(
+                    "Couldn't get timestamp utils from underlying connection", 
e);
+        }
+    }
+
+    private static void validateServerVersion(Statement statement) throws 
SQLException {
+        DatabaseMetaData metaData = statement.getConnection().getMetaData();
+        int majorVersion = metaData.getDatabaseMajorVersion();
+        int minorVersion = metaData.getDatabaseMinorVersion();
+        if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) {
+            throw new SQLException("Cannot connect to a version of Postgres 
lower than 9.4");
+        }
+    }
+
+    /**
+     * Quotes a column name after doubling every double-quote character it contains.
+     *
+     * @param columnName the raw column name
+     * @return the result of the parent quoting applied to the escaped name
+     */
+    @Override
+    public String quotedColumnIdString(String columnName) {
+        // Literal replacement; a name without any double quote comes back unchanged.
+        final String escapedName = columnName.replace("\"", "\"\"");
+        return super.quotedColumnIdString(escapedName);
+    }
+
+    @Override
+    protected int resolveNativeType(String typeName) {
+        return getTypeRegistry().get(typeName).getRootType().getOid();
+    }
+
+    @Override
+    protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
+        // Special care needs to be taken for columns that use user-defined 
domain type data types
+        // where resolution of the column's JDBC type needs to be that of the 
root type instead of
+        // the actual column to properly influence schema building and value 
conversion.
+        return getTypeRegistry().get(nativeType).getRootType().getJdbcId();
+    }
+
+    @Override
+    protected Optional<ColumnEditor> readTableColumn(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter 
columnFilter)
+            throws SQLException {
+        return doReadTableColumn(columnMetadata, tableId, columnFilter);
+    }
+
+    public Optional<Column> readColumnForDecoder(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter 
columnNameFilter)
+            throws SQLException {
+        return doReadTableColumn(columnMetadata, tableId, columnNameFilter)
+                .map(ColumnEditor::create);
+    }
+
+    private Optional<ColumnEditor> doReadTableColumn(
+            ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter 
columnFilter)
+            throws SQLException {
+        // FLINK-38965: Filter out columns from other tables that might be 
returned due to
+        // PostgreSQL LIKE wildcard matching. The underscore '_' matches any 
single character,
+        // and '%' matches any sequence of characters. For example:
+        // - When querying 'user_sink', the pattern may also match 'userbsink' 
(due to '_')
+        // - When querying 'user%data' (where % is literal), it may match 
'user_test_data' (due to
+        // '%')
+        final String resultTableName = columnMetadata.getString(3);
+        if (!tableId.table().equals(resultTableName)) {
+            return Optional.empty();
+        }
+
+        final String columnName = columnMetadata.getString(4);
+        if (columnFilter == null
+                || columnFilter.matches(
+                        tableId.catalog(), tableId.schema(), tableId.table(), 
columnName)) {
+            final ColumnEditor column = Column.editor().name(columnName);
+            column.type(columnMetadata.getString(6));
+
+            // first source the length/scale from the column metadata provided 
by the driver
+            // this may be overridden below if the column type is a 
user-defined domain type
+            column.length(columnMetadata.getInt(7));
+            if (columnMetadata.getObject(9) != null) {
+                column.scale(columnMetadata.getInt(9));
+            }
+
+            column.optional(isNullable(columnMetadata.getInt(11)));
+            column.position(columnMetadata.getInt(17));
+            
column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23)));
+
+            String autogenerated = null;
+            try {
+                autogenerated = columnMetadata.getString(24);
+            } catch (SQLException e) {
+                // ignore, some drivers don't have this index - e.g. Postgres
+            }
+            column.generated("YES".equalsIgnoreCase(autogenerated));
+
+            // Lookup the column type from the TypeRegistry
+            // For all types, we need to set the Native and Jdbc types by 
using the root-type
+            final PostgresType nativeType = 
getTypeRegistry().get(column.typeName());
+            column.nativeType(nativeType.getRootType().getOid());
+            column.jdbcType(nativeType.getRootType().getJdbcId());
+
+            // For domain types, the postgres driver is unable to traverse a 
nested unbounded
+            // hierarchy of types and report the right length/scale of a given 
type. We use
+            // the TypeRegistry to accomplish this since it is capable of 
traversing the type
+            // hierarchy upward to resolve length/scale regardless of 
hierarchy depth.
+            if (TypeRegistry.DOMAIN_TYPE == nativeType.getJdbcId()) {
+                column.length(nativeType.getDefaultLength());
+                column.scale(nativeType.getDefaultScale());
+            }
+
+            final String defaultValueExpression = columnMetadata.getString(13);
+            if (defaultValueExpression != null
+                    && 
getDefaultValueConverter().supportConversion(column.typeName())) {
+                column.defaultValueExpression(defaultValueExpression);
+            }
+
+            return Optional.of(column);
+        }
+
+        return Optional.empty();
+    }
+
+    public PostgresDefaultValueConverter getDefaultValueConverter() {
+        Objects.requireNonNull(
+                defaultValueConverter, "Connection does not provide default 
value converter");
+        return defaultValueConverter;
+    }
+
+    public TypeRegistry getTypeRegistry() {
+        Objects.requireNonNull(typeRegistry, "Connection does not provide type 
registry");
+        return typeRegistry;
+    }
+
+    @Override
+    public <T extends DatabaseSchema<TableId>> Object getColumnValue(
+            ResultSet rs, int columnIndex, Column column, Table table, T schema)
+            throws SQLException {
+        try {
+            final ResultSetMetaData metaData = rs.getMetaData();
+            final String columnTypeName = metaData.getColumnTypeName(columnIndex);
+            final PostgresType type =
+                    ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);
+
+            LOGGER.trace("Type of incoming data is: {}", type.getOid());
+            LOGGER.trace("ColumnTypeName is: {}", columnTypeName);
+            LOGGER.trace("Type is: {}", type);
+
+            if (type.isArrayType()) {
+                return rs.getArray(columnIndex);
+            }
+
+            switch (type.getOid()) {
+                case PgOid.MONEY:
+                    // TODO author=Horia Chiorean date=14/11/2016 description=workaround for
+                    // https://github.com/pgjdbc/pgjdbc/issues/100
+                    final String sMoney = rs.getString(columnIndex);
+                    if (sMoney == null) {
+                        return sMoney;
+                    }
+                    if (sMoney.startsWith("-")) {
+                        // PGmoney expects negative values to be provided in the format of
+                        // "($XXXXX.YY)"
+                        final String negativeMoney = "(" + sMoney.substring(1) + ")";
+                        return new PGmoney(negativeMoney).val;
+                    }
+                    return new PGmoney(sMoney).val;
+                case PgOid.BIT:
+                    return rs.getString(columnIndex);
+                case PgOid.NUMERIC:
+                    final String s = rs.getString(columnIndex);
+                    if (s == null) {
+                        return s;
+                    }
+
+                    Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
+                    return value.isPresent()
+                            ? value.get()
+                            : new SpecialValueDecimal(rs.getBigDecimal(columnIndex));
+                case PgOid.TIME:
+                // To handle time 24:00:00 supported by TIME columns, read the column as a
+                // string.
+                case PgOid.TIMETZ:
+                    // In order to guarantee that we resolve TIMETZ columns with proper microsecond
+                    // precision,
+                    // read the column as a string instead and then re-parse inside the converter.
+                    return rs.getString(columnIndex);
+                case PgOid.TIMESTAMP:
+                    {
+                        // LocalDateTime bypasses GregorianCalendar's Julian/Gregorian cutover
+                        // (1582-10-15), which shifts pre-cutover values by N days
+                        // (e.g. 0001-01-01 by 2 days).
+                        LocalDateTime ldt = rs.getObject(columnIndex, LocalDateTime.class);
+                        if (ldt == null) {
+                            return null;
+                        }
+                        // PG +/-infinity surfaces as Timestamp(Long.MAX/MIN_VALUE) via the legacy
+                        // rs.getObject() path; keep that contract. Compare by value, not identity.
+                        if (LocalDateTime.MAX.equals(ldt)) {
+                            return new Timestamp(Long.MAX_VALUE);
+                        }
+                        if (LocalDateTime.MIN.equals(ldt)) {
+                            return new Timestamp(Long.MIN_VALUE);
+                        }
+                        return ldt;
+                    }
+                case PgOid.TIMESTAMPTZ:
+                    {
+                        OffsetDateTime odt = rs.getObject(columnIndex, OffsetDateTime.class);
+                        if (odt == null) {
+                            return null;
+                        }
+                        if (OffsetDateTime.MAX.equals(odt)) {
+                            return new Timestamp(Long.MAX_VALUE);
+                        }
+                        if (OffsetDateTime.MIN.equals(odt)) {
+                            return new Timestamp(Long.MIN_VALUE);
+                        }
+                        return odt;
+                    }
+                case PgOid.DATE:
+                    return rs.getObject(columnIndex, LocalDate.class);
+                default:
+                    Object x = rs.getObject(columnIndex);
+                    if (x != null) {
+                        LOGGER.trace(
+                                "rs getobject returns class: {}; rs getObject value is: {}",
+                                x.getClass(),
+                                x);
+                    }
+                    return x;
+            }
+        } catch (SQLException e) {
+            // not a known type
+            return super.getColumnValue(rs, columnIndex, column, table, schema);
+        }
+    }
+
+    @Override
+    protected String[] supportedTableTypes() {
+        return new String[] {"VIEW", "MATERIALIZED VIEW", "TABLE", 
"PARTITIONED TABLE"};
+    }
+
+    @Override
+    protected boolean isTableType(String tableType) {
+        return "TABLE".equals(tableType) || "PARTITIONED 
TABLE".equals(tableType);
+    }
+
+    @Override
+    protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
+        if (columnName == null) {
+            return false; // an index entry without a column name is never included
+        }
+        final boolean functionDefault = FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches();
+        return !functionDefault && !EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches();
+    }
+
+    /**
+     * Retrieves all {@code TableId}s in a given database catalog, including 
partitioned tables.
+     *
+     * @param catalogName the catalog/database name
+     * @return set of all table ids for existing table objects
+     * @throws SQLException if a database exception occurred
+     */
+    public Set<TableId> getAllTableIds(String catalogName) throws SQLException 
{
+        return readTableNames(catalogName, null, null, new String[] {"TABLE", 
"PARTITIONED TABLE"});
+    }
+
+    @FunctionalInterface
+    public interface PostgresValueConverterBuilder {
+        PostgresValueConverter build(TypeRegistry registry);
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
index 7876597660e..6881dd70d40 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java
@@ -21,6 +21,7 @@ import org.apache.doris.cdcclient.utils.ConfigUtil;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.debezium.utils.TemporalConversions;
 import org.apache.flink.table.data.TimestampData;
@@ -161,6 +162,7 @@ public class DebeziumJsonDeserializer
                             if (!excludeColumns.contains(field.name())) {
                                 Object valueConverted =
                                         convert(
+                                                field.name(),
                                                 field.schema(),
                                                 
after.getWithoutDefault(field.name()));
                                 record.put(field.name(), valueConverted);
@@ -185,6 +187,7 @@ public class DebeziumJsonDeserializer
                             if (!excludeColumns.contains(field.name())) {
                                 Object valueConverted =
                                         convert(
+                                                field.name(),
                                                 field.schema(),
                                                 
before.getWithoutDefault(field.name()));
                                 record.put(field.name(), valueConverted);
@@ -194,7 +197,20 @@ public class DebeziumJsonDeserializer
         return objectMapper.writeValueAsString(record);
     }
 
-    private Object convert(Schema fieldSchema, Object dbzObj) {
+    /**
+     * Converts a single top-level column value, adding the column name and raw value to any
+     * failure so the offending field can be identified.
+     *
+     * @param fieldName name of the column being converted; used only in the error message
+     * @param fieldSchema Connect schema describing {@code dbzObj}
+     * @param dbzObj raw Debezium value, may be {@code null}
+     * @return the value produced by {@link #convertInternal(Schema, Object)}
+     * @throws RuntimeException if conversion fails; the original exception is kept as the cause
+     */
+    private Object convert(String fieldName, Schema fieldSchema, Object dbzObj) {
+        try {
+            return convertInternal(fieldSchema, dbzObj);
+        } catch (Exception e) {
+            String msg =
+                    String.format(
+                            "Failed to convert column '%s' value=%s: %s",
+                            fieldName, dbzObj, ExceptionUtils.getMessage(e));
+            LOG.error(msg, e);
+            // Chain the cause: without it, callers that catch this exception lose the stack trace
+            // that pinpoints the failing conversion.
+            throw new RuntimeException(msg, e);
+        }
+    }
+
+    private Object convertInternal(Schema fieldSchema, Object dbzObj) {
         if (dbzObj == null) {
             return null;
         }
@@ -307,15 +323,25 @@ public class DebeziumJsonDeserializer
                 case Timestamp.SCHEMA_NAME:
                     return TimestampData.fromEpochMillis((Long) 
dbzObj).toTimestamp().toString();
                 case MicroTimestamp.SCHEMA_NAME:
-                    long micro = (long) dbzObj;
-                    return TimestampData.fromEpochMillis(micro / 1000, (int) 
(micro % 1000 * 1000))
-                            .toTimestamp()
-                            .toString();
+                    {
+                        // floorDiv/floorMod keep nanoOfMillisecond 
non-negative for pre-1970
+                        // values.
+                        long micro = (long) dbzObj;
+                        long millis = Math.floorDiv(micro, 1000L);
+                        int nanos = (int) Math.floorMod(micro, 1000L) * 1000;
+                        return TimestampData.fromEpochMillis(millis, nanos)
+                                .toTimestamp()
+                                .toString();
+                    }
                 case NanoTimestamp.SCHEMA_NAME:
-                    long nano = (long) dbzObj;
-                    return TimestampData.fromEpochMillis(nano / 1000_000, 
(int) (nano % 1000_000))
-                            .toTimestamp()
-                            .toString();
+                    {
+                        long nano = (long) dbzObj;
+                        long millis = Math.floorDiv(nano, 1_000_000L);
+                        int nanos = (int) Math.floorMod(nano, 1_000_000L);
+                        return TimestampData.fromEpochMillis(millis, nanos)
+                                .toTimestamp()
+                                .toString();
+                    }
             }
         }
         LocalDateTime localDateTime = 
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
@@ -364,7 +390,7 @@ public class DebeziumJsonDeserializer
             Schema elementSchema = fieldSchema.valueSchema();
             List<Object> result = new ArrayList<>();
             for (Object element : (List<?>) dbzObj) {
-                result.add(element == null ? null : convert(elementSchema, 
element));
+                result.add(element == null ? null : 
convertInternal(elementSchema, element));
             }
             return result;
         }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
new file mode 100644
index 00000000000..6c26cf4e74a
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.fetch;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import 
org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresEventDispatcher;
+import io.debezium.connector.postgresql.PostgresOffsetContext;
+import io.debezium.connector.postgresql.PostgresPartition;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
+import io.debezium.connector.postgresql.connection.ReplicationConnection;
+import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.relational.Column;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.SnapshotChangeRecordEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.ColumnUtils;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady;
+import static io.debezium.connector.postgresql.Utils.refreshSchema;
+
+/**
+ * Copied from Flink Cdc 3.6.0
+ *
+ * <p>Line 333~336: modified createDataEventsForTable to fix FLINK-39748.
+ */
+public class PostgresScanFetchTask extends AbstractScanFetchTask {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresScanFetchTask.class);
+
+    public PostgresScanFetchTask(SnapshotSplit split) {
+        super(split);
+    }
+
+    @Override
+    public void execute(Context context) throws Exception {
+
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) 
context;
+        PostgresSourceConfig sourceConfig = (PostgresSourceConfig) 
context.getSourceConfig();
+        try {
+            // Create the slot before the snapshot read: a slot can only read WAL from after its
+            // own creation. Nothing is created when isSkipSnapshotBackfill() is true.
+            maybeCreateSlotForBackFillReadTask(
+                    ctx.getConnection(),
+                    ctx.getReplicationConnection(),
+                    sourceConfig.getSlotNameForBackfillTask(),
+                    ctx.getPluginName(),
+                    sourceConfig.isSkipSnapshotBackfill());
+            super.execute(context);
+        } finally {
+            // Drop the backfill slot once the snapshot split finishes (also on failure).
+            maybeDropSlotForBackFillReadTask(
+                    (PostgresReplicationConnection) 
ctx.getReplicationConnection(),
+                    sourceConfig.isSkipSnapshotBackfill());
+        }
+    }
+
+    @Override
+    protected void executeDataSnapshot(Context context) throws Exception {
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) 
context;
+
+        PostgresSnapshotSplitReadTask snapshotSplitReadTask =
+                new PostgresSnapshotSplitReadTask(
+                        ctx.getConnection(),
+                        ctx.getDbzConnectorConfig(),
+                        ctx.getDatabaseSchema(),
+                        ctx.getOffsetContext(),
+                        ctx.getEventDispatcher(),
+                        ctx.getSnapshotChangeEventSourceMetrics(),
+                        snapshotSplit);
+
+        StoppableChangeEventSourceContext changeEventSourceContext =
+                new StoppableChangeEventSourceContext();
+        SnapshotResult<PostgresOffsetContext> snapshotResult =
+                snapshotSplitReadTask.execute(
+                        changeEventSourceContext, ctx.getPartition(), 
ctx.getOffsetContext());
+
+        if (!snapshotResult.isCompletedOrSkipped()) {
+            taskRunning = false;
+            throw new IllegalStateException(
+                    String.format("Read snapshot for postgres split %s fail", 
snapshotResult));
+        }
+    }
+
+    @Override
+    protected void executeBackfillTask(Context context, StreamSplit 
backfillStreamSplit)
+            throws Exception {
+        PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) 
context;
+
+        final PostgresOffsetContext.Loader loader =
+                new PostgresOffsetContext.Loader(ctx.getDbzConnectorConfig());
+        final PostgresOffsetContext postgresOffsetContext =
+                PostgresOffsetUtils.getPostgresOffsetContext(
+                        loader, backfillStreamSplit.getStartingOffset());
+
+        final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask =
+                new PostgresStreamFetchTask.StreamSplitReadTask(
+                        ctx.getDbzConnectorConfig(),
+                        ctx.getSnapShotter(),
+                        ctx.getConnection(),
+                        ctx.getEventDispatcher(),
+                        ctx.getWaterMarkDispatcher(),
+                        ctx.getErrorHandler(),
+                        ctx.getTaskContext().getClock(),
+                        ctx.getDatabaseSchema(),
+                        ctx.getTaskContext(),
+                        ctx.getReplicationConnection(),
+                        backfillStreamSplit);
+        LOG.info(
+                "Execute backfillReadTask for split {} with slot name {}",
+                snapshotSplit,
+                ((PostgresSourceConfig) 
ctx.getSourceConfig()).getSlotNameForBackfillTask());
+        backfillReadTask.execute(
+                new StoppableChangeEventSourceContext(), ctx.getPartition(), 
postgresOffsetContext);
+    }
+
+    /**
+     * Create a slot before snapshot reading so that the slot can track the 
WAL log during the
+     * snapshot reading phase.
+     */
+    private void maybeCreateSlotForBackFillReadTask(
+            PostgresConnection jdbcConnection,
+            ReplicationConnection replicationConnection,
+            String slotName,
+            String pluginName,
+            boolean skipSnapshotBackfill) {
+        // if skip backfill, no need to create slot here
+        if (skipSnapshotBackfill) {
+            return;
+        }
+
+        try {
+            SlotState slotInfo = null;
+            try {
+                slotInfo = jdbcConnection.getReplicationSlotState(slotName, 
pluginName);
+            } catch (SQLException e) {
+                LOG.info("Unable to load info of replication slot, will try to 
create the slot");
+            }
+            if (slotInfo == null) {
+                try {
+                    replicationConnection.createReplicationSlot().orElse(null);
+                } catch (SQLException ex) {
+                    String message = "Creation of replication slot failed";
+                    if (ex.getMessage().contains("already exists")) {
+                        message +=
+                                "; when setting up multiple connectors for the 
same database host, please make sure to use a distinct replication slot name 
for each.";
+                    }
+                    throw new FlinkRuntimeException(message, ex);
+                }
+            }
+            waitForReplicationSlotReady(30, jdbcConnection, slotName, 
pluginName);
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    /** Drop slot for backfill task and close replication connection. */
+    private void maybeDropSlotForBackFillReadTask(
+            PostgresReplicationConnection replicationConnection, boolean 
skipSnapshotBackfill) {
+        // backfill was skipped, so no slot was created and there is nothing to drop
+        if (skipSnapshotBackfill) {
+            return;
+        }
+
+        try {
+            replicationConnection.close(true);
+        } catch (Throwable t) {
+            LOG.error("Unexpected error while dropping replication slot", t);
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    /** A SnapshotChangeEventSource implementation for Postgres to read 
snapshot split. */
+    public static class PostgresSnapshotSplitReadTask
+            extends AbstractSnapshotChangeEventSource<PostgresPartition, 
PostgresOffsetContext> {
+        private static final Logger LOG =
+                LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
+
+        private final PostgresConnection jdbcConnection;
+        private final PostgresConnectorConfig connectorConfig;
+        private final PostgresEventDispatcher<TableId> eventDispatcher;
+        private final SnapshotSplit snapshotSplit;
+        private final PostgresOffsetContext offsetContext;
+        private final PostgresSchema databaseSchema;
+        private final SnapshotProgressListener<PostgresPartition> 
snapshotProgressListener;
+        private final Clock clock;
+
+        public PostgresSnapshotSplitReadTask(
+                PostgresConnection jdbcConnection,
+                PostgresConnectorConfig connectorConfig,
+                PostgresSchema databaseSchema,
+                PostgresOffsetContext previousOffset,
+                PostgresEventDispatcher<TableId> eventDispatcher,
+                SnapshotProgressListener snapshotProgressListener,
+                SnapshotSplit snapshotSplit) {
+            super(connectorConfig, snapshotProgressListener);
+            this.jdbcConnection = jdbcConnection;
+            this.connectorConfig = connectorConfig;
+            this.snapshotProgressListener = snapshotProgressListener;
+            this.databaseSchema = databaseSchema;
+            this.eventDispatcher = eventDispatcher;
+            this.snapshotSplit = snapshotSplit;
+            this.offsetContext = previousOffset;
+            this.clock = Clock.SYSTEM;
+        }
+
+        @Override
+        protected SnapshotResult<PostgresOffsetContext> doExecute(
+                ChangeEventSourceContext context,
+                PostgresOffsetContext previousOffset,
+                SnapshotContext<PostgresPartition, PostgresOffsetContext> 
snapshotContext,
+                SnapshottingTask snapshottingTask)
+                throws Exception {
+            final PostgresSnapshotContext ctx = (PostgresSnapshotContext) 
snapshotContext;
+            ctx.offset = offsetContext;
+
+            refreshSchema(databaseSchema, jdbcConnection, true);
+            createDataEvents(ctx, snapshotSplit.getTableId());
+
+            return SnapshotResult.completed(ctx.offset);
+        }
+
+        private void createDataEvents(PostgresSnapshotContext snapshotContext, 
TableId tableId)
+                throws InterruptedException {
+            EventDispatcher.SnapshotReceiver<PostgresPartition> 
snapshotReceiver =
+                    eventDispatcher.getSnapshotChangeEventReceiver();
+            LOG.info("Snapshotting table {}", tableId);
+            createDataEventsForTable(
+                    snapshotContext,
+                    snapshotReceiver,
+                    Objects.requireNonNull(databaseSchema.tableFor(tableId)));
+            snapshotReceiver.completeSnapshot();
+        }
+
+        /** Dispatches the data change events for the records of a single 
table. */
+        private void createDataEventsForTable(
+                PostgresSnapshotContext snapshotContext,
+                EventDispatcher.SnapshotReceiver<PostgresPartition> 
snapshotReceiver,
+                Table table)
+                throws InterruptedException {
+
+            long exportStart = clock.currentTimeInMillis();
+            LOG.info(
+                    "Exporting data from split '{}' of table {}",
+                    snapshotSplit.splitId(),
+                    table.id());
+
+            List<String> uuidFields =
+                    snapshotSplit.getSplitKeyType().getFieldNames().stream()
+                            .filter(field -> 
table.columnWithName(field).typeName().equals("uuid"))
+                            .collect(Collectors.toList());
+
+            List<String> columnNames =
+                    table.columns().stream()
+                            .map(column -> 
jdbcConnection.quotedColumnIdString(column.name()))
+                            .collect(Collectors.toList());
+            final String selectSql =
+                    PostgresQueryUtils.buildSplitScanQuery(
+                            snapshotSplit.getTableId(),
+                            snapshotSplit.getSplitKeyType(),
+                            snapshotSplit.getSplitStart() == null,
+                            snapshotSplit.getSplitEnd() == null,
+                            columnNames,
+                            uuidFields);
+            LOG.debug(
+                    "For split '{}' of table {} using select statement: '{}'",
+                    snapshotSplit.splitId(),
+                    table.id(),
+                    selectSql);
+
+            try (PreparedStatement selectStatement =
+                            PostgresQueryUtils.readTableSplitDataStatement(
+                                    jdbcConnection,
+                                    selectSql,
+                                    snapshotSplit.getSplitStart() == null,
+                                    snapshotSplit.getSplitEnd() == null,
+                                    snapshotSplit.getSplitStart(),
+                                    snapshotSplit.getSplitEnd(),
+                                    
snapshotSplit.getSplitKeyType().getFieldCount(),
+                                    connectorConfig.getSnapshotFetchSize());
+                    ResultSet rs = selectStatement.executeQuery()) {
+
+                ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, 
table);
+                long rows = 0;
+                Threads.Timer logTimer = getTableScanLogTimer();
+
+                while (rs.next()) {
+                    rows++;
+                    final Object[] row = new 
Object[columnArray.getGreatestColumnPosition()];
+                    for (int i = 0; i < columnArray.getColumns().length; i++) {
+                        Column col = columnArray.getColumns()[i];
+                        row[col.position() - 1] =
+                                jdbcConnection.getColumnValue(
+                                        rs, i + 1, col, table, databaseSchema);
+                    }
+                    if (logTimer.expired()) {
+                        long stop = clock.currentTimeInMillis();
+                        LOG.info(
+                                "Exported {} records for split '{}' after {}",
+                                rows,
+                                snapshotSplit.splitId(),
+                                Strings.duration(stop - exportStart));
+                        snapshotProgressListener.rowsScanned(
+                                snapshotContext.partition, table.id(), rows);
+                        logTimer = getTableScanLogTimer();
+                    }
+                    snapshotContext.offset.event(table.id(), 
clock.currentTime());
+                    SnapshotChangeRecordEmitter<PostgresPartition> emitter =
+                            new SnapshotChangeRecordEmitter<>(
+                                    snapshotContext.partition, 
snapshotContext.offset, row, clock);
+                    eventDispatcher.dispatchSnapshotEvent(
+                            snapshotContext.partition, table.id(), emitter, 
snapshotReceiver);
+                }
+                LOG.info(
+                        "Finished exporting {} records for split '{}', total 
duration '{}'",
+                        rows,
+                        snapshotSplit.splitId(),
+                        Strings.duration(clock.currentTimeInMillis() - 
exportStart));
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException(
+                        "Snapshotting of table " + table.id() + " failed", e);
+            }
+        }
+
+        private Threads.Timer getTableScanLogTimer() {
+            return Threads.timer(clock, LOG_INTERVAL);
+        }
+
+        @Override
+        protected SnapshottingTask getSnapshottingTask(
+                PostgresPartition partition, PostgresOffsetContext 
previousOffset) {
+            return new SnapshottingTask(false, true);
+        }
+
+        @Override
+        protected PostgresSnapshotContext prepare(PostgresPartition partition) 
throws Exception {
+            return new PostgresSnapshotContext(partition);
+        }
+
+        private static class PostgresSnapshotContext
+                extends 
RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
+                        PostgresPartition, PostgresOffsetContext> {
+
+            public PostgresSnapshotContext(PostgresPartition partition) throws 
SQLException {
+                super(partition, "");
+            }
+        }
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
new file mode 100644
index 00000000000..ac4d7b58226
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.deserialize;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTimestamp;
+
+/** Unit tests for {@link DebeziumJsonDeserializer}. */
+class DebeziumJsonDeserializerTest {
+
+    private final DebeziumJsonDeserializer deserializer = new DebeziumJsonDeserializer();
+
+    // ─── convertTimestamp ─────────────────────────────────────────────────────
+
+    @Test
+    void microTimestamp_negativeSubMillisecond_doesNotThrow() {
+        // micros = -877 ⇒ 1969-12-31 23:59:59.999123. Signed `/` `%` produced a
+        // negative nanoOfMillisecond and tripped TimestampData's >= 0 check.
+        Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, -877L);
+        assertEquals("1969-12-31 23:59:59.999123", out.toString());
+    }
+
+    @Test
+    void microTimestamp_positive_unchanged() {
+        Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, 1_234_567L);
+        assertEquals("1970-01-01 00:00:01.234567", out.toString());
+    }
+
+    @Test
+    void microTimestamp_negativeIntegerMillis_unchanged() {
+        // micros = -1000 ⇒ 1969-12-31 23:59:59.999, negative but no sub-millisecond
+        // (the old code happened to produce the right result here; protect that path).
+        Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, -1000L);
+        assertEquals("1969-12-31 23:59:59.999", out.toString());
+    }
+
+    @Test
+    void nanoTimestamp_negativeSubMillisecond_doesNotThrow() {
+        // nanos = -877_000 ⇒ 1969-12-31 23:59:59.999123.
+        Object out = invokeConvertTimestamp(NanoTimestamp.SCHEMA_NAME, -877_000L);
+        assertEquals("1969-12-31 23:59:59.999123", out.toString());
+    }
+
+    private Object invokeConvertTimestamp(String typeName, Object dbzObj) {
+        try {
+            Method m =
+                    DebeziumJsonDeserializer.class.getDeclaredMethod(
+                            "convertTimestamp", String.class, Object.class);
+            m.setAccessible(true);
+            return m.invoke(deserializer, typeName, dbzObj);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
index abdc4038774..feeb6757d2d 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
@@ -34,9 +34,9 @@ xml_col       text    Yes     false   \N      NONE
 hstore_col     text    Yes     false   \N      NONE
 
 -- !select_all_types_null --
-1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
+1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   04:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
 
 -- !select_all_types_null2 --
-1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   12:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
+1      1       100     1000    1.23    4.56    12345.678901    char            
varchar text value      true    2024-01-01      12:00   04:00:00Z       
2024-01-01T12:00        2024-01-01T04:00        P0Y0M1DT0H0M0S  3q2+7w==        
11111111-2222-3333-4444-555555555555    {"a":1} {"b": 2}        192.168.1.1     
192.168.0.0/24  08:00:2b:01:02:03       qg==    Cg==    [1, 2, 3]       ["a", 
"b", "c"] {"coordinates":[1,2],"type":"Point","srid":0}   
08:00:2b:01:02:03:04:05 <root><item>1</item></root>     {"a":"1","b":"2"}
 2      2       200     2000    7.89    0.12    99999.000001    char2           
varchar2        another text    false   2025-01-01      23:59:59        
23:59:59Z       2025-01-01T23:59:59     2025-01-01T23:59:59     P0Y0M0DT2H0M0S  
3q2+7w==        11111111-2222-3333-4444-555555555556    {"x":10}        {"y": 
20}       10.0.0.1        10.0.0.0/16     08:00:2b:aa:bb:cc       8A==    Dw==  
  [10, 20]        ["x", "y"]      {"coordinates":[3,4],"type":"Point","srid":0} 
  08:00:2b:aa:bb:cc:dd:ee <root><item>2</item></root>     {"x":"10","y":"20"}
 
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out
new file mode 100644
index 00000000000..0df4f0ca737
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out
@@ -0,0 +1,45 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !snapshot --
+1      0001-01-01T00:00:00.000123      0001-01-01T00:00:00.000123      
0001-01-01
+2      0500-06-15T10:00        0500-06-15T15:00        0500-06-15
+3      1582-10-04T12:34:56     1582-10-04T12:34:56     1582-10-04
+4      1582-10-15T00:00        1582-10-15T00:00        1582-10-15
+5      1800-07-20T03:30        1800-07-19T22:00        1800-07-20
+6      1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+7      1901-01-02T00:00        1901-01-01T15:00        1901-01-02
+8      1969-12-31T23:59:59.999123      1969-12-31T23:59:59.999123      
1969-12-31
+9      \N      \N      \N
+
+-- !binlog_insert --
+11     0001-01-01T00:00:00.000123      0001-01-01T00:00:00.000123      
0001-01-01
+12     0500-06-15T10:00        0500-06-15T15:00        0500-06-15
+13     1582-10-04T12:34:56     1582-10-04T12:34:56     1582-10-04
+14     1582-10-15T00:00        1582-10-15T00:00        1582-10-15
+15     1800-07-20T03:30        1800-07-19T22:00        1800-07-20
+16     1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+17     1901-01-02T00:00        1901-01-01T15:00        1901-01-02
+18     1969-12-31T23:59:59.999123      1969-12-31T23:59:59.999123      
1969-12-31
+19     \N      \N      \N
+
+-- !binlog_update --
+1      1582-10-15T12:00:00.000123      1582-10-15T12:00:00.000123      
1900-12-31
+
+-- !binlog_after_delete --
+1      1582-10-15T12:00:00.000123      1582-10-15T12:00:00.000123      
1900-12-31
+3      1582-10-04T12:34:56     1582-10-04T12:34:56     1582-10-04
+4      1582-10-15T00:00        1582-10-15T00:00        1582-10-15
+5      1800-07-20T03:30        1800-07-19T22:00        1800-07-20
+6      1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+7      1901-01-02T00:00        1901-01-01T15:00        1901-01-02
+8      1969-12-31T23:59:59.999123      1969-12-31T23:59:59.999123      
1969-12-31
+9      \N      \N      \N
+11     0001-01-01T00:00:00.000123      0001-01-01T00:00:00.000123      
0001-01-01
+12     0500-06-15T10:00        0500-06-15T15:00        0500-06-15
+13     1582-10-04T12:34:56     1582-10-04T12:34:56     1582-10-04
+14     1582-10-15T00:00        1582-10-15T00:00        1582-10-15
+15     1800-07-20T03:30        1800-07-19T22:00        1800-07-20
+16     1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31
+17     1901-01-02T00:00        1901-01-01T15:00        1901-01-02
+18     1969-12-31T23:59:59.999123      1969-12-31T23:59:59.999123      
1969-12-31
+19     \N      \N      \N
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy
new file mode 100644
index 00000000000..db8b748abbd
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Verify snapshot and binlog paths produce identical values for historical
+ * dates that previously drifted in the snapshot path (PG JDBC's
+ * GregorianCalendar + JVM-zone LMT).
+ *
+ * Phases:
+ *   1. snapshot batch (ids 1..N) inserted in postgres before the job starts;
+ *      after sync, assert values in doris match the original input.
+ *   2. binlog INSERT batch (ids 11..10+N) with the same boundary values;
+ *      assert each binlog row equals its snapshot counterpart cell-for-cell.
+ *   3. binlog UPDATE: rewrite id=1's columns to a different boundary value
+ *      and assert the streamed change lands.
+ *   4. binlog DELETE: remove id=2 and assert it disappears in doris.
+ */
+suite("test_streaming_postgres_job_snapshot_historical_dates",
+        "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_snapshot_historical_dates_name"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "streaming_pg_historical_dates"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    String pg_port = context.config.otherConfigs.get("pg_14_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String s3_endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+    // Boundary rows. Picked to exercise:
+    //   - Julian/Gregorian cutover (0001-01-01 ⇒ 2-day drift, 1582-10-04/15 boundary)
+    //   - negative tz offset before the cutover (0500-06-15 with -05)
+    //   - half-hour tz offset pre-1900 (1800-07-20 with +05:30)
+    //   - LMT offset for pre-1901 values in zones like Asia/Shanghai (1900-12-31 vs 1901-01-02)
+    //   - sub-millisecond precision on a pre-1970 value (negative micros, bug B)
+    //   - NULL across all three columns
+    def boundaryRows = [
+            [ts: "0001-01-01 00:00:00.000123", tstz: "0001-01-01 00:00:00.000123+00", date: "0001-01-01"],
+            [ts: "0500-06-15 10:00:00.000000", tstz: "0500-06-15 10:00:00-05",        date: "0500-06-15"],
+            [ts: "1582-10-04 12:34:56.000000", tstz: "1582-10-04 12:34:56+00",        date: "1582-10-04"],
+            [ts: "1582-10-15 00:00:00.000000", tstz: "1582-10-15 00:00:00+00",        date: "1582-10-15"],
+            [ts: "1800-07-20 03:30:00.000000", tstz: "1800-07-20 03:30:00+05:30",     date: "1800-07-20"],
+            [ts: "1900-12-31 23:59:59.999000", tstz: "1900-12-31 23:59:59.999+00",    date: "1900-12-31"],
+            [ts: "1901-01-02 00:00:00.000000", tstz: "1901-01-02 00:00:00+09",        date: "1901-01-02"],
+            [ts: "1969-12-31 23:59:59.999123", tstz: "1969-12-31 23:59:59.999123+00", date: "1969-12-31"],
+            [ts: null,                         tstz: null,                            date: null],
+    ]
+    def rowsPerBatch = boundaryRows.size()
+    def snapshotIdBase = 1
+    def binlogIdBase = 11
+
+    def buildInsertValues = { int idBase ->
+        boundaryRows.withIndex().collect { row, i ->
+            def id = idBase + i
+            def tsLit   = row.ts   == null ? "NULL" : "TIMESTAMP '${row.ts}'"
+            def tstzLit = row.tstz == null ? "NULL" : "TIMESTAMPTZ '${row.tstz}'"
+            def dateLit = row.date == null ? "NULL" : "DATE '${row.date}'"
+            "(${id}, ${tsLit}, ${tstzLit}, ${dateLit})"
+        }.join(",\n        ")
+    }
+
+    def dumpJobOnFailure = {
+        log.info("show job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'"""))
+        log.info("show task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'"""))
+    }
+
+    // ── postgres setup + snapshot batch ───────────────────────────────────────
+    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+        sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+        sql """
+        CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+            id            bigint PRIMARY KEY,
+            ts_col        timestamp(6),
+            tstz_col      timestamp(6) with time zone,
+            date_col      date
+        )
+        """
+        sql """
+        INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
+        ${buildInsertValues(snapshotIdBase)}
+        """
+    }
+
+    // ── start streaming job (offset=initial ⇒ snapshot + binlog) ──────────────
+    sql """CREATE JOB ${jobName}
+            ON STREAMING
+            FROM POSTGRES (
+                "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user" = "${pgUser}",
+                "password" = "${pgPassword}",
+                "database" = "${pgDB}",
+                "schema" = "${pgSchema}",
+                "include_tables" = "${table1}",
+                "offset" = "initial"
+            )
+            TO DATABASE ${currentDb} (
+              "table.create.properties.replication_num" = "1"
+            )
+        """
+
+    // ── phase 1: snapshot ─────────────────────────────────────────────────────
+    try {
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
+            log.info("snapshot row count: ${cnt}")
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) rowsPerBatch
+        }
+    } catch (Exception ex) {
+        dumpJobOnFailure()
+        throw ex
+    }
+
+    qt_snapshot """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} ORDER BY id"""
+
+    // ── phase 2: binlog INSERT ────────────────────────────────────────────────
+    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+        sql """
+        INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES
+        ${buildInsertValues(binlogIdBase)}
+        """
+    }
+    try {
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) (rowsPerBatch * 2)
+        }
+    } catch (Exception ex) {
+        dumpJobOnFailure()
+        throw ex
+    }
+
+    qt_binlog_insert """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                        WHERE id >= ${binlogIdBase} ORDER BY id"""
+
+    // Parity: snapshot row i must equal binlog row i+10 cell-for-cell.
+    def snapshotRows = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                              WHERE id <  ${binlogIdBase} ORDER BY id"""
+    def binlogRows   = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                              WHERE id >= ${binlogIdBase} ORDER BY id"""
+    assert snapshotRows.size() == rowsPerBatch
+    assert binlogRows.size()   == rowsPerBatch
+    for (int i = 0; i < rowsPerBatch; i++) {
+        def s = snapshotRows.get(i)
+        def b = binlogRows.get(i)
+        assert s.get(0)?.toString() == b.get(0)?.toString() :
+                "ts_col mismatch at row ${i}: snapshot=${s.get(0)} binlog=${b.get(0)}"
+        assert s.get(1)?.toString() == b.get(1)?.toString() :
+                "tstz_col mismatch at row ${i}: snapshot=${s.get(1)} binlog=${b.get(1)}"
+        assert s.get(2)?.toString() == b.get(2)?.toString() :
+                "date_col mismatch at row ${i}: snapshot=${s.get(2)} binlog=${b.get(2)}"
+    }
+
+    // ── phase 3: binlog UPDATE ────────────────────────────────────────────────
+    // Rewrite id=1 (originally 0001-01-01) to a different boundary value via UPDATE.
+    def updatedTs   = "1582-10-15 12:00:00.000123"
+    def updatedTstz = "1582-10-15 12:00:00.000123+00"
+    def updatedDate = "1900-12-31"
+    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+        sql """
+        UPDATE ${pgDB}.${pgSchema}.${table1}
+        SET ts_col   = TIMESTAMP '${updatedTs}',
+            tstz_col = TIMESTAMPTZ '${updatedTstz}',
+            date_col = DATE '${updatedDate}'
+        WHERE id = 1
+        """
+    }
+    try {
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until {
+            def row = sql """SELECT cast(date_col as string) FROM ${currentDb}.${table1} WHERE id = 1"""
+            row.size() == 1 && row.get(0).get(0) == updatedDate
+        }
+    } catch (Exception ex) {
+        dumpJobOnFailure()
+        throw ex
+    }
+    qt_binlog_update """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                        WHERE id = 1"""
+
+    // ── phase 4: binlog DELETE ────────────────────────────────────────────────
+    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+        sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id = 2"""
+    }
+    try {
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until {
+            def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1} WHERE id = 2"""
+            cnt.size() == 1 && (cnt.get(0).get(0) as long) == 0L
+        }
+    } catch (Exception ex) {
+        dumpJobOnFailure()
+        throw ex
+    }
+    qt_binlog_after_delete """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1}
+                              ORDER BY id"""
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to