This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 57985782d0 [INLONG-10317][Sort] Make Kafka source support report audit 
information exactly once (#10550)
57985782d0 is described below

commit 57985782d0a074ad869d8061c3e870b6b18d3928
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed Jul 3 10:10:55 2024 +0800

    [INLONG-10317][Sort] Make Kafka source support report audit information 
exactly once (#10550)
---
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml |   1 +
 .../inlong/sort/kafka/source/KafkaSource.java      | 239 +++++++++
 .../sort/kafka/source/KafkaSourceBuilder.java      | 534 +++++++++++++++++++++
 .../kafka/source/reader/KafkaSourceReader.java     | 217 +++++++++
 .../table/DynamicKafkaDeserializationSchema.java   |  81 ++--
 .../sort/kafka/table/KafkaDynamicSource.java       |  11 +-
 licenses/inlong-sort-connectors/LICENSE            |   3 +
 7 files changed, 1047 insertions(+), 39 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index 019b0f456c..2ecb22476e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -84,6 +84,7 @@
                                     
<include>org.apache.httpcomponents:*</include>
                                     
<include>org.apache.commons:commons-lang3</include>
                                     <include>com.google.protobuf:*</include>
+                                    <include>com.google.guava:*</include>
                                     <include>joda-time:*</include>
                                     
<include>com.fasterxml.jackson.core:*</include>
                                     <include>com.amazonaws:*</include>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
new file mode 100644
index 0000000000..5004ec34a3
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.inlong.sort.kafka.source.reader.KafkaSourceReader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import 
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
+import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import 
org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} 
to construct a {@link
+ * KafkaSource}. The following example shows how to create a KafkaSource 
emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ *     .setGroupId("MyGroup")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ *     .setStartingOffsets(OffsetsInitializer.earliest())
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link KafkaSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add a variable metricSchema to report audit information
+ */
+@PublicEvolving
+public class KafkaSource<OUT>
+        implements
+            Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+            ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = -8755372893283732098L;
+    // Users can choose only one of the following ways to specify the topics 
to consume from.
+    private final KafkaSubscriber subscriber;
+    // Users can specify the starting / stopping offset initializer.
+    private final OffsetsInitializer startingOffsetsInitializer;
+    private final OffsetsInitializer stoppingOffsetsInitializer;
+    // Boundedness
+    private final Boundedness boundedness;
+    private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+    private final KafkaDeserializationSchema<RowData> metricSchema;
+    // The configurations.
+    private final Properties props;
+
+    KafkaSource(
+            KafkaSubscriber subscriber,
+            OffsetsInitializer startingOffsetsInitializer,
+            @Nullable OffsetsInitializer stoppingOffsetsInitializer,
+            Boundedness boundedness,
+            KafkaRecordDeserializationSchema<OUT> deserializationSchema,
+            KafkaDeserializationSchema<RowData> metricSchema,
+            Properties props) {
+        this.subscriber = subscriber;
+        this.startingOffsetsInitializer = startingOffsetsInitializer;
+        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+        this.boundedness = boundedness;
+        this.deserializationSchema = deserializationSchema;
+        this.metricSchema = metricSchema;
+        this.props = props;
+    }
+
+    /**
+     * Get a kafkaSourceBuilder to build a {@link KafkaSource}.
+     *
+     * @return a Kafka source builder.
+     */
+    public static <OUT> KafkaSourceBuilder<OUT> builder() {
+        return new KafkaSourceBuilder<>();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return this.boundedness;
+    }
+
+    @Internal
+    @Override
+    public SourceReader<OUT, KafkaPartitionSplit> 
createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return createReader(readerContext, (ignore) -> {
+        });
+    }
+
+    @VisibleForTesting
+    SourceReader<OUT, KafkaPartitionSplit> createReader(
+            SourceReaderContext readerContext, Consumer<Collection<String>> 
splitFinishedHook)
+            throws Exception {
+        
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], 
byte[]>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        deserializationSchema.open(
+                new DeserializationSchema.InitializationContext() {
+
+                    @Override
+                    public MetricGroup getMetricGroup() {
+                        return 
readerContext.metricGroup().addGroup("deserializer");
+                    }
+
+                    @Override
+                    public UserCodeClassLoader getUserCodeClassLoader() {
+                        return readerContext.getUserCodeClassLoader();
+                    }
+                });
+        final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
+                new KafkaSourceReaderMetrics(readerContext.metricGroup());
+
+        Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
+                () -> new KafkaPartitionSplitReader(props, readerContext, 
kafkaSourceReaderMetrics);
+        KafkaRecordEmitter<OUT> recordEmitter = new 
KafkaRecordEmitter<>(deserializationSchema);
+
+        return new KafkaSourceReader<>(
+                elementsQueue,
+                new KafkaSourceFetcherManager(
+                        elementsQueue, splitReaderSupplier::get, 
splitFinishedHook),
+                recordEmitter,
+                toConfiguration(props),
+                readerContext,
+                kafkaSourceReaderMetrics,
+                metricSchema);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> 
createEnumerator(
+            SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
+        return new KafkaSourceEnumerator(
+                subscriber,
+                startingOffsetsInitializer,
+                stoppingOffsetsInitializer,
+                props,
+                enumContext,
+                boundedness);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> 
restoreEnumerator(
+            SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+            KafkaSourceEnumState checkpoint)
+            throws IOException {
+        return new KafkaSourceEnumerator(
+                subscriber,
+                startingOffsetsInitializer,
+                stoppingOffsetsInitializer,
+                props,
+                enumContext,
+                boundedness,
+                checkpoint.assignedPartitions());
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() 
{
+        return new KafkaPartitionSplitSerializer();
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<KafkaSourceEnumState> 
getEnumeratorCheckpointSerializer() {
+        return new KafkaSourceEnumStateSerializer();
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    // ----------- private helper methods ---------------
+
+    private Configuration toConfiguration(Properties props) {
+        Configuration config = new Configuration();
+        props.stringPropertyNames().forEach(key -> config.setString(key, 
props.getProperty(key)));
+        return config;
+    }
+
+    @VisibleForTesting
+    Configuration getConfiguration() {
+        return toConfiguration(props);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
new file mode 100644
index 0000000000..58bb651b24
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
+import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link KafkaSource} to make it easier for the users 
to construct a {@link
+ * KafkaSource}.
+ *
+ * <p>The following example shows the minimum setup to create a KafkaSource 
that reads the String
+ * values from a Kafka topic.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The bootstrap servers, topics/partitions to consume, and the record 
deserializer are required
+ * fields that must be set.
+ *
+ * <p>To specify the starting offsets of the KafkaSource, one can call {@link
+ * #setStartingOffsets(OffsetsInitializer)}.
+ *
+ * <p>By default the KafkaSource runs in an {@link 
Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the KafkaSource run 
in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can 
call {@link
+ * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource 
stops after it consumes
+ * up to the latest partition offsets at the point when the Flink started.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ *     .setUnbounded(OffsetsInitializer.latest())
+ *     .build();
+ * }</pre>
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the 
settings to build a
+ * KafkaSource.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add a variable metricSchema to report audit information
+ */
+@PublicEvolving
+public class KafkaSourceBuilder<OUT> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSourceBuilder.class);
+    private static final String[] REQUIRED_CONFIGS = 
{ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG};
+    // The subscriber specifies the partitions to subscribe to.
+    private KafkaSubscriber subscriber;
+    // Users can specify the starting / stopping offset initializer.
+    private OffsetsInitializer startingOffsetsInitializer;
+    private OffsetsInitializer stoppingOffsetsInitializer;
+    // Boundedness
+    private Boundedness boundedness;
+    private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+    private KafkaDeserializationSchema<RowData> metricSchema;
+    // The configurations.
+    protected Properties props;
+
+    KafkaSourceBuilder() {
+        this.subscriber = null;
+        this.startingOffsetsInitializer = OffsetsInitializer.earliest();
+        this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.deserializationSchema = null;
+        this.metricSchema = null;
+        this.props = new Properties();
+    }
+
+    /**
+     * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
+     *
+     * @param bootstrapServers the bootstrap servers of the Kafka cluster.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setBootstrapServers(String 
bootstrapServers) {
+        return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+    }
+
+    /**
+     * Sets the consumer group id of the KafkaSource.
+     *
+     * @param groupId the group id of the KafkaSource.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setGroupId(String groupId) {
+        return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    }
+
+    /**
+     * Set a list of topics the KafkaSource should consume from. All the 
topics in the list should
+     * have existed in the Kafka cluster. Otherwise an exception will be 
thrown. To allow some of
+     * the topics to be created lazily, please use {@link 
#setTopicPattern(Pattern)} instead.
+     *
+     * @param topics the list of topics to consume from.
+     * @return this KafkaSourceBuilder.
+     * @see 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+     */
+    public KafkaSourceBuilder<OUT> setTopics(List<String> topics) {
+        ensureSubscriberIsNull("topics");
+        subscriber = KafkaSubscriber.getTopicListSubscriber(topics);
+        return this;
+    }
+
+    /**
+     * Set a list of topics the KafkaSource should consume from. All the 
topics in the list should
+     * have existed in the Kafka cluster. Otherwise an exception will be 
thrown. To allow some of
+     * the topics to be created lazily, please use {@link 
#setTopicPattern(Pattern)} instead.
+     *
+     * @param topics the list of topics to consume from.
+     * @return this KafkaSourceBuilder.
+     * @see 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+     */
+    public KafkaSourceBuilder<OUT> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicPattern the pattern of the topic name to consume from.
+     * @return this KafkaSourceBuilder.
+     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
+     */
+    public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) {
+        ensureSubscriberIsNull("topic pattern");
+        subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern);
+        return this;
+    }
+
+    /**
+     * Set a set of partitions to consume from.
+     *
+     * @param partitions the set of partitions to consume from.
+     * @return this KafkaSourceBuilder.
+     * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)
+     */
+    public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> 
partitions) {
+        ensureSubscriberIsNull("partitions");
+        subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
+        return this;
+    }
+
+    /**
+     * Specify from which offsets the KafkaSource should start consume from by 
providing an {@link
+     * OffsetsInitializer}.
+     *
+     * <p>The following {@link OffsetsInitializer}s are commonly used and 
provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for 
custom behaviors.
+     *
+     * <ul>
+     *   <li>{@link OffsetsInitializer#earliest()} - starting from the 
earliest offsets. This is
+     *       also the default {@link OffsetsInitializer} of the KafkaSource 
for starting offsets.
+     *   <li>{@link OffsetsInitializer#latest()} - starting from the latest 
offsets.
+     *   <li>{@link OffsetsInitializer#committedOffsets()} - starting from the 
committed offsets of
+     *       the consumer group.
+     *   <li>{@link
+     *       
OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)}
+     *       - starting from the committed offsets of the consumer group. If 
there is no committed
+     *       offsets, starting from the offsets specified by the {@link
+     *       org.apache.kafka.clients.consumer.OffsetResetStrategy 
OffsetResetStrategy}.
+     *   <li>{@link OffsetsInitializer#offsets(Map)} - starting from the 
specified offsets for each
+     *       partition.
+     *   <li>{@link OffsetsInitializer#timestamp(long)} - starting from the 
specified timestamp for
+     *       each partition. Note that the guarantee here is that all the 
records in Kafka whose
+     *       {@link 
org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than
+     *       the given starting timestamp will be consumed. However, it is 
possible that some
+     *       consumer records whose timestamp is smaller than the given 
starting timestamp are also
+     *       consumed.
+     * </ul>
+     *
+     * @param startingOffsetsInitializer the {@link OffsetsInitializer} 
setting the starting offsets
+     *     for the Source.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setStartingOffsets(
+            OffsetsInitializer startingOffsetsInitializer) {
+        this.startingOffsetsInitializer = startingOffsetsInitializer;
+        return this;
+    }
+
+    /**
+     * By default the KafkaSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the KafkaSource run as
+     * a streaming source but still stops at some point, one can set an {@link 
OffsetsInitializer}
+     * to specify the stopping offsets for each partition. When all the 
partitions have reached
+     * their stopping offsets, the KafkaSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setBounded(OffsetsInitializer)} that after setting
+     * the stopping offsets with this method, {@link 
KafkaSource#getBoundedness()} will still return
+     * {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at 
the stopping offsets
+     * specified by the stopping offsets {@link OffsetsInitializer}.
+     *
+     * <p>The following {@link OffsetsInitializer} are commonly used and 
provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for 
custom behaviors.
+     *
+     * <ul>
+     *   <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets 
of the partitions when
+     *       the KafkaSource starts to run.
+     *   <li>{@link OffsetsInitializer#committedOffsets()} - stops at the 
committed offsets of the
+     *       consumer group.
+     *   <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified 
offsets for each
+     *       partition.
+     *   <li>{@link OffsetsInitializer#timestamp(long)} - stops at the 
specified timestamp for each
+     *       partition. The guarantee of setting the stopping timestamp is 
that no Kafka records
+     *       whose {@link 
org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+     *       than the given stopping timestamp will be consumed. However, it 
is possible that some
+     *       records whose timestamp is smaller than the specified stopping 
timestamp are not
+     *       consumed.
+     * </ul>
+     *
+     * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to 
specify the stopping
+     *     offset.
+     * @return this KafkaSourceBuilder.
+     * @see #setBounded(OffsetsInitializer)
+     */
+    public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer 
stoppingOffsetsInitializer) {
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+        return this;
+    }
+
+    /**
+     * By default the KafkaSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the KafkaSource run in
+     * {@link Boundedness#BOUNDED} manner and stops at some point, one can set 
an {@link
+     * OffsetsInitializer} to specify the stopping offsets for each partition. 
When all the
+     * partitions have reached their stopping offsets, the KafkaSource will 
then exit.
+     *
+     * <p>This method is different from {@link 
#setUnbounded(OffsetsInitializer)} that after setting
+     * the stopping offsets with this method, {@link 
KafkaSource#getBoundedness()} will return
+     * {@link Boundedness#BOUNDED} instead of {@link 
Boundedness#CONTINUOUS_UNBOUNDED}.
+     *
+     * <p>The following {@link OffsetsInitializer} are commonly used and 
provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for 
custom behaviors.
+     *
+     * <ul>
+     *   <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets 
of the partitions when
+     *       the KafkaSource starts to run.
+     *   <li>{@link OffsetsInitializer#committedOffsets()} - stops at the 
committed offsets of the
+     *       consumer group.
+     *   <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified 
offsets for each
+     *       partition.
+     *   <li>{@link OffsetsInitializer#timestamp(long)} - stops at the 
specified timestamp for each
+     *       partition. The guarantee of setting the stopping timestamp is 
that no Kafka records
+     *       whose {@link 
org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+     *       than the given stopping timestamp will be consumed. However, it 
is possible that some
+     *       records whose timestamp is smaller than the specified stopping 
timestamp are not
+     *       consumed.
+     * </ul>
+     *
+     * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to 
specify the stopping
+     *     offsets.
+     * @return this KafkaSourceBuilder.
+     * @see #setUnbounded(OffsetsInitializer)
+     */
+    public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer 
stoppingOffsetsInitializer) {
+        this.boundedness = Boundedness.BOUNDED;
+        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+        return this;
+    }
+
+    /**
+     * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the 
{@link
+     * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for 
KafkaSource.
+     *
+     * @param recordDeserializer the deserializer for Kafka {@link
+     *     org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setDeserializer(
+            KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
+        this.deserializationSchema = recordDeserializer;
+        return this;
+    }
+
+    /**
+     * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the 
{@link
+     * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for 
KafkaSource. The given
+     * {@link DeserializationSchema} will be used to deserialize the value of 
ConsumerRecord. The
+     * other information (e.g. key) in a ConsumerRecord will be ignored.
+     *
+     * @param deserializationSchema the {@link DeserializationSchema} to use 
for deserialization.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
+            DeserializationSchema<OUT> deserializationSchema) {
+        this.deserializationSchema =
+                
KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
+        return this;
+    }
+
+    /**
+     * Sets the client id prefix of this KafkaSource.
+     *
+     * @param prefix the client id prefix to use for this KafkaSource.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) {
+        return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
+    }
+
+    /**
+     * Set an arbitrary property for the KafkaSource and KafkaConsumer. The 
valid keys can be found
+     * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
+     *
+     * <p>Note that the following keys will be overridden by the builder when 
the KafkaSource is
+     * created.
+     *
+     * <ul>
+     *   <li><code>key.deserializer</code> is always set to {@link 
ByteArrayDeserializer}.
+     *   <li><code>value.deserializer</code> is always set to {@link 
ByteArrayDeserializer}.
+     *   <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+     *       OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting 
offsets, which is by
+     *       default {@link OffsetsInitializer#earliest()}.
+     *   <li><code>partition.discovery.interval.ms</code> is overridden to -1 
when {@link
+     *       #setBounded(OffsetsInitializer)} has been invoked.
+     * </ul>
+     *
+     * @param key the key of the property.
+     * @param value the value of the property.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setProperty(String key, String value) {
+        props.setProperty(key, value);
+        return this;
+    }
+
+    /**
+     * Set arbitrary properties for the KafkaSource and KafkaConsumer. The 
valid keys can be found
+     * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
+     *
+     * <p>Note that the following keys will be overridden by the builder when 
the KafkaSource is
+     * created.
+     *
+     * <ul>
+     *   <li><code>key.deserializer</code> is always set to {@link 
ByteArrayDeserializer}.
+     *   <li><code>value.deserializer</code> is always set to {@link 
ByteArrayDeserializer}.
+     *   <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+     *       OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting 
offsets, which is by
+     *       default {@link OffsetsInitializer#earliest()}.
+     *   <li><code>partition.discovery.interval.ms</code> is overridden to -1 
when {@link
+     *       #setBounded(OffsetsInitializer)} has been invoked.
+     *   <li><code>client.id</code> is overridden to the 
"client.id.prefix-RANDOM_LONG", or
+     *       "group.id-RANDOM_LONG" if the client id prefix is not set.
+     * </ul>
+     *
+     * @param props the properties to set for the KafkaSource.
+     * @return this KafkaSourceBuilder.
+     */
+    public KafkaSourceBuilder<OUT> setProperties(Properties props) {
+        this.props.putAll(props);
+        return this;
+    }
+
+    public KafkaSourceBuilder<OUT> 
setMetricSchema(KafkaDeserializationSchema<RowData> metricSchema) {
+        this.metricSchema = metricSchema;
+        return this;
+    }
+
+    /**
+     * Build the {@link KafkaSource}.
+     *
+     * @return a KafkaSource with the settings made for this builder.
+     */
+    public KafkaSource<OUT> build() {
+        sanityCheck();
+        parseAndSetRequiredProperties();
+        return new KafkaSource<>(
+                subscriber,
+                startingOffsetsInitializer,
+                stoppingOffsetsInitializer,
+                boundedness,
+                deserializationSchema,
+                metricSchema,
+                props);
+    }
+
+    // ------------- private helpers --------------
+
+    private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+        if (subscriber != null) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot use %s for consumption because a %s is 
already set for consumption.",
+                            attemptingSubscribeMode, 
subscriber.getClass().getSimpleName()));
+        }
+    }
+
+    private void parseAndSetRequiredProperties() {
+        maybeOverride(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName(),
+                true);
+        maybeOverride(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName(),
+                true);
+        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not 
specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            
maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", 
false);
+        }
+        maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", 
false);
+        maybeOverride(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                
startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
+                true);
+
+        // If the source is bounded, do not run periodic partition discovery.
+        maybeOverride(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+                "-1",
+                boundedness == Boundedness.BOUNDED);
+
+        // If the client id prefix is not set, reuse the consumer group id as 
the client id prefix,
+        // or generate a random string if consumer group id is not specified.
+        maybeOverride(
+                KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+                        : "KafkaSource-" + new Random().nextLong(),
+                false);
+    }
+
+    private boolean maybeOverride(String key, String value, boolean override) {
+        boolean overridden = false;
+        String userValue = props.getProperty(key);
+        if (userValue != null) {
+            if (override) {
+                LOG.warn(
+                        String.format(
+                                "Property %s is provided but will be 
overridden from %s to %s",
+                                key, userValue, value));
+                props.setProperty(key, value);
+                overridden = true;
+            }
+        } else {
+            props.setProperty(key, value);
+        }
+        return overridden;
+    }
+
+    private void sanityCheck() {
+        // Check required configs.
+        for (String requiredConfig : REQUIRED_CONFIGS) {
+            checkNotNull(
+                    props.getProperty(requiredConfig),
+                    String.format("Property %s is required but not provided", 
requiredConfig));
+        }
+        // Check required settings.
+        checkNotNull(
+                subscriber,
+                "No subscribe mode is specified, "
+                        + "should be one of topics, topic pattern and 
partition set.");
+        checkNotNull(deserializationSchema, "Deserialization schema is 
required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || 
!offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is 
enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+        // Check offsets initializers
+        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) 
{
+            ((OffsetsInitializerValidator) 
startingOffsetsInitializer).validate(props);
+        }
+        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) 
{
+            ((OffsetsInitializerValidator) 
stoppingOffsetsInitializer).validate(props);
+        }
+    }
+
+    private boolean offsetCommitEnabledManually() {
+        boolean autoCommit =
+                props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                        && Boolean.parseBoolean(
+                                
props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                
props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        
KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+        return autoCommit || commitOnCheckpoint;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
new file mode 100644
index 0000000000..4643887c49
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source.reader;
+
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import 
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/** The source reader for Kafka partitions.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add some method to make report audit information exactly once
+ * */
+@Internal
+public class KafkaSourceReader<T>
+        extends
+            SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], 
byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSourceReader.class);
+    // These maps need to be concurrent because it will be accessed by both 
the main thread
+    // and the split fetcher thread in the callback.
+    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> 
offsetsToCommit;
+    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> 
offsetsOfFinishedSplits;
+    private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+    private final boolean commitOffsetsOnCheckpoint;
+    private final KafkaDeserializationSchema<RowData> metricSchema;
+
+    public KafkaSourceReader(
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], 
byte[]>>> elementsQueue,
+            KafkaSourceFetcherManager kafkaSourceFetcherManager,
+            RecordEmitter<ConsumerRecord<byte[], byte[]>, T, 
KafkaPartitionSplitState> recordEmitter,
+            Configuration config,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            KafkaDeserializationSchema<RowData> metricSchema) {
+        super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, 
context);
+        this.offsetsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
+        this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
+        this.commitOffsetsOnCheckpoint =
+                config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+        this.metricSchema = metricSchema;
+        if (!commitOffsetsOnCheckpoint) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to 
Kafka cluster.");
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, KafkaPartitionSplitState> 
finishedSplitIds) {
+        finishedSplitIds.forEach(
+                (ignored, splitState) -> {
+                    if (splitState.getCurrentOffset() >= 0) {
+                        offsetsOfFinishedSplits.put(
+                                splitState.getTopicPartition(),
+                                new 
OffsetAndMetadata(splitState.getCurrentOffset()));
+                    }
+                });
+    }
+
+    @Override
+    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
+        if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
+            ((DynamicKafkaDeserializationSchema) 
metricSchema).updateCurrentCheckpointId(checkpointId);
+        }
+        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return splits;
+        }
+
+        if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
+            offsetsToCommit.put(checkpointId, Collections.emptyMap());
+        } else {
+            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
+                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new 
HashMap<>());
+            // Put the offsets of the active splits.
+            for (KafkaPartitionSplit split : splits) {
+                // If the checkpoint is triggered before the partition 
starting offsets
+                // is retrieved, do not commit the offsets for those 
partitions.
+                if (split.getStartingOffset() >= 0) {
+                    offsetsMap.put(
+                            split.getTopicPartition(),
+                            new OffsetAndMetadata(split.getStartingOffset()));
+                }
+            }
+            // Put offsets of all the finished splits.
+            offsetsMap.putAll(offsetsOfFinishedSplits);
+        }
+        return splits;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            flushAudit(checkpointId);
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> committedPartitions =
+                offsetsToCommit.get(checkpointId);
+        if (committedPartitions == null) {
+            LOG.debug(
+                    "Offsets for checkpoint {} either do not exist or have 
already been committed.",
+                    checkpointId);
+            flushAudit(checkpointId);
+            return;
+        }
+
+        ((KafkaSourceFetcherManager) splitFetcherManager)
+                .commitOffsets(
+                        committedPartitions,
+                        (ignored, e) -> {
+                            // The offset commit here is needed by the 
external monitoring. It won't
+                            // break Flink job's correctness if we fail to 
commit the offset here.
+                            if (e != null) {
+                                kafkaSourceReaderMetrics.recordFailedCommit();
+                                LOG.warn(
+                                        "Failed to commit consumer offsets for 
checkpoint {}",
+                                        checkpointId,
+                                        e);
+                            } else {
+                                LOG.debug(
+                                        "Successfully committed offsets for 
checkpoint {}",
+                                        checkpointId);
+                                
kafkaSourceReaderMetrics.recordSucceededCommit();
+                                // If the finished topic partition has been 
committed, we remove it
+                                // from the offsets of the finished splits map.
+                                committedPartitions.forEach(
+                                        (tp, offset) -> 
kafkaSourceReaderMetrics.recordCommittedOffset(
+                                                tp, offset.offset()));
+                                offsetsOfFinishedSplits
+                                        .entrySet()
+                                        .removeIf(
+                                                entry -> 
committedPartitions.containsKey(
+                                                        entry.getKey()));
+                                while (!offsetsToCommit.isEmpty()
+                                        && offsetsToCommit.firstKey() <= 
checkpointId) {
+                                    
offsetsToCommit.remove(offsetsToCommit.firstKey());
+                                }
+                            }
+                        });
+        flushAudit(checkpointId);
+    }
+
+    private void flushAudit(long checkpointId) {
+        if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
+            DynamicKafkaDeserializationSchema schema = 
(DynamicKafkaDeserializationSchema) metricSchema;
+            schema.flushAudit();
+            schema.updateLastCheckpointId(checkpointId);
+        }
+    }
+    @Override
+    protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit 
split) {
+        return new KafkaPartitionSplitState(split);
+    }
+
+    @Override
+    protected KafkaPartitionSplit toSplitType(String splitId, 
KafkaPartitionSplitState splitState) {
+        return splitState.toKafkaPartitionSplit();
+    }
+
+    // ------------------------
+
+    @VisibleForTesting
+    SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> 
getOffsetsToCommit() {
+        return offsetsToCommit;
+    }
+
+    @VisibleForTesting
+    int getNumAliveFetchers() {
+        return splitFetcherManager.getNumAliveFetchers();
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 4406189081..28de46fb08 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.kafka.table;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,11 +38,13 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.inlong.sort.kafka.table.KafkaDynamicSource.ReadableMetadata.CONSUME_TIME;
+
 /** A specific {KafkaSerializationSchema} for {KafkaDynamicSource}.
  * <p>
  * Copy from org.apache.flink:flink-connector-kafka:1.15.4
  * */
-class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<RowData> {
+public class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -62,9 +64,7 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
 
     private final MetricOption metricOption;
 
-    private SourceMetricData sourceMetricData;
-
-    private int consumeTimeIndex = -1;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     DynamicKafkaDeserializationSchema(
             int physicalArity,
@@ -76,7 +76,8 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode,
-            MetricOption metricOption) {
+            MetricOption metricOption,
+            List<String> metadataKeys) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -92,7 +93,8 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
                         keyProjection,
                         valueProjection,
                         metadataConverters,
-                        upsertMode);
+                        upsertMode,
+                        metadataKeys);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
         this.metricOption = metricOption;
@@ -105,13 +107,7 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         }
         valueDeserialization.open(context);
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
-        }
-        for (int i = 0; i < outputCollector.metadataConverters.length; i++) {
-            if (outputCollector.metadataConverters[i]
-                    
.equals(KafkaDynamicSource.ReadableMetadata.CONSUME_TIME.converter)) {
-                consumeTimeIndex = i;
-            }
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
     }
 
@@ -121,7 +117,7 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
     }
 
     @Override
-    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws 
Exception {
+    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) {
         throw new IllegalStateException("A collector is required for 
deserializing.");
     }
 
@@ -132,10 +128,9 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(record.value(),
-                    sourceMetricData == null ? collector : new 
MetricsCollector<>(collector, sourceMetricData));
+                    sourceExactlyMetric == null ? collector : new 
MetricsCollector<>(collector, sourceExactlyMetric));
             return;
         }
-
         // buffer key(s)
         if (keyDeserialization != null) {
             keyDeserialization.deserialize(record.key(), keyCollector);
@@ -143,10 +138,8 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         // project output while emitting values
         outputCollector.inputRecord = record;
         outputCollector.physicalKeyRows = keyCollector.buffer;
-        if (sourceMetricData != null) {
-            MetricsCollector<RowData> metricsCollector = new 
MetricsCollector<>(collector, sourceMetricData);
-            
metricsCollector.resetTimestamp(getRecordTime(outputCollector.metadataConverters,
 record));
-            outputCollector.outputCollector = metricsCollector;
+        if (sourceExactlyMetric != null) {
+            outputCollector.outputCollector = new 
MetricsCollector<>(collector, sourceExactlyMetric);
         } else {
             outputCollector.outputCollector = collector;
         }
@@ -159,14 +152,6 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         keyCollector.buffer.clear();
     }
 
-    private Long getRecordTime(MetadataConverter[] metadataConverters,
-            ConsumerRecord<byte[], byte[]> record) {
-        if (consumeTimeIndex == -1) {
-            return System.currentTimeMillis();
-        }
-        return (Long) metadataConverters[consumeTimeIndex].read(record);
-    }
-
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
@@ -198,6 +183,24 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         }
     }
 
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     /**
@@ -236,30 +239,34 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
 
         private transient Collector<RowData> outputCollector;
 
+        private final List<String> metadataKeys;
+
         OutputProjectionCollector(
                 int physicalArity,
                 int[] keyProjection,
                 int[] valueProjection,
                 MetadataConverter[] metadataConverters,
-                boolean upsertMode) {
+                boolean upsertMode,
+                List<String> metadataKeys) {
             this.physicalArity = physicalArity;
             this.keyProjection = keyProjection;
             this.valueProjection = valueProjection;
             this.metadataConverters = metadataConverters;
             this.upsertMode = upsertMode;
+            this.metadataKeys = metadataKeys;
         }
 
         @Override
         public void collect(RowData physicalValueRow) {
             // no key defined
             if (keyProjection.length == 0) {
-                emitRow(null, (GenericRowData) physicalValueRow);
+                emitRow(null, (GenericRowData) physicalValueRow, metadataKeys);
                 return;
             }
 
             // otherwise emit a value for each key
             for (RowData physicalKeyRow : physicalKeyRows) {
-                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) 
physicalValueRow);
+                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) 
physicalValueRow, metadataKeys);
             }
         }
 
@@ -270,7 +277,8 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
 
         private void emitRow(
                 @Nullable GenericRowData physicalKeyRow,
-                @Nullable GenericRowData physicalValueRow) {
+                @Nullable GenericRowData physicalValueRow,
+                List<String> metadataKeys) {
             final RowKind rowKind;
             if (physicalValueRow == null) {
                 if (upsertMode) {
@@ -300,9 +308,14 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
             }
 
             for (int metadataPos = 0; metadataPos < metadataArity; 
metadataPos++) {
+                Object metadata = 
metadataConverters[metadataPos].read(inputRecord);
                 producedRow.setField(
                         physicalArity + metadataPos,
-                        metadataConverters[metadataPos].read(inputRecord));
+                        metadata);
+                if (CONSUME_TIME.key.equals(metadataKeys.get(metadataPos)) &&
+                        outputCollector instanceof MetricsCollector) {
+                    ((MetricsCollector<RowData>) 
outputCollector).resetTimestamp((Long) metadata);
+                }
             }
             outputCollector.collect(producedRow);
         }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 322e6fc758..9b0b0aff64 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.kafka.table;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.kafka.source.KafkaSource;
+import org.apache.inlong.sort.kafka.source.KafkaSourceBuilder;
 import 
org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 
@@ -26,8 +28,6 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -442,8 +442,8 @@ public class KafkaDynamicSource
         }
         kafkaSourceBuilder
                 .setProperties(properties)
-                
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
-
+                
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
+                .setMetricSchema(kafkaDeserializer);
         return kafkaSourceBuilder.build();
     }
 
@@ -502,7 +502,8 @@ public class KafkaDynamicSource
                 metadataConverters,
                 producedTypeInfo,
                 upsertMode,
-                metricOption);
+                metricOption,
+                metadataKeys);
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index e9b2f0b4cd..e4c4590fd4 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -831,6 +831,9 @@
        
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
        
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
        
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
   Source  : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that 
the software have been modified.)
   License : https://github.com/apache/flink/blob/master/LICENSE
 

Reply via email to