This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a2032acc [INLONG-10339][Sort] Fix PostgreSQL AuditOperator not serialized (#10344)
36a2032acc is described below

commit 36a2032acc80d80ff044a7ab811e12fef68bd4d8
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed Jun 5 11:37:10 2024 +0800

    [INLONG-10339][Sort] Fix PostgreSQL AuditOperator not serialized (#10344)
---
 .../inlong/sort/postgre/DebeziumChangeFetcher.java | 322 ++++++++++++
 .../postgre/DebeziumDeserializationSchema.java     |  42 ++
 .../sort/postgre/DebeziumSourceFunction.java       | 579 +++++++++++++++++++++
 .../inlong/sort/postgre/PostgreSQLSource.java      | 184 +++++++
 .../inlong/sort/postgre/PostgreSQLTableSource.java |   9 +-
 .../postgre/RowDataDebeziumDeserializeSchema.java  |  27 +-
 licenses/inlong-sort-connectors/LICENSE            |  12 +-
 7 files changed, 1157 insertions(+), 18 deletions(-)
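
The commit title points at the AuditOperator not being serialized; the visible change in the RowDataDebeziumDeserializeSchema hunk below is that the schema now keeps only the serializable MetricOption as a field and constructs SourceMetricData in the new open() hook on the task manager, instead of holding a pre-built SourceMetricData. A minimal sketch of that pattern follows; CounterConfig and RuntimeCounter are hypothetical stand-ins for MetricOption and SourceMetricData, not InLong classes.

import java.io.Serializable;

// Sketch only: keep serializable configuration as a field and create the
// non-serializable runtime object in open(), after the object has been
// deserialized on the task manager.
public class LazyCounterSchema implements Serializable {

    /** Hypothetical serializable config, standing in for MetricOption. */
    public static class CounterConfig implements Serializable {
        final String name;
        public CounterConfig(String name) {
            this.name = name;
        }
    }

    /** Hypothetical runtime state, standing in for SourceMetricData; deliberately NOT Serializable. */
    public static class RuntimeCounter {
        final String name;
        long count;
        public RuntimeCounter(String name) {
            this.name = name;
        }
        void inc() {
            count++;
        }
    }

    private final CounterConfig config;       // serialized with the function
    private transient RuntimeCounter counter; // built per task, never serialized

    public LazyCounterSchema(CounterConfig config) {
        this.config = config;
    }

    /** Mirrors the open() hook this patch adds to DebeziumDeserializationSchema. */
    public void open() {
        if (config != null) {
            counter = new RuntimeCounter(config.name);
        }
    }

    public void onRecord() {
        if (counter != null) {
            counter.inc();
        }
    }
}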

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java
new file mode 100644
index 0000000000..ef9251f79d
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.Handover;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
+ * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
+ * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the
+ * snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when
+ * emitting the records.
+ *
+ * @param <T> The type of elements produced by the handler.
+ * <p>
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ */
+@Internal
+public class DebeziumChangeFetcher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
+
+    private final SourceFunction.SourceContext<T> sourceContext;
+
+    /**
+     * The lock that guarantees that record emission and state updates are atomic, from the view of
+     * taking a checkpoint.
+     */
+    private final Object checkpointLock;
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserialization;
+
+    /** A collector to emit records in batch (bundle). */
+    private final DebeziumCollector debeziumCollector;
+
+    private final DebeziumOffset debeziumOffset;
+
+    private final DebeziumOffsetSerializer stateSerializer;
+
+    private final String heartbeatTopicPrefix;
+
+    private boolean isInDbSnapshotPhase;
+
+    private final Handover handover;
+
+    private volatile boolean isRunning = true;
+
+    // ---------------------------------------------------------------------------------------
+    // Metrics
+    // ---------------------------------------------------------------------------------------
+
+    /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
+    private volatile long messageTimestamp = 0L;
+
+    /** The last record processing time. */
+    private volatile long processTime = 0L;
+
+    /**
+     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+     * record fetched into the source operator.
+     */
+    private volatile long fetchDelay = 0L;
+
+    /**
+     * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
+     * source operator.
+     */
+    private volatile long emitDelay = 0L;
+
+    // ------------------------------------------------------------------------
+
+    public DebeziumChangeFetcher(
+            SourceFunction.SourceContext<T> sourceContext,
+            DebeziumDeserializationSchema<T> deserialization,
+            boolean isInDbSnapshotPhase,
+            String heartbeatTopicPrefix,
+            Handover handover) {
+        this.sourceContext = sourceContext;
+        this.checkpointLock = sourceContext.getCheckpointLock();
+        this.deserialization = deserialization;
+        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.debeziumCollector = new DebeziumCollector();
+        this.debeziumOffset = new DebeziumOffset();
+        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+        this.handover = handover;
+    }
+
+    /**
+     * Take a snapshot of the Debezium handler state.
+     *
+     * <p>Important: This method must be called under the checkpoint lock.
+     */
+    public byte[] snapshotCurrentState() throws Exception {
+        // this method assumes that the checkpoint lock is held
+        assert Thread.holdsLock(checkpointLock);
+        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
+            return null;
+        }
+
+        return stateSerializer.serialize(debeziumOffset);
+    }
+
+    /**
+     * Process change messages from the {@link Handover} and collect the processed messages by
+     * {@link Collector}.
+     */
+    public void runFetchLoop() throws Exception {
+        try {
+            // begin snapshot database phase
+            if (isInDbSnapshotPhase) {
+                List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
+
+                synchronized (checkpointLock) {
+                    LOG.info(
+                            "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
+                    handleBatch(events);
+                    while (isRunning && isInDbSnapshotPhase) {
+                        handleBatch(handover.pollNext());
+                    }
+                }
+                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
+            }
+
+            // begin streaming binlog phase
+            while (isRunning) {
+                // If the handover is closed or has errors, exit.
+                // If there is no streaming phase, the handover will be closed by the engine.
+                handleBatch(handover.pollNext());
+            }
+        } catch (Handover.ClosedException e) {
+            // ignore
+        } catch (RetriableException e) {
+            // Retriable exception should be ignored by DebeziumChangeFetcher,
+            // refer https://issues.redhat.com/browse/DBZ-2531 for more information.
+            // Because Retriable exception is ignored by the DebeziumEngine and
+            // the retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
+            LOG.info(
+                    "Ignore the RetriableException, the underlying DebeziumEngine will restart automatically",
+                    e);
+        }
+    }
+
+    public void close() {
+        isRunning = false;
+        handover.close();
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Metric getter
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The metric indicates delay from data generation to entry into the system.
+     *
+     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+     * unavailable.
+     */
+    public long getFetchDelay() {
+        return fetchDelay;
+    }
+
+    /**
+     * The metric indicates delay from data generation to leaving the source operator.
+     *
+     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+     * unavailable.
+     */
+    public long getEmitDelay() {
+        return emitDelay;
+    }
+
+    public long getIdleTime() {
+        return System.currentTimeMillis() - processTime;
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Helper
+    // ---------------------------------------------------------------------------------------
+
+    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+            throws Exception {
+        if (CollectionUtils.isEmpty(changeEvents)) {
+            return;
+        }
+        this.processTime = System.currentTimeMillis();
+
+        for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
+            SourceRecord record = event.value();
+            updateMessageTimestamp(record);
+            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+
+            if (isHeartbeatEvent(record)) {
+                // keep offset update
+                synchronized (checkpointLock) {
+                    debeziumOffset.setSourcePartition(record.sourcePartition());
+                    debeziumOffset.setSourceOffset(record.sourceOffset());
+                }
+                // drop heartbeat events
+                continue;
+            }
+
+            deserialization.deserialize(record, debeziumCollector);
+
+            if (!isSnapshotRecord(record)) {
+                LOG.debug("Snapshot phase finishes.");
+                isInDbSnapshotPhase = false;
+            }
+
+            // emit the actual records. this also updates offset state atomically
+            emitRecordsUnderCheckpointLock(
+                    debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
+        }
+    }
+
+    private void emitRecordsUnderCheckpointLock(
+            Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+        // Emit the records. Use the checkpoint lock to guarantee
+        // atomicity of record emission and offset state update.
+        // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
+        synchronized (checkpointLock) {
+            T record;
+            while ((record = records.poll()) != null) {
+                emitDelay =
+                        isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+                sourceContext.collect(record);
+            }
+            // update offset to state
+            debeziumOffset.setSourcePartition(sourcePartition);
+            debeziumOffset.setSourceOffset(sourceOffset);
+        }
+    }
+
+    private void updateMessageTimestamp(SourceRecord record) {
+        Schema schema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        if (schema.field(Envelope.FieldName.SOURCE) == null) {
+            return;
+        }
+
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
+            return;
+        }
+
+        Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
+        if (tsMs != null) {
+            this.messageTimestamp = tsMs;
+        }
+    }
+
+    private boolean isHeartbeatEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.startsWith(heartbeatTopicPrefix);
+    }
+
+    private boolean isSnapshotRecord(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        if (value != null) {
+            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+            SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+            // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
+            // we can still recover from checkpoint and continue to read the binlog,
+            // because the checkpoint contains binlog position
+            return SnapshotRecord.TRUE == snapshotRecord;
+        }
+        return false;
+    }
+
+    // ---------------------------------------------------------------------------------------
+
+    private class DebeziumCollector implements Collector<T> {
+
+        private final Queue<T> records = new ArrayDeque<>();
+
+        @Override
+        public void collect(T record) {
+            records.add(record);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java
new file mode 100644
index 0000000000..31dd750341
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the Debezium SourceRecord into data types
+ * (Java/Scala objects) that are processed by Flink.
+ *
+ * @param <T> The type created by the deserialization schema.
+ * <p>
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ */
+@PublicEvolving
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+    void open();
+
+    /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */
+    void deserialize(SourceRecord record, Collector<T> out) throws Exception;
+}
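
For orientation, a minimal implementation of the interface above might look like the sketch below; the class name and the plain String output are illustrative assumptions (the connector's production implementation is RowDataDebeziumDeserializeSchema, updated later in this patch).

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

// Sketch only: emits each Debezium change event as its String representation.
public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void open() {
        // nothing to initialize here; heavier schemas build their
        // non-serializable runtime state (e.g. metrics) in this hook
    }

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        out.collect(record.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}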
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
new file mode 100644
index 0000000000..27b9988dd0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and convert the records to the data in Flink style. The reason why
+ * don't use one workers is because debezium has different behaviours in snapshot phase and
+ * streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate to each other directly, the error reporting
+ * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, the source function just closes the engine and wakes up the
+ * producer if the error is from the Flink side.
+ *
+ * <p>If the execution is canceled or finish(only snapshot phase), the exit logic is as same as the
+ * logic in the error reporting.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ *
+ * <p>
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements
+            CheckpointedFunction,
+            CheckpointListener,
+            ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /** State name of the consumer's partition offset states. */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /** State name of the consumer's history records state. */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
+     * not.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /** The configuration value represents legacy implementation. */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /** User-supplied properties for Kafka. * */
+    private final Properties properties;
+
+    /** The specific binlog offset to read from when the first startup. */
+    private final @Nullable DebeziumOffset specificOffset;
+
+    /** Data for pending but uncommitted offsets. */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /** Flag indicating whether the Debezium Engine is started. */
+    private volatile boolean debeziumStarted = false;
+
+    /** Validator to validate the connected database satisfies the cdc connector's requirements. */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /** Accessor for state in the operator state backend. */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /** Consume the events from the engine and commit the offset to the engine. */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /** The consumer to fetch records from {@link Handover}. */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /** Buffer the events from the source and record the errors from the debezium. */
+    private transient Handover handover;
+
+    // ---------------------------------------------------------------------------------------
+
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        deserializer.open();
+        this.executor = Executors.newSingleThreadExecutor(threadFactory);
+        this.handover = new Handover();
+        this.changeConsumer = new DebeziumChangeConsumer(handover);
+    }
+
+    // ------------------------------------------------------------------------
+    // Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.offsetState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.schemaRecordsState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+        } else {
+            if (specificOffset != null) {
+                byte[] serializedOffset =
+                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+                LOG.info(
+                        "Consumer subtask {} starts to read from specified offset {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        restoredOffsetState);
+            } else {
+                LOG.info(
+                        "Consumer subtask {} has no restore state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    private void restoreOffsetState() throws Exception {
+        for (byte[] serializedOffset : offsetState.get()) {
+            if (restoredOffsetState == null) {
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+            } else {
+                throw new RuntimeException(
+                        "Debezium Source only support single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    private void restoreHistoryRecordsState() throws Exception {
+        DocumentReader reader = DocumentReader.defaultReader();
+        ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+        int recordsCount = 0;
+        boolean firstEntry = true;
+        for (String record : schemaRecordsState.get()) {
+            if (firstEntry) {
+                // we store the engine instance name in the first element
+                this.engineInstanceName = record;
+                firstEntry = false;
+            } else {
+                // Put the records into the state. The database history should read, reorganize and
+                // register the state.
+                historyRecords.add(new SchemaRecord(reader.read(record)));
+                recordsCount++;
+            }
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, historyRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with 
{} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                recordsCount);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        } else {
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+            snapshotHistoryRecordsState();
+        }
+    }
+
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName != null) {
+            schemaRecordsState.add(engineInstanceName);
+            Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+            DocumentWriter writer = DocumentWriter.defaultWriter();
+            for (SchemaRecord record : records) {
+                schemaRecordsState.add(writer.write(record.toDocument()));
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+        properties.setProperty("name", "engine");
+        properties.setProperty("offset.storage", 
FlinkOffsetBackingStore.class.getCanonicalName());
+        if (restoredOffsetState != null) {
+            // restored from state
+            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, 
restoredOffsetState);
+        }
+        // DO NOT include schema change, e.g. DDL
+        properties.setProperty("include.schema.changes", "false");
+        // disable the offset flush totally
+        properties.setProperty("offset.flush.interval.ms", 
String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        properties.setProperty("tombstones.on.delete", "false");
+        if (engineInstanceName == null) {
+            // not restore from recovery
+            engineInstanceName = UUID.randomUUID().toString();
+        }
+        // history instance name to initialize FlinkDatabaseHistory
+        properties.setProperty(
+                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+        // continue to read binlog
+        // see
+        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+        properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix =
+                properties.getProperty(
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+        this.debeziumChangeFetcher =
+                new DebeziumChangeFetcher<>(
+                        sourceContext,
+                        deserializer,
+                        restoredOffsetState == null, // DB snapshot phase if restore state is null
+                        dbzHeartbeatPrefix,
+                        handover);
+
+        // create the engine with this configuration ...
+        this.engine =
+                DebeziumEngine.create(Connect.class)
+                        .using(properties)
+                        .notifying(changeConsumer)
+                        .using(OffsetCommitPolicy.always())
+                        .using(
+                                (success, message, error) -> {
+                                    if (success) {
+                                        // Close the handover and prepare to exit.
+                                        handover.close();
+                                    } else {
+                                        handover.reportError(error);
+                                    }
+                                })
+                        .build();
+
+        // run the engine asynchronously
+        executor.execute(engine);
+        debeziumStarted = true;
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
+
+        // start the real debezium consumer
+        debeziumChangeFetcher.runFetchLoop();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!debeziumStarted) {
+            LOG.debug("notifyCheckpointComplete() called when engine is not 
started.");
+            return;
+        }
+
+        final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized 
source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown 
checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset =
+                    DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            changeConsumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        // safely and gracefully stop the engine
+        shutdownEngine();
+        if (debeziumChangeFetcher != null) {
+            debeziumChangeFetcher.close();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        if (executor != null) {
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+
+        super.close();
+    }
+
+    /** Safely and gracefully stop the Debezium engine. */
+    private void shutdownEngine() {
+        try {
+            if (engine != null) {
+                engine.close();
+            }
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        } finally {
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            debeziumStarted = false;
+
+            if (handover != null) {
+                handover.close();
+            }
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    @VisibleForTesting
+    public LinkedMap getPendingOffsetsToCommit() {
+        return pendingOffsetsToCommit;
+    }
+
+    @VisibleForTesting
+    public boolean getDebeziumStarted() {
+        return debeziumStarted;
+    }
+
+    private Class<?> determineDatabase() {
+        boolean isCompatibleWithLegacy =
+                FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+        if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+            // specifies the legacy implementation but the state may be incompatible
+            if (isCompatibleWithLegacy) {
+                return FlinkDatabaseHistory.class;
+            } else {
+                throw new IllegalStateException(
+                        "The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option.");
+            }
+        } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+            // tries the non-legacy first
+            return FlinkDatabaseSchemaHistory.class;
+        } else if (isCompatibleWithLegacy) {
+            // fallback to legacy if possible
+            return FlinkDatabaseHistory.class;
+        } else {
+            // impossible
+            throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+        }
+    }
+
+    @VisibleForTesting
+    public String getEngineInstanceName() {
+        return engineInstanceName;
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
new file mode 100644
index 0000000000..040541b827
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.postgre;
+
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.postgresql.PostgresConnector;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume binlog for
+ * PostgreSQL.
+ * <p>
+ * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0
+ * */
+public class PostgreSQLSource {
+
+    private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis();
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link PostgreSQLSource}. */
+    public static class Builder<T> {
+
+        private String pluginName = "decoderbufs";
+        private String slotName = "flink";
+        private int port = 5432; // default 5432 port
+        private String hostname;
+        private String database;
+        private String username;
+        private String password;
+        private String[] schemaList;
+        private String[] tableList;
+        private Properties dbzProperties;
+        private DebeziumDeserializationSchema<T> deserializer;
+
+        /**
+         * The name of the Postgres logical decoding plug-in installed on the server. Supported
+         * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,
+         * wal2json_rds_streaming and pgoutput.
+         */
+        public Builder<T> decodingPluginName(String name) {
+            this.pluginName = name;
+            return this;
+        }
+
+        public Builder<T> hostname(String hostname) {
+            this.hostname = hostname;
+            return this;
+        }
+
+        /** Integer port number of the PostgreSQL database server. */
+        public Builder<T> port(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** The name of the PostgreSQL database from which to stream the changes. */
+        public Builder<T> database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match schema names to be monitored; any
+         * schema name not included in the whitelist will be excluded from monitoring. By default
+         * all non-system schemas will be monitored.
+         */
+        public Builder<T> schemaList(String... schemaList) {
+            this.schemaList = schemaList;
+            return this;
+        }
+
+        /**
+         * An optional list of regular expressions that match fully-qualified table identifiers for
+         * tables to be monitored; any table not included in the whitelist will be excluded from
+         * monitoring. Each identifier is of the form schemaName.tableName. By default the connector
+         * will monitor every non-system table in each monitored schema.
+         */
+        public Builder<T> tableList(String... tableList) {
+            this.tableList = tableList;
+            return this;
+        }
+
+        /**
+         * Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.
+         */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to use when connecting to the PostgreSQL database server. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * The name of the PostgreSQL logical decoding slot that was created for streaming changes
+         * from a particular plug-in for a particular database/schema. The server uses this slot to
+         * stream events to the connector that you are configuring. Default is "flink".
+         *
+         * <p>Slot names must conform to <a
+         * href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL
+         * replication slot naming rules</a>, which state: "Each replication slot has a name, which
+         * can contain lower-case letters, numbers, and the underscore character."
+         */
+        public Builder<T> slotName(String slotName) {
+            this.slotName = slotName;
+            return this;
+        }
+
+        /** The Debezium Postgres connector properties. */
+        public Builder<T> debeziumProperties(Properties properties) {
+            this.dbzProperties = properties;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+            props.setProperty("connector.class", 
PostgresConnector.class.getCanonicalName());
+            props.setProperty("plugin.name", pluginName);
+            // hard code server name, because we don't need to distinguish it, 
docs:
+            // Logical name that identifies and provides a namespace for the 
particular PostgreSQL
+            // database server/cluster being monitored. The logical name 
should be unique across
+            // all other connectors, since it is used as a prefix for all 
Kafka topic names coming
+            // from this connector. Only alphanumeric characters and 
underscores should be used.
+            props.setProperty("database.server.name", "postgres_cdc_source");
+            props.setProperty("database.hostname", checkNotNull(hostname));
+            props.setProperty("database.dbname", checkNotNull(database));
+            props.setProperty("database.user", checkNotNull(username));
+            props.setProperty("database.password", checkNotNull(password));
+            props.setProperty("database.port", String.valueOf(port));
+            props.setProperty("slot.name", slotName);
+            // we have to enable heartbeat for PG to make sure 
DebeziumChangeConsumer#handleBatch
+            // is invoked after job restart
+            props.setProperty("heartbeat.interval.ms", 
String.valueOf(DEFAULT_HEARTBEAT_MS));
+
+            if (schemaList != null) {
+                props.setProperty("schema.whitelist", String.join(",", 
schemaList));
+            }
+            if (tableList != null) {
+                props.setProperty("table.whitelist", String.join(",", 
tableList));
+            }
+
+            if (dbzProperties != null) {
+                props.putAll(dbzProperties);
+            }
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, 
Validator.getDefaultValidator());
+        }
+    }
+}
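
As a usage reference, the builder above could be wired into a job roughly as follows; the connection settings and the StringDebeziumDeserializationSchema sketched earlier are assumptions for illustration, not values taken from the patch.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: builds a PostgreSQL CDC source and prints its output.
public class PostgresCdcExample {
    public static void main(String[] args) throws Exception {
        DebeziumSourceFunction<String> source =
                PostgreSQLSource.<String>builder()
                        .hostname("localhost")        // assumed host
                        .port(5432)
                        .database("inventory")        // assumed database
                        .schemaList("public")
                        .tableList("public.orders")   // schemaName.tableName
                        .username("postgres")
                        .password("postgres")
                        .decodingPluginName("pgoutput")
                        .slotName("flink")
                        .deserializer(new StringDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("postgres-cdc-example");
    }
}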
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
index 22fb76a4e1..6e4bd7c922 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
@@ -18,13 +18,9 @@
 package org.apache.inlong.sort.postgre;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
 
-import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
 import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory;
 import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import com.ververica.cdc.debezium.DebeziumSourceFunction;
 import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
 import com.ververica.cdc.debezium.table.MetadataConverter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -139,7 +135,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
                                 PostgreSQLDeserializationConverterFactory.instance())
                         .setValueValidator(new PostgresValueValidator(schemaName, tableName))
                         .setChangelogMode(changelogMode)
-                        .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
+                        .setMetricOption(metricOption)
                         .build();
         DebeziumSourceFunction<RowData> sourceFunction =
                 PostgreSQLSource.<RowData>builder()
@@ -237,7 +233,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
                 dbzProperties,
                 producedDataType,
                 metadataKeys,
-                changelogMode);
+                changelogMode,
+                metricOption);
     }
 
     @Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
index c6cf4e0d54..de701a2ae1 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.sort.postgre;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
 import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
 import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
@@ -100,7 +100,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
 
     /** Changelog Mode to use for encoding changes in Flink internal data structure. */
     private final DebeziumChangelogMode changelogMode;
-    private final SourceMetricData sourceMetricData;
+    private final MetricOption metricOption;
+    private SourceMetricData sourceMetricData;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
     public static Builder newBuilder() {
@@ -115,7 +116,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
             DebeziumChangelogMode changelogMode,
-            SourceMetricData sourceMetricData) {
+            MetricOption metricOption) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
         this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
         this.physicalConverter =
@@ -126,7 +127,14 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
         this.validator = checkNotNull(validator);
         this.changelogMode = checkNotNull(changelogMode);
-        this.sourceMetricData = sourceMetricData;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open() {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
     }
 
     @Override
@@ -146,6 +154,9 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
             emit(record, delete, out);
         } else {
             if (changelogMode == DebeziumChangelogMode.ALL) {
@@ -208,7 +219,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
                 DeserializationRuntimeConverterFactory.DEFAULT;
         private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
-        private SourceMetricData sourceMetricData;
+        private MetricOption metricOption;
 
         public Builder setPhysicalRowType(RowType physicalRowType) {
             this.physicalRowType = physicalRowType;
@@ -240,8 +251,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             this.changelogMode = changelogMode;
             return this;
         }
-        public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
-            this.sourceMetricData = sourceMetricData;
+        public Builder setMetricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
             return this;
         }
 
@@ -254,7 +265,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
                     serverTimeZone,
                     userDefinedConverterFactory,
                     changelogMode,
-                    sourceMetricData);
+                    metricOption);
         }
     }
 
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 262d48fce0..261b1c9f97 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -862,10 +862,14 @@
    Source  : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
    License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
-1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
-             inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
-             inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
-             inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
 Source  : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
