This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 24beb99161 [INLONG-10340][Sort] Fix MongoDB AuditOperator not serialized (#10343)
24beb99161 is described below

commit 24beb99161cd5fce8d74d5f3bef104896f93cc8f
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed Jun 5 11:37:35 2024 +0800

    [INLONG-10340][Sort] Fix MongoDB AuditOperator not serialized (#10343)
---
 .../inlong/sort/mongodb/DebeziumChangeFetcher.java | 322 ++++++++++++
 .../mongodb/DebeziumDeserializationSchema.java     |  42 ++
 .../sort/mongodb/DebeziumSourceFunction.java       | 563 +++++++++++++++++++++
 .../MongoDBConnectorDeserializationSchema.java     |  12 +-
 .../apache/inlong/sort/mongodb/MongoDBSource.java  | 344 +++++++++++++
 .../inlong/sort/mongodb/MongoDBTableSource.java    |   9 +-
 .../sort/mongodb/source/IncrementalSource.java     | 217 ++++++++
 .../source/IncrementalSourceRecordEmitter.java     | 176 +++++++
 .../sort/mongodb/source/MongoDBRecordEmitter.java  | 111 ++++
 .../inlong/sort/mongodb/source/MongoDBSource.java  |  95 ++++
 .../sort/mongodb/source/MongoDBSourceBuilder.java  | 201 ++++++++
 licenses/inlong-sort-connectors/LICENSE            |  14 +-
 12 files changed, 2096 insertions(+), 10 deletions(-)
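
The core change is visible in the MongoDBConnectorDeserializationSchema hunk below: only the serializable
MetricOption is kept as a field, and the metric/audit objects are created in open() on the task side, so the
non-serializable AuditOperator never has to be serialized with the source function. A minimal sketch of that
pattern follows; the class name and the transient modifier are illustrative, and the MetricOption package is
assumed to match SourceMetricData's:

    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    public class ExampleDeserializationSchema implements java.io.Serializable {

        // serializable configuration travels with the operator ...
        private final MetricOption metricOption;

        // ... while the runtime metric object is built only after deserialization
        private transient SourceMetricData sourceMetricData;

        public ExampleDeserializationSchema(MetricOption metricOption) {
            this.metricOption = metricOption;
        }

        // called once the task is running, mirroring the new open() hook in the patch
        public void open() {
            if (metricOption != null) {
                sourceMetricData = new SourceMetricData(metricOption);
            }
        }
    }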

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java
new file mode 100644
index 0000000000..b5d06500fe
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb;
+
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.Handover;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
+ * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
+ * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the
+ * snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when
+ * emitting the records.
+ *
+ * @param <T> The type of elements produced by the handler.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@Internal
+public class DebeziumChangeFetcher<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class);
+
+    private final SourceFunction.SourceContext<T> sourceContext;
+
+    /**
+     * The lock that guarantees that record emission and state updates are atomic, from the view of
+     * taking a checkpoint.
+     */
+    private final Object checkpointLock;
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserialization;
+
+    /** A collector to emit records in batch (bundle). */
+    private final DebeziumCollector debeziumCollector;
+
+    private final DebeziumOffset debeziumOffset;
+
+    private final DebeziumOffsetSerializer stateSerializer;
+
+    private final String heartbeatTopicPrefix;
+
+    private boolean isInDbSnapshotPhase;
+
+    private final Handover handover;
+
+    private volatile boolean isRunning = true;
+
+    // ---------------------------------------------------------------------------------------
+    // Metrics
+    // ---------------------------------------------------------------------------------------
+
+    /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */
+    private volatile long messageTimestamp = 0L;
+
+    /** The last record processing time. */
+    private volatile long processTime = 0L;
+
+    /**
+     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+     * record fetched into the source operator.
+     */
+    private volatile long fetchDelay = 0L;
+
+    /**
+     * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
+     * source operator.
+     */
+    private volatile long emitDelay = 0L;
+
+    // ------------------------------------------------------------------------
+
+    public DebeziumChangeFetcher(
+            SourceFunction.SourceContext<T> sourceContext,
+            DebeziumDeserializationSchema<T> deserialization,
+            boolean isInDbSnapshotPhase,
+            String heartbeatTopicPrefix,
+            Handover handover) {
+        this.sourceContext = sourceContext;
+        this.checkpointLock = sourceContext.getCheckpointLock();
+        this.deserialization = deserialization;
+        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
+        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.debeziumCollector = new DebeziumCollector();
+        this.debeziumOffset = new DebeziumOffset();
+        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
+        this.handover = handover;
+    }
+
+    /**
+     * Take a snapshot of the Debezium handler state.
+     *
+     * <p>Important: This method must be called under the checkpoint lock.
+     */
+    public byte[] snapshotCurrentState() throws Exception {
+        // this method assumes that the checkpoint lock is held
+        assert Thread.holdsLock(checkpointLock);
+        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
+            return null;
+        }
+
+        return stateSerializer.serialize(debeziumOffset);
+    }
+
+    /**
+     * Process change messages from the {@link Handover} and collect the processed messages by
+     * {@link Collector}.
+     */
+    public void runFetchLoop() throws Exception {
+        try {
+            // begin snapshot database phase
+            if (isInDbSnapshotPhase) {
+                List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext();
+
+                synchronized (checkpointLock) {
+                    LOG.info(
+                            "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
+                    handleBatch(events);
+                    while (isRunning && isInDbSnapshotPhase) {
+                        handleBatch(handover.pollNext());
+                    }
+                }
+                LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
+            }
+
+            // begin streaming binlog phase
+            while (isRunning) {
+                // If the handover is closed or has errors, exit.
+                // If there is no streaming phase, the handover will be closed by the engine.
+                handleBatch(handover.pollNext());
+            }
+        } catch (Handover.ClosedException e) {
+            // ignore
+        } catch (RetriableException e) {
+            // Retriable exception should be ignored by DebeziumChangeFetcher,
+            // refer https://issues.redhat.com/browse/DBZ-2531 for more information.
+            // Because Retriable exception is ignored by the DebeziumEngine and
+            // the retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
+            LOG.info(
+                    "Ignore the RetriableException, the underlying DebeziumEngine will restart automatically",
+                    e);
+        }
+    }
+
+    public void close() {
+        isRunning = false;
+        handover.close();
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Metric getter
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The metric indicates delay from data generation to entry into the system.
+     *
+     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+     * unavailable.
+     */
+    public long getFetchDelay() {
+        return fetchDelay;
+    }
+
+    /**
+     * The metric indicates delay from data generation to leaving the source operator.
+     *
+     * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is
+     * unavailable.
+     */
+    public long getEmitDelay() {
+        return emitDelay;
+    }
+
+    public long getIdleTime() {
+        return System.currentTimeMillis() - processTime;
+    }
+
+    // ---------------------------------------------------------------------------------------
+    // Helper
+    // ---------------------------------------------------------------------------------------
+
+    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+            throws Exception {
+        if (CollectionUtils.isEmpty(changeEvents)) {
+            return;
+        }
+        this.processTime = System.currentTimeMillis();
+
+        for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
+            SourceRecord record = event.value();
+            updateMessageTimestamp(record);
+            fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
+
+            if (isHeartbeatEvent(record)) {
+                // keep offset update
+                synchronized (checkpointLock) {
+                    debeziumOffset.setSourcePartition(record.sourcePartition());
+                    debeziumOffset.setSourceOffset(record.sourceOffset());
+                }
+                // drop heartbeat events
+                continue;
+            }
+
+            deserialization.deserialize(record, debeziumCollector);
+
+            if (!isSnapshotRecord(record)) {
+                LOG.debug("Snapshot phase finishes.");
+                isInDbSnapshotPhase = false;
+            }
+
+            // emit the actual records. this also updates offset state atomically
+            emitRecordsUnderCheckpointLock(
+                    debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
+        }
+    }
+
+    private void emitRecordsUnderCheckpointLock(
+            Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+        // Emit the records. Use the checkpoint lock to guarantee
+        // atomicity of record emission and offset state update.
+        // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode.
+        synchronized (checkpointLock) {
+            T record;
+            while ((record = records.poll()) != null) {
+                emitDelay =
+                        isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
+                sourceContext.collect(record);
+            }
+            // update offset to state
+            debeziumOffset.setSourcePartition(sourcePartition);
+            debeziumOffset.setSourceOffset(sourceOffset);
+        }
+    }
+
+    private void updateMessageTimestamp(SourceRecord record) {
+        Schema schema = record.valueSchema();
+        Struct value = (Struct) record.value();
+        if (schema.field(Envelope.FieldName.SOURCE) == null) {
+            return;
+        }
+
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) {
+            return;
+        }
+
+        Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP);
+        if (tsMs != null) {
+            this.messageTimestamp = tsMs;
+        }
+    }
+
+    private boolean isHeartbeatEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.startsWith(heartbeatTopicPrefix);
+    }
+
+    private boolean isSnapshotRecord(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        if (value != null) {
+            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+            SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
+            // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST
+            // we can still recover from checkpoint and continue to read the binlog,
+            // because the checkpoint contains binlog position
+            return SnapshotRecord.TRUE == snapshotRecord;
+        }
+        return false;
+    }
+
+    // ---------------------------------------------------------------------------------------
+
+    private class DebeziumCollector implements Collector<T> {
+
+        private final Queue<T> records = new ArrayDeque<>();
+
+        @Override
+        public void collect(T record) {
+            records.add(record);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
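
The class Javadoc above describes why the lock is held differently in the snapshot and streaming phases. A
self-contained, simplified sketch of that hand-off between the engine thread and the fetch loop follows; this
is not the flink-cdc Handover class, just an illustrative stand-in built on a blocking queue:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class HandoverSketch {

        // bounded buffer between the engine thread (producer) and the fetch loop (consumer)
        private static final BlockingQueue<List<String>> HANDOVER = new ArrayBlockingQueue<>(1);
        private static final Object CHECKPOINT_LOCK = new Object();

        public static void main(String[] args) throws InterruptedException {
            // producer: stands in for the Debezium engine pushing a batch of change events
            Thread engine = new Thread(() -> {
                try {
                    HANDOVER.put(Arrays.asList("event-1", "event-2"));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            engine.start();

            // consumer: stands in for runFetchLoop(); in the streaming phase the lock is only
            // taken while emitting a batch and updating the offset, so checkpoints can run
            // between batches
            List<String> batch = HANDOVER.take();
            synchronized (CHECKPOINT_LOCK) {
                batch.forEach(System.out::println);
                // offset state would be updated here, under the same lock
            }
            engine.join();
        }
    }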
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java
new file mode 100644
index 0000000000..982adfd542
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the Debezium SourceRecord into data types
+ * (Java/Scala objects) that are processed by Flink.
+ *
+ * @param <T> The type created by the deserialization schema.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@PublicEvolving
+public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+    void open();
+
+    /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */
+    void deserialize(SourceRecord record, Collector<T> out) throws Exception;
+}
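
For reference, a minimal hypothetical implementation of this new interface might look as follows; the class
name and the trivial conversion are illustrative only, the point being that non-serializable resources belong
in open():

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.source.SourceRecord;

    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;

    public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void open() {
            // initialize non-serializable runtime objects (metrics, audit reporters, ...) here
        }

        @Override
        public void deserialize(SourceRecord record, Collector<String> out) {
            // trivial conversion: emit the record's value as a String
            out.collect(String.valueOf(record.value()));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }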
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
new file mode 100644
index 0000000000..daaaace1b0
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb;
+
+import com.ververica.cdc.debezium.Validator;
+import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
+import com.ververica.cdc.debezium.internal.DebeziumOffset;
+import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
+import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
+import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
+import com.ververica.cdc.debezium.internal.Handover;
+import com.ververica.cdc.debezium.internal.SchemaRecord;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.embedded.Connect;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
+import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+
+/**
+ * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
+ * from databases into Flink.
+ *
+ * <p>There are two workers during the runtime. One worker periodically pulls records from the
+ * database and pushes the records into the {@link Handover}. The other worker consumes the records
+ * from the {@link Handover} and convert the records to the data in Flink style. The reason why
+ * don't use one workers is because debezium has different behaviours in snapshot phase and
+ * streaming phase.
+ *
+ * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the
+ * consumer. Because the two threads don't communicate to each other directly, the error reporting
+ * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link
+ * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the
+ * consumer to check the error. However, the source function just closes the engine and wakes up the
+ * producer if the error is from the Flink side.
+ *
+ * <p>If the execution is canceled or finish(only snapshot phase), the exit logic is as same as the
+ * logic in the error reporting.
+ *
+ * <p>The source function participates in checkpointing and guarantees that no data is lost during a
+ * failure, and that the computation processes elements "exactly once".
+ *
+ * <p>Note: currently, the source function can't run in multiple parallel instances.
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@PublicEvolving
+public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
+        implements
+            CheckpointedFunction,
+            CheckpointListener,
+            ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = -5808108641062931623L;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
+
+    /** State name of the consumer's partition offset states. */
+    public static final String OFFSETS_STATE_NAME = "offset-states";
+
+    /** State name of the consumer's history records state. */
+    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
+
+    /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The configuration represents the Debezium MySQL Connector uses the legacy implementation or
+     * not.
+     */
+    public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation";
+
+    /** The configuration value represents legacy implementation. */
+    public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+
+    // ---------------------------------------------------------------------------------------
+    // Properties
+    // ---------------------------------------------------------------------------------------
+
+    /** The schema to convert from Debezium's messages into Flink's objects. */
+    private final DebeziumDeserializationSchema<T> deserializer;
+
+    /** User-supplied properties for Kafka. * */
+    private final Properties properties;
+
+    /** The specific binlog offset to read from when the first startup. */
+    private final @Nullable DebeziumOffset specificOffset;
+
+    /** Data for pending but uncommitted offsets. */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /** Flag indicating whether the Debezium Engine is started. */
+    private volatile boolean debeziumStarted = false;
+
+    /** Validator to validate the connected database satisfies the cdc connector's requirements. */
+    private final Validator validator;
+
+    // ---------------------------------------------------------------------------------------
+    // State
+    // ---------------------------------------------------------------------------------------
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a String because we are encoding the offset state in JSON bytes.
+     */
+    private transient volatile String restoredOffsetState;
+
+    /** Accessor for state in the operator state backend. */
+    private transient ListState<byte[]> offsetState;
+
+    /**
+     * State to store the history records, i.e. schema changes.
+     *
+     * @see FlinkDatabaseHistory
+     * @see FlinkDatabaseSchemaHistory
+     */
+    private transient ListState<String> schemaRecordsState;
+
+    // ---------------------------------------------------------------------------------------
+    // Worker
+    // ---------------------------------------------------------------------------------------
+
+    private transient ExecutorService executor;
+    private transient DebeziumEngine<?> engine;
+    /**
+     * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly
+     * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}.
+     */
+    private transient String engineInstanceName;
+
+    /** Consume the events from the engine and commit the offset to the engine. */
+    private transient DebeziumChangeConsumer changeConsumer;
+
+    /** The consumer to fetch records from {@link Handover}. */
+    private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
+
+    /** Buffer the events from the source and record the errors from the debezium. */
+    private transient Handover handover;
+
+    // ---------------------------------------------------------------------------------------
+
+    public DebeziumSourceFunction(
+            DebeziumDeserializationSchema<T> deserializer,
+            Properties properties,
+            @Nullable DebeziumOffset specificOffset,
+            Validator validator) {
+        this.deserializer = deserializer;
+        this.properties = properties;
+        this.specificOffset = specificOffset;
+        this.validator = validator;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        validator.validate();
+        super.open(parameters);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
+        deserializer.open();
+        this.executor = Executors.newSingleThreadExecutor(threadFactory);
+        this.handover = new Handover();
+        this.changeConsumer = new DebeziumChangeConsumer(handover);
+    }
+
+    // ------------------------------------------------------------------------
+    // Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        this.offsetState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        this.schemaRecordsState =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO));
+
+        if (context.isRestored()) {
+            restoreOffsetState();
+            restoreHistoryRecordsState();
+        } else {
+            if (specificOffset != null) {
+                byte[] serializedOffset =
+                        DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset);
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+                LOG.info(
+                        "Consumer subtask {} starts to read from specified offset {}.",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        restoredOffsetState);
+            } else {
+                LOG.info(
+                        "Consumer subtask {} has no restore state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+    }
+
+    private void restoreOffsetState() throws Exception {
+        for (byte[] serializedOffset : offsetState.get()) {
+            if (restoredOffsetState == null) {
+                restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
+            } else {
+                throw new RuntimeException(
+                        "Debezium Source only support single task, "
+                                + "however, this is restored from multiple tasks.");
+            }
+        }
+        LOG.info(
+                "Consumer subtask {} restored offset state: {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                restoredOffsetState);
+    }
+
+    private void restoreHistoryRecordsState() throws Exception {
+        DocumentReader reader = DocumentReader.defaultReader();
+        ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>();
+        int recordsCount = 0;
+        boolean firstEntry = true;
+        for (String record : schemaRecordsState.get()) {
+            if (firstEntry) {
+                // we store the engine instance name in the first element
+                this.engineInstanceName = record;
+                firstEntry = false;
+            } else {
+                // Put the records into the state. The database history should read, reorganize and
+                // register the state.
+                historyRecords.add(new SchemaRecord(reader.read(record)));
+                recordsCount++;
+            }
+        }
+        if (engineInstanceName != null) {
+            registerHistory(engineInstanceName, historyRecords);
+        }
+        LOG.info(
+                "Consumer subtask {} restored history records state: {} with {} records.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                engineInstanceName,
+                recordsCount);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        if (handover.hasError()) {
+            LOG.debug("snapshotState() called on closed source");
+            throw new FlinkRuntimeException(
+                    "Call snapshotState() on closed source, checkpoint failed.");
+        } else {
+            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+            snapshotHistoryRecordsState();
+        }
+    }
+
+    private void snapshotOffsetState(long checkpointId) throws Exception {
+        offsetState.clear();
+
+        final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher;
+
+        byte[] serializedOffset = null;
+        if (fetcher == null) {
+            // the fetcher has not yet been initialized, which means we need to return the
+            // originally restored offsets
+            if (restoredOffsetState != null) {
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            }
+        } else {
+            byte[] currentState = fetcher.snapshotCurrentState();
+            if (currentState == null && restoredOffsetState != null) {
+                // the fetcher has been initialized, but has not yet received any data,
+                // which means we need to return the originally restored offsets.
+                serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8);
+            } else {
+                serializedOffset = currentState;
+            }
+        }
+
+        if (serializedOffset != null) {
+            offsetState.add(serializedOffset);
+            // the map cannot be asynchronously updated, because only one checkpoint call
+            // can happen on this function at a time: either snapshotState() or
+            // notifyCheckpointComplete()
+            pendingOffsetsToCommit.put(checkpointId, serializedOffset);
+            // truncate the map of pending offsets to commit, to prevent infinite growth
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+    }
+
+    private void snapshotHistoryRecordsState() throws Exception {
+        schemaRecordsState.clear();
+
+        if (engineInstanceName != null) {
+            schemaRecordsState.add(engineInstanceName);
+            Collection<SchemaRecord> records = retrieveHistory(engineInstanceName);
+            DocumentWriter writer = DocumentWriter.defaultWriter();
+            for (SchemaRecord record : records) {
+                schemaRecordsState.add(writer.write(record.toDocument()));
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+        properties.setProperty("name", "engine");
+        properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
+        if (restoredOffsetState != null) {
+            // restored from state
+            properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
+        }
+        // DO NOT include schema change, e.g. DDL
+        properties.setProperty("include.schema.changes", "false");
+        // disable the offset flush totally
+        properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
+        // disable tombstones
+        properties.setProperty("tombstones.on.delete", "false");
+        if (engineInstanceName == null) {
+            // not restore from recovery
+            engineInstanceName = UUID.randomUUID().toString();
+        }
+        // history instance name to initialize FlinkDatabaseHistory
+        properties.setProperty(
+                FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
+        // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't
+        // continue to read binlog
+        // see
+        // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector
+        // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/
+        properties.setProperty("database.history", determineDatabase().getCanonicalName());
+
+        // we have to filter out the heartbeat events, otherwise the deserializer will fail
+        String dbzHeartbeatPrefix =
+                properties.getProperty(
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
+                        Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
+        this.debeziumChangeFetcher =
+                new DebeziumChangeFetcher<>(
+                        sourceContext,
+                        deserializer,
+                        restoredOffsetState == null, // DB snapshot phase if restore state is null
+                        dbzHeartbeatPrefix,
+                        handover);
+
+        // create the engine with this configuration ...
+        this.engine =
+                DebeziumEngine.create(Connect.class)
+                        .using(properties)
+                        .notifying(changeConsumer)
+                        .using(OffsetCommitPolicy.always())
+                        .using(
+                                (success, message, error) -> {
+                                    if (success) {
+                                        // Close the handover and prepare to exit.
+                                        handover.close();
+                                    } else {
+                                        handover.reportError(error);
+                                    }
+                                })
+                        .build();
+
+        // run the engine asynchronously
+        executor.execute(engine);
+        debeziumStarted = true;
+
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
+
+        // start the real debezium consumer
+        debeziumChangeFetcher.runFetchLoop();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!debeziumStarted) {
+            LOG.debug("notifyCheckpointComplete() called when engine is not started.");
+            return;
+        }
+
+        final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher;
+        if (fetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        try {
+            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (posInMap == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap);
+
+            // remove older checkpoints in map
+            for (int i = 0; i < posInMap; i++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (serializedOffsets == null || serializedOffsets.length == 0) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            DebeziumOffset offset =
+                    DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
+            changeConsumer.commitOffset(offset);
+        } catch (Exception e) {
+            // ignore exception if we are no longer running
+            LOG.warn("Ignore error when committing offset to database.", e);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        // safely and gracefully stop the engine
+        shutdownEngine();
+        if (debeziumChangeFetcher != null) {
+            debeziumChangeFetcher.close();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        if (executor != null) {
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        }
+
+        super.close();
+    }
+
+    /** Safely and gracefully stop the Debezium engine. */
+    private void shutdownEngine() {
+        try {
+            if (engine != null) {
+                engine.close();
+            }
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        } finally {
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+
+            debeziumStarted = false;
+
+            if (handover != null) {
+                handover.close();
+            }
+        }
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializer.getProducedType();
+    }
+
+    private Class<?> determineDatabase() {
+        boolean isCompatibleWithLegacy =
+                FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName));
+        if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) {
+            // specifies the legacy implementation but the state may be incompatible
+            if (isCompatibleWithLegacy) {
+                return FlinkDatabaseHistory.class;
+            } else {
+                throw new IllegalStateException(
+                        "The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option.");
+            }
+        } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) {
+            // tries the non-legacy first
+            return FlinkDatabaseSchemaHistory.class;
+        } else if (isCompatibleWithLegacy) {
+            // fallback to legacy if possible
+            return FlinkDatabaseHistory.class;
+        } else {
+            // impossible
+            throw new IllegalStateException("Can't determine which DatabaseHistory to use.");
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index 22e44f046f..d8e4770f79 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -24,7 +24,6 @@ import org.apache.inlong.sort.base.metric.SourceMetricData;
 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
 import com.ververica.cdc.debezium.table.MetadataConverter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -111,6 +110,8 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ
      */
     private final AppendMetadataCollector appendMetadataCollector;
 
+    private final MetricOption metricOption;
+
     private SourceMetricData sourceMetricData;
 
     public MongoDBConnectorDeserializationSchema(
@@ -124,8 +125,13 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ
         this.physicalConverter = createConverter(physicalDataType);
         this.resultTypeInfo = resultTypeInfo;
         this.localTimeZone = localTimeZone;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open() {
         if (metricOption != null) {
-            this.sourceMetricData = new SourceMetricData(metricOption);
+            sourceMetricData = new SourceMetricData(metricOption);
         }
     }
 
@@ -151,7 +157,7 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ
             case DELETE:
                 GenericRowData delete = extractRowData(documentKey);
                 delete.setRowKind(RowKind.DELETE);
-                emit(record, delete, out);
+                emit(record, delete, sourceMetricData == null ? out : new MetricsCollector<>(out, sourceMetricData));
                 break;
             case UPDATE:
                 // It’s null if another operation deletes the document
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
new file mode 100644
index 0000000000..f9aab2d54f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb;
+
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.kafka.connect.source.MongoSourceConfig;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
+import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
+import com.ververica.cdc.debezium.Validator;
+import io.debezium.heartbeat.Heartbeat;
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
+import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A builder to build a SourceFunction which can read snapshot and continue to consume change stream
+ * events.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@PublicEvolving
+public class MongoDBSource {
+
+    public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
+
+    public static final String OUTPUT_FORMAT_SCHEMA =
+            OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder class of {@link MongoDBSource}. */
+    public static class Builder<T> {
+
+        private String hosts;
+        private String username;
+        private String password;
+        private List<String> databaseList;
+        private List<String> collectionList;
+        private String connectionOptions;
+        private Integer batchSize = BATCH_SIZE.defaultValue();
+        private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
+        private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
+        private Boolean updateLookup = true;
+        private Boolean copyExisting = COPY_EXISTING.defaultValue();
+        private Integer copyExistingMaxThreads;
+        private Integer copyExistingQueueSize;
+        private String copyExistingPipeline;
+        private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
+        private DebeziumDeserializationSchema<T> deserializer;
+
+        /** The comma-separated list of hostname and port pairs of mongodb servers. */
+        public Builder<T> hosts(String hosts) {
+            this.hosts = hosts;
+            return this;
+        }
+
+        /**
+         * Ampersand (i.e. &) separated MongoDB connection options eg
+         * replicaSet=test&connectTimeoutMS=300000
+         * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options
+         */
+        public Builder<T> connectionOptions(String connectionOptions) {
+            this.connectionOptions = connectionOptions;
+            return this;
+        }
+
+        /** Name of the database user to be used when connecting to MongoDB. */
+        public Builder<T> username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /** Password to be used when connecting to MongoDB. */
+        public Builder<T> password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** Regular expressions list that match database names to be monitored. */
+        public Builder<T> databaseList(String... databaseList) {
+            this.databaseList = Arrays.asList(databaseList);
+            return this;
+        }
+
+        /**
+         * Regular expressions that match fully-qualified collection identifiers for collections to
+         * be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
+         */
+        public Builder<T> collectionList(String... collectionList) {
+            this.collectionList = Arrays.asList(collectionList);
+            return this;
+        }
+
+        /**
+         * batch.size
+         *
+         * <p>The cursor batch size. Default: 1024
+         *
+         * <p>The change stream cursor batch size. Specifies the maximum number of change events to
+         * return in each batch of the response from the MongoDB cluster. The default is 0 meaning
+         * it uses the server's default value. Default: 0
+         */
+        public Builder<T> batchSize(int batchSize) {
+            checkArgument(batchSize >= 0);
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        /**
+         * poll.await.time.ms
+         *
+         * <p>The amount of time to wait before checking for new results on the change stream.
+         * Default: 1000
+         */
+        public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) {
+            checkArgument(pollAwaitTimeMillis > 0);
+            this.pollAwaitTimeMillis = pollAwaitTimeMillis;
+            return this;
+        }
+
+        /**
+         * poll.max.batch.size
+         *
+         * <p>Maximum number of change stream documents to include in a single batch when polling
+         * for new data. This setting can be used to limit the amount of data buffered internally in
+         * the connector. Default: 1024
+         */
+        public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) {
+            checkArgument(pollMaxBatchSize > 0);
+            this.pollMaxBatchSize = pollMaxBatchSize;
+            return this;
+        }
+
+        /**
+         * change.stream.full.document
+         *
+         * <p>Determines what to return for update operations when using a Change Stream. When set
+         * to true, the change stream for partial updates will include both a delta describing the
+         * changes to the document and a copy of the entire document that was changed from some time
+         * after the change occurred. Default: true
+         */
+        public Builder<T> updateLookup(boolean updateLookup) {
+            this.updateLookup = updateLookup;
+            return this;
+        }
+
+        /**
+         * copy.existing
+         *
+         * <p>Copy existing data from source collections and convert them to Change Stream events on
+         * their respective topics. Any changes to the data that occur during the copy process are
+         * applied once the copy is completed.
+         */
+        public Builder<T> copyExisting(boolean copyExisting) {
+            this.copyExisting = copyExisting;
+            return this;
+        }
+
+        /**
+         * copy.existing.max.threads
+         *
+         * <p>The number of threads to use when performing the data copy. Defaults to the number of
+         * processors. Default: defaults to the number of processors
+         */
+        public Builder<T> copyExistingMaxThreads(int copyExistingMaxThreads) {
+            checkArgument(copyExistingMaxThreads > 0);
+            this.copyExistingMaxThreads = copyExistingMaxThreads;
+            return this;
+        }
+
+        /**
+         * copy.existing.queue.size
+         *
+         * <p>The max size of the queue to use when copying data. Default: 10240
+         */
+        public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
+            checkArgument(copyExistingQueueSize > 0);
+            this.copyExistingQueueSize = copyExistingQueueSize;
+            return this;
+        }
+
+        /**
+         * copy.existing.pipeline eg. [ { "$match": { "closed": "false" } } ]
+         *
+         * <p>An array of JSON objects describing the pipeline operations to run when copying
+         * existing data. This can improve the use of indexes by the copying manager and make
+         * copying more efficient.
+         */
+        public Builder<T> copyExistingPipeline(String copyExistingPipeline) {
+            this.copyExistingPipeline = copyExistingPipeline;
+            return this;
+        }
+
+        /**
+         * heartbeat.interval.ms
+         *
+         * <p>The length of time in milliseconds between sending heartbeat messages. Heartbeat
+         * messages contain the post batch resume token and are sent when no source records have
+         * been published in the specified interval. This improves the resumability of the connector
+         * for low volume namespaces. Use 0 to disable.
+         */
+        public Builder<T> heartbeatIntervalMillis(int heartbeatIntervalMillis) {
+            checkArgument(heartbeatIntervalMillis >= 0);
+            this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+            return this;
+        }
+
+        /**
+         * The deserializer used to convert from consumed {@link
+         * org.apache.kafka.connect.source.SourceRecord}.
+         */
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        /**
+         * The properties of mongodb kafka connector.
+         * https://docs.mongodb.com/kafka-connector/current/kafka-source
+         */
+        public DebeziumSourceFunction<T> build() {
+            Properties props = new Properties();
+
+            props.setProperty(
+                    "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName());
+            props.setProperty("name", "mongodb_cdc_source");
+
+            props.setProperty(
+                    MongoSourceConfig.CONNECTION_URI_CONFIG,
+                    String.valueOf(
+                            buildConnectionString(username, password, hosts, connectionOptions)));
+
+            if (databaseList != null) {
+                props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
+            }
+
+            if (collectionList != null) {
+                props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", 
collectionList));
+            }
+
+            if (updateLookup) {
+                props.setProperty(
+                        MongoSourceConfig.FULL_DOCUMENT_CONFIG, 
FULL_DOCUMENT_UPDATE_LOOKUP);
+            }
+
+            props.setProperty(
+                    MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
+                    String.valueOf(Boolean.FALSE));
+
+            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, 
OUTPUT_FORMAT_SCHEMA);
+            props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, 
OUTPUT_FORMAT_SCHEMA);
+            props.setProperty(
+                    MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG,
+                    String.valueOf(Boolean.FALSE));
+            props.setProperty(MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, 
OUTPUT_SCHEMA);
+
+            if (batchSize != null) {
+                props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, 
String.valueOf(batchSize));
+            }
+
+            if (pollAwaitTimeMillis != null) {
+                props.setProperty(
+                        MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG,
+                        String.valueOf(pollAwaitTimeMillis));
+            }
+
+            if (pollMaxBatchSize != null) {
+                props.setProperty(
+                        MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG,
+                        String.valueOf(pollMaxBatchSize));
+            }
+
+            if (copyExisting != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_CONFIG, 
String.valueOf(copyExisting));
+            }
+
+            if (copyExistingMaxThreads != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG,
+                        String.valueOf(copyExistingMaxThreads));
+            }
+
+            if (copyExistingQueueSize != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG,
+                        String.valueOf(copyExistingQueueSize));
+            }
+
+            if (copyExistingPipeline != null) {
+                props.setProperty(
+                        MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, 
copyExistingPipeline);
+            }
+
+            if (heartbeatIntervalMillis != null) {
+                props.setProperty(
+                        MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
+                        String.valueOf(heartbeatIntervalMillis));
+            }
+
+            props.setProperty(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, 
HEARTBEAT_TOPIC_NAME);
+
+            // Let DebeziumChangeFetcher recognize heartbeat record
+            props.setProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), 
HEARTBEAT_TOPIC_NAME);
+
+            props.setProperty(
+                    MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG, 
String.valueOf(Boolean.TRUE));
+            props.setProperty(
+                    MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, 
ErrorTolerance.NONE.value());
+
+            return new DebeziumSourceFunction<>(
+                    deserializer, props, null, 
Validator.getDefaultValidator());
+        }
+    }
+}
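
A minimal usage sketch of the SourceFunction-based builder above, assuming the builder keeps the
upstream hosts/databaseList/collectionList/username/password setters; the deserializer argument,
class name, host and credential values are illustrative placeholders rather than part of the patch.

    // Sketch only: wiring the legacy (SourceFunction-based) MongoDB CDC source into a job.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.mongodb.DebeziumSourceFunction;
    import org.apache.inlong.sort.mongodb.MongoDBSource;

    public class MongoDbLegacySourceSketch {

        public static void run(DebeziumDeserializationSchema<String> deserializer) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Build the SourceFunction-based source configured by the Builder in the diff above.
            DebeziumSourceFunction<String> sourceFunction =
                    MongoDBSource.<String>builder()
                            .hosts("localhost:27017")     // placeholder host:port
                            .databaseList("mydb")         // placeholder database
                            .collectionList("mydb.users") // placeholder collection
                            .username("flinkuser")        // placeholder credentials
                            .password("flinkpw")
                            .deserializer(deserializer)
                            .build();

            env.addSource(sourceFunction).print();
            env.execute("MongoDB CDC (SourceFunction) sketch");
        }
    }
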
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
index a20122f99c..9c417b4edf 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
@@ -18,11 +18,10 @@
 package org.apache.inlong.sort.mongodb;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.mongodb.source.MongoDBSource;
+import org.apache.inlong.sort.mongodb.source.MongoDBSourceBuilder;
 
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
-import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
 import com.ververica.cdc.connectors.mongodb.table.MongoDBReadableMetadata;
-import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.MetadataConverter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -195,8 +194,8 @@ public class MongoDBTableSource implements ScanTableSource, 
SupportsReadingMetad
 
             return SourceProvider.of(builder.build());
         } else {
-            
com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder =
-                    
com.ververica.cdc.connectors.mongodb.MongoDBSource.<RowData>builder()
+            org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> 
builder =
+                    
org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
                             .hosts(hosts)
                             .deserializer(deserializer);
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java
new file mode 100644
index 0000000000..350e588e21
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb.source;
+
+import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
+
+import com.ververica.cdc.connectors.base.config.SourceConfig;
+import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
+import com.ververica.cdc.connectors.base.options.StartupMode;
+import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner;
+import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsStateSerializer;
+import 
com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
+import 
com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import 
com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import 
com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
+import io.debezium.relational.TableId;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * The basic source of the incremental snapshot framework for a data source. It is based on
+ * FLIP-27 and the watermark signal algorithm, which support reading table snapshots in
+ * parallel and then continuing to capture data changes by streaming reads.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@Experimental
+public class IncrementalSource<T, C extends SourceConfig>
+        implements
+            Source<T, SourceSplitBase, PendingSplitsState>,
+            ResultTypeQueryable<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final SourceConfig.Factory<C> configFactory;
+    protected final DataSourceDialect<C> dataSourceDialect;
+    protected final OffsetFactory offsetFactory;
+    protected final DebeziumDeserializationSchema<T> deserializationSchema;
+    protected final SourceSplitSerializer sourceSplitSerializer;
+
+    public IncrementalSource(
+            SourceConfig.Factory<C> configFactory,
+            DebeziumDeserializationSchema<T> deserializationSchema,
+            OffsetFactory offsetFactory,
+            DataSourceDialect<C> dataSourceDialect) {
+        this.configFactory = configFactory;
+        this.deserializationSchema = deserializationSchema;
+        this.offsetFactory = offsetFactory;
+        this.dataSourceDialect = dataSourceDialect;
+        this.sourceSplitSerializer =
+                new SourceSplitSerializer() {
+
+                    @Override
+                    public OffsetFactory getOffsetFactory() {
+                        return offsetFactory;
+                    }
+                };
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public IncrementalSourceReader<T, C> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        // create source config for the given subtask (e.g. unique server id)
+        C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+
+        // Forward compatible with flink 1.13
+        final Method metricGroupMethod = 
readerContext.getClass().getMethod("metricGroup");
+        metricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup = (MetricGroup) 
metricGroupMethod.invoke(readerContext);
+        final SourceReaderMetrics sourceReaderMetrics = new 
SourceReaderMetrics(metricGroup);
+
+        sourceReaderMetrics.registerMetrics();
+        Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
+                () -> new IncrementalSourceSplitReader<>(
+                        readerContext.getIndexOfSubtask(), dataSourceDialect, 
sourceConfig);
+        return new IncrementalSourceReader<>(
+                elementsQueue,
+                splitReaderSupplier,
+                createRecordEmitter(sourceConfig, sourceReaderMetrics),
+                readerContext.getConfiguration(),
+                readerContext,
+                sourceConfig,
+                sourceSplitSerializer,
+                dataSourceDialect);
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplitBase, PendingSplitsState> 
createEnumerator(
+            SplitEnumeratorContext<SourceSplitBase> enumContext) {
+        C sourceConfig = configFactory.create(0);
+        final SplitAssigner splitAssigner;
+        if (sourceConfig.getStartupOptions().startupMode == 
StartupMode.INITIAL) {
+            try {
+                final List<TableId> remainingTables =
+                        
dataSourceDialect.discoverDataCollections(sourceConfig);
+                boolean isTableIdCaseSensitive =
+                        
dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+                splitAssigner =
+                        new HybridSplitAssigner<>(
+                                sourceConfig,
+                                enumContext.currentParallelism(),
+                                remainingTables,
+                                isTableIdCaseSensitive,
+                                dataSourceDialect,
+                                offsetFactory);
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(
+                        "Failed to discover captured tables for enumerator", 
e);
+            }
+        } else {
+            splitAssigner = new StreamSplitAssigner(sourceConfig, 
dataSourceDialect, offsetFactory);
+        }
+
+        return new IncrementalSourceEnumerator(enumContext, sourceConfig, 
splitAssigner);
+    }
+
+    @Override
+    public SplitEnumerator<SourceSplitBase, PendingSplitsState> 
restoreEnumerator(
+            SplitEnumeratorContext<SourceSplitBase> enumContext, 
PendingSplitsState checkpoint) {
+        C sourceConfig = configFactory.create(0);
+
+        final SplitAssigner splitAssigner;
+        if (checkpoint instanceof HybridPendingSplitsState) {
+            splitAssigner =
+                    new HybridSplitAssigner<>(
+                            sourceConfig,
+                            enumContext.currentParallelism(),
+                            (HybridPendingSplitsState) checkpoint,
+                            dataSourceDialect,
+                            offsetFactory);
+        } else if (checkpoint instanceof StreamPendingSplitsState) {
+            splitAssigner =
+                    new StreamSplitAssigner(
+                            sourceConfig,
+                            (StreamPendingSplitsState) checkpoint,
+                            dataSourceDialect,
+                            offsetFactory);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported restored PendingSplitsState: " + checkpoint);
+        }
+        return new IncrementalSourceEnumerator(enumContext, sourceConfig, 
splitAssigner);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
+        return sourceSplitSerializer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PendingSplitsState> 
getEnumeratorCheckpointSerializer() {
+        SourceSplitSerializer sourceSplitSerializer = (SourceSplitSerializer) 
getSplitSerializer();
+        return new PendingSplitsStateSerializer(sourceSplitSerializer);
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    protected RecordEmitter<SourceRecords, T, SourceSplitState> 
createRecordEmitter(
+            SourceConfig sourceConfig, SourceReaderMetrics 
sourceReaderMetrics) {
+        return new IncrementalSourceRecordEmitter<>(
+                deserializationSchema,
+                sourceReaderMetrics,
+                sourceConfig.isIncludeSchemaChanges(),
+                offsetFactory);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java
new file mode 100644
index 0000000000..f8747a04d7
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb.source;
+
+import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
+
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.document.Array;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent;
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
+import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.*;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link 
IncrementalSourceReader}.
+ *
+ * <p>The {@link RecordEmitter} buffers the snapshot records of a split and calls the stream
+ * reader to emit records rather than emitting the records directly.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+public class IncrementalSourceRecordEmitter<T>
+        implements
+            RecordEmitter<SourceRecords, T, SourceSplitState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    protected final DebeziumDeserializationSchema<T> 
debeziumDeserializationSchema;
+    protected final SourceReaderMetrics sourceReaderMetrics;
+    protected final boolean includeSchemaChanges;
+    protected final OutputCollector<T> outputCollector;
+    protected final OffsetFactory offsetFactory;
+
+    public IncrementalSourceRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory) {
+        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
+        this.sourceReaderMetrics = sourceReaderMetrics;
+        this.includeSchemaChanges = includeSchemaChanges;
+        this.outputCollector = new OutputCollector<>();
+        this.offsetFactory = offsetFactory;
+    }
+
+    @Override
+    public void emitRecord(
+            SourceRecords sourceRecords, SourceOutput<T> output, 
SourceSplitState splitState)
+            throws Exception {
+        final Iterator<SourceRecord> elementIterator = 
sourceRecords.iterator();
+        while (elementIterator.hasNext()) {
+            processElement(elementIterator.next(), output, splitState);
+        }
+    }
+
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState)
+            throws Exception {
+        if (isWatermarkEvent(element)) {
+            Offset watermark = getWatermark(element);
+            if (isHighWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
+                splitState.asSnapshotSplitState().setHighWatermark(watermark);
+            }
+        } else if (isSchemaChangeEvent(element) && 
splitState.isStreamSplitState()) {
+            HistoryRecord historyRecord = getHistoryRecord(element);
+            Array tableChanges =
+                    
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+            TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+            for (TableChanges.TableChange tableChange : changes) {
+                
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
+            }
+            if (includeSchemaChanges) {
+                emitElement(element, output);
+            }
+        } else if (isDataChangeRecord(element)) {
+            if (splitState.isStreamSplitState()) {
+                Offset position = getOffsetPosition(element);
+                splitState.asStreamSplitState().setStartingOffset(position);
+            }
+            reportMetrics(element);
+            emitElement(element, output);
+        } else {
+            // unknown element
+            LOG.info("Meet unknown element {}, just skip.", element);
+        }
+    }
+
+    private Offset getWatermark(SourceRecord watermarkEvent) {
+        return getOffsetPosition(watermarkEvent.sourceOffset());
+    }
+
+    public Offset getOffsetPosition(SourceRecord dataRecord) {
+        return getOffsetPosition(dataRecord.sourceOffset());
+    }
+
+    public Offset getOffsetPosition(Map<String, ?> offset) {
+        Map<String, String> offsetStrMap = new HashMap<>();
+        for (Map.Entry<String, ?> entry : offset.entrySet()) {
+            offsetStrMap.put(
+                    entry.getKey(), entry.getValue() == null ? null : 
entry.getValue().toString());
+        }
+        return offsetFactory.newOffset(offsetStrMap);
+    }
+
+    protected void emitElement(SourceRecord element, SourceOutput<T> output) 
throws Exception {
+        outputCollector.output = output;
+        debeziumDeserializationSchema.deserialize(element, outputCollector);
+    }
+
+    protected void reportMetrics(SourceRecord element) {
+        long now = System.currentTimeMillis();
+        // record the latest process time
+        sourceReaderMetrics.recordProcessTime(now);
+        Long messageTimestamp = getMessageTimestamp(element);
+
+        if (messageTimestamp != null && messageTimestamp > 0L) {
+            // report fetch delay
+            Long fetchTimestamp = getFetchTimestamp(element);
+            if (fetchTimestamp != null) {
+                sourceReaderMetrics.recordFetchDelay(fetchTimestamp - 
messageTimestamp);
+            }
+            // report emit delay
+            sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
+        }
+    }
+
+    private static class OutputCollector<T> implements Collector<T> {
+
+        private SourceOutput<T> output;
+
+        @Override
+        public void collect(T record) {
+            output.collect(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java
new file mode 100644
index 0000000000..392b39fedd
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb.source;
+
+import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
+
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
+import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.bson.BsonDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent;
+import static 
com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent;
+import static 
com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils.*;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link 
IncrementalSourceReader}.
+ *
+ * <p>The {@link RecordEmitter} buffers the snapshot records of a split and calls the stream
+ * reader to emit records rather than emitting the records directly.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+public final class MongoDBRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBRecordEmitter.class);
+
+    public MongoDBRecordEmitter(
+            DebeziumDeserializationSchema<T> deserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            OffsetFactory offsetFactory) {
+        super(deserializationSchema, sourceReaderMetrics, false, 
offsetFactory);
+    }
+
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState)
+            throws Exception {
+        if (isWatermarkEvent(element)) {
+            Offset watermark = getOffsetPosition(element);
+            if (isHighWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
+                splitState.asSnapshotSplitState().setHighWatermark(watermark);
+            }
+        } else if (isHeartbeatEvent(element)) {
+            if (splitState.isStreamSplitState()) {
+                updatePositionForStreamSplit(element, splitState);
+            }
+        } else if (isDataChangeRecord(element)) {
+            if (splitState.isStreamSplitState()) {
+                updatePositionForStreamSplit(element, splitState);
+            }
+            reportMetrics(element);
+            emitElement(element, output);
+        } else {
+            // unknown element
+            LOG.info("Meet unknown element {}, just skip.", element);
+        }
+    }
+
+    private void updatePositionForStreamSplit(SourceRecord element, 
SourceSplitState splitState) {
+        BsonDocument resumeToken = getResumeToken(element);
+        StreamSplitState streamSplitState = splitState.asStreamSplitState();
+        ChangeStreamOffset offset = (ChangeStreamOffset) 
streamSplitState.getStartingOffset();
+        if (offset != null) {
+            offset.updatePosition(resumeToken);
+        }
+        splitState.asStreamSplitState().setStartingOffset(offset);
+    }
+
+    @Override
+    protected void reportMetrics(SourceRecord element) {
+        long now = System.currentTimeMillis();
+        // record the latest process time
+        sourceReaderMetrics.recordProcessTime(now);
+        Long messageTimestamp = getMessageTimestamp(element);
+
+        if (messageTimestamp != null && messageTimestamp > 0L) {
+            // report fetch delay
+            Long fetchTimestamp = getFetchTimestamp(element);
+            if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
+                sourceReaderMetrics.recordFetchDelay(fetchTimestamp - 
messageTimestamp);
+            }
+            // report emit delay
+            sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java
new file mode 100644
index 0000000000..32a149a229
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb.source;
+
+import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
+
+import com.ververica.cdc.connectors.base.config.SourceConfig;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
+import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
+import 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
+import com.ververica.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
+import 
com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffsetFactory;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/**
+ * The MongoDB CDC Source, based on FLIP-27, which supports reading collection snapshots in
+ * parallel and then continuing to capture data changes from the change stream.
+ *
+ * <pre>
+ *     1. The source supports capturing changes of database(s) or collection(s) in parallel.
+ *     2. The source supports split-level checkpoints while reading snapshot data.
+ *     3. The source does not need to acquire any lock on MongoDB.
+ * </pre>
+ *
+ * <pre>{@code
+ * MongoDBSource
+ *     .<String>builder()
+ *     .hosts("localhost:27017")
+ *     .databaseList("mydb")
+ *     .collectionList("mydb.users")
+ *     .username(username)
+ *     .password(password)
+ *     .deserializer(new JsonDebeziumDeserializationSchema())
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link MongoDBSourceBuilder} for more details.
+ *
+ * @param <T> the output type of the source.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@Internal
+@Experimental
+public class MongoDBSource<T> extends IncrementalSource<T, 
MongoDBSourceConfig> {
+
+    private static final long serialVersionUID = 1L;
+
+    MongoDBSource(
+            MongoDBSourceConfigFactory configFactory,
+            DebeziumDeserializationSchema<T> deserializationSchema) {
+        super(
+                configFactory,
+                deserializationSchema,
+                new ChangeStreamOffsetFactory(),
+                new MongoDBDialect());
+    }
+
+    /**
+     * Get a MongoDBSourceBuilder to build a {@link MongoDBSource}.
+     *
+     * @return a MongoDB parallel source builder.
+     */
+    @PublicEvolving
+    public static <T> MongoDBSourceBuilder<T> builder() {
+        return new MongoDBSourceBuilder<>();
+    }
+
+    @Override
+    protected RecordEmitter<SourceRecords, T, SourceSplitState> 
createRecordEmitter(
+            SourceConfig sourceConfig, SourceReaderMetrics 
sourceReaderMetrics) {
+        return new MongoDBRecordEmitter<>(
+                deserializationSchema, sourceReaderMetrics, offsetFactory);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
new file mode 100644
index 0000000000..a95f238a0b
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mongodb.source;
+
+import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
+
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link MongoDBSource} to make it easier for the users 
to construct a {@link
+ * MongoDBSource}.
+ *
+ * <pre>{@code
+ * MongoDBSource
+ *     .<String>builder()
+ *     .hosts("localhost:27017")
+ *     .databaseList("mydb")
+ *     .collectionList("mydb.users")
+ *     .username(username)
+ *     .password(password)
+ *     .deserializer(new JsonDebeziumDeserializationSchema())
+ *     .build();
+ * }</pre>
+ *
+ * <p>Check the Java docs of each individual method to learn more about the 
settings to build a
+ * {@link MongoDBSource}.
+ * <p>
+ * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0
+ */
+@Experimental
+@PublicEvolving
+public class MongoDBSourceBuilder<T> {
+
+    private final MongoDBSourceConfigFactory configFactory = new 
MongoDBSourceConfigFactory();
+    private DebeziumDeserializationSchema<T> deserializer;
+
+    /** The comma-separated list of hostname and port pairs of mongodb 
servers. */
+    public MongoDBSourceBuilder<T> hosts(String hosts) {
+        this.configFactory.hosts(hosts);
+        return this;
+    }
+
+    /**
+     * Ampersand (i.e. &) separated MongoDB connection options, e.g.
+     * replicaSet=test&connectTimeoutMS=300000. See
+     * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options
+     */
+    public MongoDBSourceBuilder<T> connectionOptions(String connectionOptions) 
{
+        this.configFactory.connectionOptions(connectionOptions);
+        return this;
+    }
+
+    /** Name of the database user to be used when connecting to MongoDB. */
+    public MongoDBSourceBuilder<T> username(String username) {
+        this.configFactory.username(username);
+        return this;
+    }
+
+    /** Password to be used when connecting to MongoDB. */
+    public MongoDBSourceBuilder<T> password(String password) {
+        this.configFactory.password(password);
+        return this;
+    }
+
+    /** Regular expressions list that match database names to be monitored. */
+    public MongoDBSourceBuilder<T> databaseList(String... databases) {
+        this.configFactory.databaseList(databases);
+        return this;
+    }
+
+    /**
+     * Regular expressions that match fully-qualified collection identifiers 
for collections to be
+     * monitored. Each identifier is of the form {@code 
<databaseName>.<collectionName>}.
+     */
+    public MongoDBSourceBuilder<T> collectionList(String... collections) {
+        this.configFactory.collectionList(collections);
+        return this;
+    }
+
+    /**
+     * batch.size
+     *
+     * <p>The cursor batch size. Default: 1024
+     */
+    public MongoDBSourceBuilder<T> batchSize(int batchSize) {
+        this.configFactory.batchSize(batchSize);
+        return this;
+    }
+
+    /**
+     * poll.await.time.ms
+     *
+     * <p>The amount of time to wait before checking for new results on the 
change stream. Default:
+     * 1000
+     */
+    public MongoDBSourceBuilder<T> pollAwaitTimeMillis(int 
pollAwaitTimeMillis) {
+        checkArgument(pollAwaitTimeMillis > 0);
+        this.configFactory.pollAwaitTimeMillis(pollAwaitTimeMillis);
+        return this;
+    }
+
+    /**
+     * poll.max.batch.size
+     *
+     * <p>Maximum number of change stream documents to include in a single 
batch when polling for
+     * new data. This setting can be used to limit the amount of data buffered 
internally in the
+     * connector. Default: 1024
+     */
+    public MongoDBSourceBuilder<T> pollMaxBatchSize(int pollMaxBatchSize) {
+        this.configFactory.pollMaxBatchSize(pollMaxBatchSize);
+        return this;
+    }
+
+    /**
+     * copy.existing
+     *
+     * <p>Copy existing data from source collections and convert them to 
Change Stream events on
+     * their respective topics. Any changes to the data that occur during the 
copy process are
+     * applied once the copy is completed.
+     */
+    public MongoDBSourceBuilder<T> copyExisting(boolean copyExisting) {
+        if (copyExisting) {
+            this.configFactory.startupOptions(StartupOptions.initial());
+        } else {
+            this.configFactory.startupOptions(StartupOptions.latest());
+        }
+        return this;
+    }
+
+    /**
+     * heartbeat.interval.ms
+     *
+     * <p>The length of time in milliseconds between sending heartbeat 
messages. Heartbeat messages
+     * contain the post batch resume token and are sent when no source records 
have been published
+     * in the specified interval. This improves the resumability of the 
connector for low volume
+     * namespaces. Use 0 to disable.
+     */
+    public MongoDBSourceBuilder<T> heartbeatIntervalMillis(int 
heartbeatIntervalMillis) {
+        this.configFactory.heartbeatIntervalMillis(heartbeatIntervalMillis);
+        return this;
+    }
+
+    /**
+     * scan.incremental.snapshot.chunk.size.mb
+     *
+     * <p>The chunk size, in MB, of the incremental snapshot. Default: 64 MB.
+     */
+    public MongoDBSourceBuilder<T> splitSizeMB(int splitSizeMB) {
+        this.configFactory.splitSizeMB(splitSizeMB);
+        return this;
+    }
+
+    /**
+     * The group size of the split metadata. If the metadata size exceeds the group size, the
+     * metadata will be divided into multiple groups.
+     */
+    public MongoDBSourceBuilder<T> splitMetaGroupSize(int splitMetaGroupSize) {
+        this.configFactory.splitMetaGroupSize(splitMetaGroupSize);
+        return this;
+    }
+
+    /**
+     * The deserializer used to convert from consumed {@link
+     * org.apache.kafka.connect.source.SourceRecord}.
+     */
+    public MongoDBSourceBuilder<T> 
deserializer(DebeziumDeserializationSchema<T> deserializer) {
+        this.deserializer = deserializer;
+        return this;
+    }
+
+    /**
+     * Build the {@link MongoDBSource}.
+     *
+     * @return a {@link MongoDBSource} with the settings made for this builder.
+     */
+    public MongoDBSource<T> build() {
+        configFactory.validate();
+        return new MongoDBSource<>(configFactory, checkNotNull(deserializer));
+    }
+}
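
A matching sketch for the new FLIP-27 path (not part of the patch): the MongoDBSourceBuilder above
produces a Source that is attached with fromSource() instead of addSource(). The deserializer
argument, class name, parallelism, host and credential values are illustrative placeholders.

    // Sketch only: running the FLIP-27 MongoDBSource built by MongoDBSourceBuilder.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.mongodb.source.MongoDBSource;

    public class MongoDbParallelSourceSketch {

        public static void run(DebeziumDeserializationSchema<String> deserializer) throws Exception {
            MongoDBSource<String> source =
                    MongoDBSource.<String>builder()
                            .hosts("localhost:27017")     // placeholder host:port
                            .databaseList("mydb")         // placeholder database
                            .collectionList("mydb.users") // placeholder collection
                            .username("flinkuser")        // placeholder credentials
                            .password("flinkpw")
                            .deserializer(deserializer)
                            .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // FLIP-27 sources go through fromSource(); snapshot splits are read in parallel by
            // the enumerator and split readers introduced in this patch.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
                    .setParallelism(2)
                    .print();
            env.execute("MongoDB CDC (FLIP-27) sketch");
        }
    }
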
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 261b1c9f97..4af3c47722 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -857,8 +857,18 @@
     Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note 
that the software have been modified.)
     License : 
https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
-1.3.23 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
-       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodbMongoDBTableSource.java
+1.3.23 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
     Source  : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note 
that the software have been modified.)
     License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
